1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <GRPCVehicleHardware.h>
18
19 #include "ProtoMessageConverter.h"
20
21 #include <android-base/logging.h>
22 #include <grpc++/grpc++.h>
23
24 #include <cstdlib>
25 #include <mutex>
26 #include <shared_mutex>
27 #include <utility>
28
29 namespace android::hardware::automotive::vehicle::virtualization {
30
getChannelCredentials()31 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
32 // TODO(chenhaosjtuacm): get secured credentials here
33 return ::grpc::InsecureChannelCredentials();
34 }
35
GRPCVehicleHardware(std::string service_addr)36 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
37 : mServiceAddr(std::move(service_addr)),
38 mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
39 mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
40 mValuePollingThread([this] { ValuePollingLoop(); }) {}
41
42 // Only used for unit testing.
GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)43 GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
44 : mServiceAddr(""),
45 mGrpcChannel(nullptr),
46 mGrpcStub(std::move(stub)),
47 mValuePollingThread([] {}) {}
48
~GRPCVehicleHardware()49 GRPCVehicleHardware::~GRPCVehicleHardware() {
50 {
51 std::lock_guard lck(mShutdownMutex);
52 mShuttingDownFlag.store(true);
53 }
54 mShutdownCV.notify_all();
55 mValuePollingThread.join();
56 }
57
getAllPropertyConfigs() const58 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
59 std::vector<aidlvhal::VehiclePropConfig> configs;
60 ::grpc::ClientContext context;
61 auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
62 proto::VehiclePropConfig protoConfig;
63 while (config_stream->Read(&protoConfig)) {
64 aidlvhal::VehiclePropConfig config;
65 proto_msg_converter::protoToAidl(protoConfig, &config);
66 configs.push_back(std::move(config));
67 }
68 auto grpc_status = config_stream->Finish();
69 if (!grpc_status.ok()) {
70 LOG(ERROR) << __func__
71 << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
72 }
73 return configs;
74 }
75
setValues(std::shared_ptr<const SetValuesCallback> callback,const std::vector<aidlvhal::SetValueRequest> & requests)76 aidlvhal::StatusCode GRPCVehicleHardware::setValues(
77 std::shared_ptr<const SetValuesCallback> callback,
78 const std::vector<aidlvhal::SetValueRequest>& requests) {
79 ::grpc::ClientContext context;
80 proto::VehiclePropValueRequests protoRequests;
81 proto::SetValueResults protoResults;
82 for (const auto& request : requests) {
83 auto& protoRequest = *protoRequests.add_requests();
84 protoRequest.set_request_id(request.requestId);
85 proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value());
86 }
87 // TODO(chenhaosjtuacm): Make it Async.
88 auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults);
89 if (!grpc_status.ok()) {
90 LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message();
91 {
92 std::shared_lock lck(mCallbackMutex);
93 // TODO(chenhaosjtuacm): call on-set-error callback.
94 }
95 return aidlvhal::StatusCode::INTERNAL_ERROR;
96 }
97 std::vector<aidlvhal::SetValueResult> results;
98 for (const auto& protoResult : protoResults.results()) {
99 auto& result = results.emplace_back();
100 result.requestId = protoResult.request_id();
101 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
102 // TODO(chenhaosjtuacm): call on-set-error callback.
103 }
104 (*callback)(std::move(results));
105
106 return aidlvhal::StatusCode::OK;
107 }
108
getValues(std::shared_ptr<const GetValuesCallback> callback,const std::vector<aidlvhal::GetValueRequest> & requests) const109 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
110 std::shared_ptr<const GetValuesCallback> callback,
111 const std::vector<aidlvhal::GetValueRequest>& requests) const {
112 ::grpc::ClientContext context;
113 proto::VehiclePropValueRequests protoRequests;
114 proto::GetValueResults protoResults;
115 for (const auto& request : requests) {
116 auto& protoRequest = *protoRequests.add_requests();
117 protoRequest.set_request_id(request.requestId);
118 proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
119 }
120 // TODO(chenhaosjtuacm): Make it Async.
121 auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
122 if (!grpc_status.ok()) {
123 LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
124 return aidlvhal::StatusCode::INTERNAL_ERROR;
125 }
126 std::vector<aidlvhal::GetValueResult> results;
127 for (const auto& protoResult : protoResults.results()) {
128 auto& result = results.emplace_back();
129 result.requestId = protoResult.request_id();
130 result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
131 if (protoResult.has_value()) {
132 aidlvhal::VehiclePropValue value;
133 proto_msg_converter::protoToAidl(protoResult.value(), &value);
134 result.prop = std::move(value);
135 }
136 }
137 (*callback)(std::move(results));
138
139 return aidlvhal::StatusCode::OK;
140 }
141
registerOnPropertyChangeEvent(std::unique_ptr<const PropertyChangeCallback> callback)142 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
143 std::unique_ptr<const PropertyChangeCallback> callback) {
144 std::lock_guard lck(mCallbackMutex);
145 if (mOnPropChange) {
146 LOG(ERROR) << __func__ << " must only be called once.";
147 return;
148 }
149 mOnPropChange = std::move(callback);
150 }
151
registerOnPropertySetErrorEvent(std::unique_ptr<const PropertySetErrorCallback> callback)152 void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
153 std::unique_ptr<const PropertySetErrorCallback> callback) {
154 std::lock_guard lck(mCallbackMutex);
155 if (mOnSetErr) {
156 LOG(ERROR) << __func__ << " must only be called once.";
157 return;
158 }
159 mOnSetErr = std::move(callback);
160 }
161
dump(const std::vector<std::string> & options)162 DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
163 ::grpc::ClientContext context;
164 proto::DumpOptions protoDumpOptions;
165 proto::DumpResult protoDumpResult;
166 for (const auto& option : options) {
167 protoDumpOptions.add_options(option);
168 }
169 auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult);
170 if (!grpc_status.ok()) {
171 LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message();
172 return {};
173 }
174 return {
175 .callerShouldDumpState = protoDumpResult.caller_should_dump_state(),
176 .buffer = protoDumpResult.buffer(),
177 };
178 }
179
checkHealth()180 aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
181 ::grpc::ClientContext context;
182 proto::VehicleHalCallStatus protoStatus;
183 auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus);
184 if (!grpc_status.ok()) {
185 LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message();
186 return aidlvhal::StatusCode::INTERNAL_ERROR;
187 }
188 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
189 }
190
subscribe(aidlvhal::SubscribeOptions options)191 aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
192 proto::SubscribeRequest request;
193 ::grpc::ClientContext context;
194 proto::VehicleHalCallStatus protoStatus;
195 proto_msg_converter::aidlToProto(options, request.mutable_options());
196 auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus);
197 if (!grpc_status.ok()) {
198 if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
199 // This is a legacy sever. It should handle updateSampleRate.
200 LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
201 return aidlvhal::StatusCode::OK;
202 }
203 LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message();
204 return aidlvhal::StatusCode::INTERNAL_ERROR;
205 }
206 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
207 }
208
unsubscribe(int32_t propId,int32_t areaId)209 aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) {
210 proto::UnsubscribeRequest request;
211 ::grpc::ClientContext context;
212 proto::VehicleHalCallStatus protoStatus;
213 request.set_prop_id(propId);
214 request.set_area_id(areaId);
215 auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus);
216 if (!grpc_status.ok()) {
217 if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
218 // This is a legacy sever. Ignore unsubscribe request.
219 LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server";
220 return aidlvhal::StatusCode::OK;
221 }
222 LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message();
223 return aidlvhal::StatusCode::INTERNAL_ERROR;
224 }
225 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
226 }
227
updateSampleRate(int32_t propId,int32_t areaId,float sampleRate)228 aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
229 float sampleRate) {
230 ::grpc::ClientContext context;
231 proto::UpdateSampleRateRequest request;
232 proto::VehicleHalCallStatus protoStatus;
233 request.set_prop(propId);
234 request.set_area_id(areaId);
235 request.set_sample_rate(sampleRate);
236 auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus);
237 if (!grpc_status.ok()) {
238 LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message();
239 return aidlvhal::StatusCode::INTERNAL_ERROR;
240 }
241 return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
242 }
243
waitForConnected(std::chrono::milliseconds waitTime)244 bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
245 return mGrpcChannel->WaitForConnected(gpr_time_add(
246 gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
247 }
248
ValuePollingLoop()249 void GRPCVehicleHardware::ValuePollingLoop() {
250 while (!mShuttingDownFlag.load()) {
251 ::grpc::ClientContext context;
252
253 bool rpc_stopped{false};
254 std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
255 std::unique_lock<std::mutex> lck(mShutdownMutex);
256 mShutdownCV.wait(lck, [this, &rpc_stopped]() {
257 return rpc_stopped || mShuttingDownFlag.load();
258 });
259 context.TryCancel();
260 });
261
262 auto value_stream =
263 mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
264 LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
265 proto::VehiclePropValues protoValues;
266 while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
267 std::vector<aidlvhal::VehiclePropValue> values;
268 for (const auto protoValue : protoValues.values()) {
269 values.push_back(aidlvhal::VehiclePropValue());
270 proto_msg_converter::protoToAidl(protoValue, &values.back());
271 }
272 std::shared_lock lck(mCallbackMutex);
273 if (mOnPropChange) {
274 (*mOnPropChange)(values);
275 }
276 }
277
278 {
279 std::lock_guard lck(mShutdownMutex);
280 rpc_stopped = true;
281 }
282 mShutdownCV.notify_all();
283 shuttingdown_watcher.join();
284
285 auto grpc_status = value_stream->Finish();
286 // never reach here until connection lost
287 LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
288
289 // try to reconnect
290 }
291 }
292
293 } // namespace android::hardware::automotive::vehicle::virtualization
294