1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "GrpcVehicleClient.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <thread>
21
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24
25 #include "VehicleServer.grpc.pb.h"
26 #include "VehicleServer.pb.h"
27 #include "vhal_v2_0/DefaultConfig.h"
28 #include "vhal_v2_0/ProtoMessageConverter.h"
29
30 namespace android {
31 namespace hardware {
32 namespace automotive {
33 namespace vehicle {
34 namespace V2_0 {
35
36 namespace impl {
37
getChannelCredentials()38 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
39 // TODO(chenhaosjtuacm): get secured credentials here
40 return ::grpc::InsecureChannelCredentials();
41 }
42
43 class GrpcVehicleClientImpl : public VehicleHalClient {
44 public:
GrpcVehicleClientImpl(const std::string & addr)45 explicit GrpcVehicleClientImpl(const std::string& addr)
46 : mServiceAddr(addr),
47 mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
48 mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) {
49 StartValuePollingThread();
50 }
51
~GrpcVehicleClientImpl()52 ~GrpcVehicleClientImpl() {
53 mShuttingDownFlag.store(true);
54 mShutdownCV.notify_all();
55
56 if (mPollingThread.joinable()) {
57 mPollingThread.join();
58 }
59 }
60
61 // methods from IVehicleClient
62
63 std::vector<VehiclePropConfig> getAllPropertyConfig() const override;
64
65 StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override;
66
67 private:
68 void StartValuePollingThread();
69
70 // private data members
71
72 std::string mServiceAddr;
73 std::shared_ptr<::grpc::Channel> mGrpcChannel;
74 std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub;
75 std::thread mPollingThread;
76
77 std::mutex mShutdownMutex;
78 std::condition_variable mShutdownCV;
79 std::atomic<bool> mShuttingDownFlag{false};
80 };
81
makeGrpcVehicleClient(const std::string & addr)82 std::unique_ptr<VehicleHalClient> makeGrpcVehicleClient(const std::string& addr) {
83 return std::make_unique<GrpcVehicleClientImpl>(addr);
84 }
85
getAllPropertyConfig() const86 std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const {
87 std::vector<VehiclePropConfig> configs;
88 ::grpc::ClientContext context;
89 auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
90 vhal_proto::VehiclePropConfig protoConfig;
91 while (config_stream->Read(&protoConfig)) {
92 VehiclePropConfig config;
93 proto_msg_converter::fromProto(&config, protoConfig);
94 configs.emplace_back(std::move(config));
95 }
96 auto grpc_status = config_stream->Finish();
97 if (!grpc_status.ok()) {
98 LOG(ERROR) << __func__
99 << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
100 configs.clear();
101 }
102
103 return configs;
104 }
105
setProperty(const VehiclePropValue & value,bool updateStatus)106 StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) {
107 ::grpc::ClientContext context;
108 vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
109 vhal_proto::VehicleHalCallStatus vhal_status;
110 proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value);
111 wrappedProtoValue.set_update_status(updateStatus);
112
113 auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status);
114 if (!grpc_status.ok()) {
115 LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message();
116 return StatusCode::INTERNAL_ERROR;
117 }
118
119 return static_cast<StatusCode>(vhal_status.status_code());
120 }
121
StartValuePollingThread()122 void GrpcVehicleClientImpl::StartValuePollingThread() {
123 mPollingThread = std::thread([this]() {
124 while (!mShuttingDownFlag.load()) {
125 ::grpc::ClientContext context;
126
127 std::atomic<bool> rpc_ok{true};
128 std::thread shuttingdown_watcher([this, &rpc_ok, &context]() {
129 std::unique_lock<std::mutex> shutdownLock(mShutdownMutex);
130 mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() {
131 return !rpc_ok.load() || mShuttingDownFlag.load();
132 });
133 context.TryCancel();
134 });
135
136 auto value_stream =
137 mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
138 LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
139 vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
140 while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) {
141 VehiclePropValue value;
142 proto_msg_converter::fromProto(&value, wrappedProtoValue.value());
143 onPropertyValue(value, wrappedProtoValue.update_status());
144 }
145
146 rpc_ok.store(false);
147 mShutdownCV.notify_all();
148 shuttingdown_watcher.join();
149
150 auto grpc_status = value_stream->Finish();
151 // never reach here until connection lost
152 LOG(ERROR) << __func__
153 << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
154
155 // try to reconnect
156 }
157 });
158 }
159
160 } // namespace impl
161
162 } // namespace V2_0
163 } // namespace vehicle
164 } // namespace automotive
165 } // namespace hardware
166 } // namespace android
167