1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "GRPCVehicleProxyServer.h"
18
19 #include "ProtoMessageConverter.h"
20
21 #include <grpc++/grpc++.h>
22
23 #include <android-base/logging.h>
24
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31
32 namespace android::hardware::automotive::vehicle::virtualization {
33
34 namespace {
getServerCredentials()35 std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
36 return ::grpc::InsecureServerCredentials();
37 }
38
39 template <class ProtoRequestType>
getPropIdAreaIdsFromProtoRequest(const ProtoRequestType * request)40 std::vector<PropIdAreaId> getPropIdAreaIdsFromProtoRequest(const ProtoRequestType* request) {
41 std::vector<PropIdAreaId> propIdAreaIds;
42 for (const proto::PropIdAreaId& protoPropIdAreaId : request->prop_id_area_id()) {
43 PropIdAreaId aidlPropIdAreaId = {};
44 proto_msg_converter::protoToAidl(protoPropIdAreaId, &aidlPropIdAreaId);
45 propIdAreaIds.push_back(aidlPropIdAreaId);
46 }
47 return propIdAreaIds;
48 }
49
50 template <class AidlResultType, class ProtoResultType>
aidlResultsToProtoResults(const AidlResultType & aidlResults,ProtoResultType * result)51 void aidlResultsToProtoResults(const AidlResultType& aidlResults, ProtoResultType* result) {
52 for (const auto& aidlResult : aidlResults) {
53 auto* protoResult = result->add_result();
54 proto_msg_converter::aidlToProto(aidlResult, protoResult);
55 }
56 }
57 } // namespace
58
59 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
60
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)61 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
62 std::unique_ptr<IVehicleHardware>&& hardware)
63 : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)) {};
64
GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,std::unique_ptr<IVehicleHardware> && hardware)65 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
66 std::unique_ptr<IVehicleHardware>&& hardware)
67 : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
68 mHardware->registerOnPropertyChangeEvent(
69 std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
70 [this](std::vector<aidlvhal::VehiclePropValue> values) {
71 OnVehiclePropChange(values);
72 }));
73 }
74
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)75 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
76 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
77 ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
78 for (const auto& config : mHardware->getAllPropertyConfigs()) {
79 proto::VehiclePropConfig protoConfig;
80 proto_msg_converter::aidlToProto(config, &protoConfig);
81 if (!stream->Write(protoConfig)) {
82 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
83 }
84 }
85 return ::grpc::Status::OK;
86 }
87
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)88 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
89 const proto::VehiclePropValueRequests* requests,
90 proto::SetValueResults* results) {
91 std::vector<aidlvhal::SetValueRequest> aidlRequests;
92 std::unordered_set<int64_t> requestIds;
93 for (const auto& protoRequest : requests->requests()) {
94 auto& aidlRequest = aidlRequests.emplace_back();
95 int64_t requestId = protoRequest.request_id();
96 aidlRequest.requestId = requestId;
97 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
98 requestIds.insert(requestId);
99 }
100 auto waitMtx = std::make_shared<std::mutex>();
101 auto waitCV = std::make_shared<std::condition_variable>();
102 auto complete = std::make_shared<bool>(false);
103 auto tmpResults = std::make_shared<proto::SetValueResults>();
104 auto aidlStatus = mHardware->setValues(
105 std::make_shared<const IVehicleHardware::SetValuesCallback>(
106 [waitMtx, waitCV, complete, tmpResults,
107 &requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
108 bool receivedAllResults = false;
109 {
110 std::lock_guard lck(*waitMtx);
111 for (const auto& aidlResult : setValueResults) {
112 auto& protoResult = *tmpResults->add_results();
113 int64_t requestIdForResult = aidlResult.requestId;
114 protoResult.set_request_id(requestIdForResult);
115 protoResult.set_status(
116 static_cast<proto::StatusCode>(aidlResult.status));
117 requestIds.erase(requestIdForResult);
118 }
119 if (requestIds.empty()) {
120 receivedAllResults = true;
121 *complete = true;
122 }
123 }
124 if (receivedAllResults) {
125 waitCV->notify_all();
126 }
127 }),
128 aidlRequests);
129 if (aidlStatus != aidlvhal::StatusCode::OK) {
130 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
131 "The underlying hardware fails to set values, VHAL status: " +
132 toString(aidlStatus));
133 }
134 std::unique_lock lck(*waitMtx);
135 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
136 if (!success) {
137 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
138 "The underlying hardware set values timeout.");
139 }
140 *results = std::move(*tmpResults);
141 return ::grpc::Status::OK;
142 }
143
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)144 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
145 const proto::VehiclePropValueRequests* requests,
146 proto::GetValueResults* results) {
147 std::vector<aidlvhal::GetValueRequest> aidlRequests;
148 std::unordered_set<int64_t> requestIds;
149 for (const auto& protoRequest : requests->requests()) {
150 auto& aidlRequest = aidlRequests.emplace_back();
151 int64_t requestId = protoRequest.request_id();
152 aidlRequest.requestId = requestId;
153 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
154 requestIds.insert(requestId);
155 }
156 auto waitMtx = std::make_shared<std::mutex>();
157 auto waitCV = std::make_shared<std::condition_variable>();
158 auto complete = std::make_shared<bool>(false);
159 auto tmpResults = std::make_shared<proto::GetValueResults>();
160 auto aidlStatus = mHardware->getValues(
161 std::make_shared<const IVehicleHardware::GetValuesCallback>(
162 [waitMtx, waitCV, complete, tmpResults,
163 &requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
164 bool receivedAllResults = false;
165 {
166 std::lock_guard lck(*waitMtx);
167 for (const auto& aidlResult : getValueResults) {
168 auto& protoResult = *tmpResults->add_results();
169 int64_t requestIdForResult = aidlResult.requestId;
170 protoResult.set_request_id(requestIdForResult);
171 protoResult.set_status(
172 static_cast<proto::StatusCode>(aidlResult.status));
173 if (aidlResult.prop) {
174 auto* valuePtr = protoResult.mutable_value();
175 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
176 }
177 requestIds.erase(requestIdForResult);
178 }
179 if (requestIds.empty()) {
180 receivedAllResults = true;
181 *complete = true;
182 }
183 }
184 if (receivedAllResults) {
185 waitCV->notify_all();
186 }
187 }),
188 aidlRequests);
189 if (aidlStatus != aidlvhal::StatusCode::OK) {
190 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
191 "The underlying hardware fails to get values, VHAL status: " +
192 toString(aidlStatus));
193 }
194 std::unique_lock lck(*waitMtx);
195 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
196 if (!success) {
197 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
198 "The underlying hardware get values timeout.");
199 }
200 *results = std::move(*tmpResults);
201 return ::grpc::Status::OK;
202 }
203
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)204 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
205 ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
206 proto::VehicleHalCallStatus* status) {
207 const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
208 request->sample_rate());
209 status->set_status_code(static_cast<proto::StatusCode>(status_code));
210 return ::grpc::Status::OK;
211 }
212
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)213 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
214 const proto::SubscribeRequest* request,
215 proto::VehicleHalCallStatus* status) {
216 const auto& protoSubscribeOptions = request->options();
217 aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
218 proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
219 const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
220 status->set_status_code(static_cast<proto::StatusCode>(status_code));
221 return ::grpc::Status::OK;
222 }
223
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)224 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
225 const proto::UnsubscribeRequest* request,
226 proto::VehicleHalCallStatus* status) {
227 int32_t propId = request->prop_id();
228 int32_t areaId = request->area_id();
229 const auto status_code = mHardware->unsubscribe(propId, areaId);
230 status->set_status_code(static_cast<proto::StatusCode>(status_code));
231 return ::grpc::Status::OK;
232 }
233
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)234 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
235 const ::google::protobuf::Empty*,
236 proto::VehicleHalCallStatus* status) {
237 status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
238 return ::grpc::Status::OK;
239 }
240
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)241 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
242 const proto::DumpOptions* options,
243 proto::DumpResult* result) {
244 std::vector<std::string> dumpOptionStrings(options->options().begin(),
245 options->options().end());
246 auto dumpResult = mHardware->dump(dumpOptionStrings);
247 result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
248 result->set_buffer(dumpResult.buffer);
249 result->set_refresh_property_configs(dumpResult.refreshPropertyConfigs);
250 return ::grpc::Status::OK;
251 }
252
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)253 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
254 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
255 ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
256 auto conn = std::make_shared<ConnectionDescriptor>(stream);
257 {
258 std::lock_guard lck(mConnectionMutex);
259 mValueStreamingConnections.push_back(conn);
260 }
261 conn->Wait();
262 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
263 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
264 }
265
GetMinMaxSupportedValues(::grpc::ServerContext * context,const proto::GetMinMaxSupportedValuesRequest * request,proto::GetMinMaxSupportedValuesResult * result)266 ::grpc::Status GrpcVehicleProxyServer::GetMinMaxSupportedValues(
267 ::grpc::ServerContext* context, const proto::GetMinMaxSupportedValuesRequest* request,
268 proto::GetMinMaxSupportedValuesResult* result) {
269 std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
270 std::vector<aidlvhal::MinMaxSupportedValueResult> minMaxSupportedValueResults =
271 mHardware->getMinMaxSupportedValues(propIdAreaIds);
272 aidlResultsToProtoResults(minMaxSupportedValueResults, result);
273 return ::grpc::Status::OK;
274 }
275
GetSupportedValuesLists(::grpc::ServerContext * context,const proto::GetSupportedValuesListsRequest * request,proto::GetSupportedValuesListsResult * result)276 ::grpc::Status GrpcVehicleProxyServer::GetSupportedValuesLists(
277 ::grpc::ServerContext* context, const proto::GetSupportedValuesListsRequest* request,
278 proto::GetSupportedValuesListsResult* result) {
279 std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
280 std::vector<aidlvhal::SupportedValuesListResult> supportedValuesListResults =
281 mHardware->getSupportedValuesLists(propIdAreaIds);
282 aidlResultsToProtoResults(supportedValuesListResults, result);
283 return ::grpc::Status::OK;
284 }
285
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)286 void GrpcVehicleProxyServer::OnVehiclePropChange(
287 const std::vector<aidlvhal::VehiclePropValue>& values) {
288 std::unordered_set<uint64_t> brokenConn;
289 proto::VehiclePropValues protoValues;
290 for (const auto& value : values) {
291 auto* protoValuePtr = protoValues.add_values();
292 proto_msg_converter::aidlToProto(value, protoValuePtr);
293 }
294 {
295 std::shared_lock read_lock(mConnectionMutex);
296 for (auto& connection : mValueStreamingConnections) {
297 auto writeOK = connection->Write(protoValues);
298 if (!writeOK) {
299 LOG(ERROR) << __func__
300 << ": Server Write failed, connection lost. ID: " << connection->ID();
301 brokenConn.insert(connection->ID());
302 }
303 }
304 }
305 if (brokenConn.empty()) {
306 return;
307 }
308 std::unique_lock write_lock(mConnectionMutex);
309 mValueStreamingConnections.erase(
310 std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
311 [&brokenConn](const auto& conn) {
312 return brokenConn.find(conn->ID()) != brokenConn.end();
313 }),
314 mValueStreamingConnections.end());
315 }
316
Start()317 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
318 if (mServer) {
319 LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
320 return *this;
321 }
322 ::grpc::ServerBuilder builder;
323 builder.RegisterService(this);
324 for (const std::string& serviceAddr : mServiceAddrs) {
325 builder.AddListeningPort(serviceAddr, getServerCredentials());
326 }
327 mServer = builder.BuildAndStart();
328 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
329 << "please make sure the configuration and permissions are correct";
330 return *this;
331 }
332
Shutdown()333 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
334 std::shared_lock read_lock(mConnectionMutex);
335 for (auto& conn : mValueStreamingConnections) {
336 conn->Shutdown();
337 }
338 if (mServer) {
339 mServer->Shutdown();
340 }
341 return *this;
342 }
343
Wait()344 void GrpcVehicleProxyServer::Wait() {
345 if (mServer) {
346 mServer->Wait();
347 }
348 mServer.reset();
349 }
350
~ConnectionDescriptor()351 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
352 Shutdown();
353 }
354
Write(const proto::VehiclePropValues & values)355 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
356 if (!mStream) {
357 LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
358 Shutdown();
359 return false;
360 }
361 {
362 std::lock_guard lck(*mMtx);
363 if (!mShutdownFlag && mStream->Write(values)) {
364 return true;
365 } else {
366 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
367 }
368 }
369 Shutdown();
370 return false;
371 }
372
Wait()373 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
374 std::unique_lock lck(*mMtx);
375 mCV->wait(lck, [this] { return mShutdownFlag; });
376 }
377
Shutdown()378 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
379 {
380 std::lock_guard lck(*mMtx);
381 mShutdownFlag = true;
382 }
383 mCV->notify_all();
384 }
385
386 } // namespace android::hardware::automotive::vehicle::virtualization
387