• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "GRPCVehicleProxyServer.h"
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <grpc++/grpc++.h>
22 
23 #include <android-base/logging.h>
24 
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 namespace android::hardware::automotive::vehicle::virtualization {
33 
34 namespace {
getServerCredentials()35 std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
36     return ::grpc::InsecureServerCredentials();
37 }
38 
39 template <class ProtoRequestType>
getPropIdAreaIdsFromProtoRequest(const ProtoRequestType * request)40 std::vector<PropIdAreaId> getPropIdAreaIdsFromProtoRequest(const ProtoRequestType* request) {
41     std::vector<PropIdAreaId> propIdAreaIds;
42     for (const proto::PropIdAreaId& protoPropIdAreaId : request->prop_id_area_id()) {
43         PropIdAreaId aidlPropIdAreaId = {};
44         proto_msg_converter::protoToAidl(protoPropIdAreaId, &aidlPropIdAreaId);
45         propIdAreaIds.push_back(aidlPropIdAreaId);
46     }
47     return propIdAreaIds;
48 }
49 
50 template <class AidlResultType, class ProtoResultType>
aidlResultsToProtoResults(const AidlResultType & aidlResults,ProtoResultType * result)51 void aidlResultsToProtoResults(const AidlResultType& aidlResults, ProtoResultType* result) {
52     for (const auto& aidlResult : aidlResults) {
53         auto* protoResult = result->add_result();
54         proto_msg_converter::aidlToProto(aidlResult, protoResult);
55     }
56 }
57 }  // namespace
58 
59 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
60 
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)61 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
62                                                std::unique_ptr<IVehicleHardware>&& hardware)
63     : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)) {};
64 
GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,std::unique_ptr<IVehicleHardware> && hardware)65 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
66                                                std::unique_ptr<IVehicleHardware>&& hardware)
67     : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
68     mHardware->registerOnPropertyChangeEvent(
69             std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
70                     [this](std::vector<aidlvhal::VehiclePropValue> values) {
71                         OnVehiclePropChange(values);
72                     }));
73 }
74 
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)75 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
76         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
77         ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
78     for (const auto& config : mHardware->getAllPropertyConfigs()) {
79         proto::VehiclePropConfig protoConfig;
80         proto_msg_converter::aidlToProto(config, &protoConfig);
81         if (!stream->Write(protoConfig)) {
82             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
83         }
84     }
85     return ::grpc::Status::OK;
86 }
87 
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)88 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
89                                                  const proto::VehiclePropValueRequests* requests,
90                                                  proto::SetValueResults* results) {
91     std::vector<aidlvhal::SetValueRequest> aidlRequests;
92     std::unordered_set<int64_t> requestIds;
93     for (const auto& protoRequest : requests->requests()) {
94         auto& aidlRequest = aidlRequests.emplace_back();
95         int64_t requestId = protoRequest.request_id();
96         aidlRequest.requestId = requestId;
97         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
98         requestIds.insert(requestId);
99     }
100     auto waitMtx = std::make_shared<std::mutex>();
101     auto waitCV = std::make_shared<std::condition_variable>();
102     auto complete = std::make_shared<bool>(false);
103     auto tmpResults = std::make_shared<proto::SetValueResults>();
104     auto aidlStatus = mHardware->setValues(
105             std::make_shared<const IVehicleHardware::SetValuesCallback>(
106                     [waitMtx, waitCV, complete, tmpResults,
107                      &requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
108                         bool receivedAllResults = false;
109                         {
110                             std::lock_guard lck(*waitMtx);
111                             for (const auto& aidlResult : setValueResults) {
112                                 auto& protoResult = *tmpResults->add_results();
113                                 int64_t requestIdForResult = aidlResult.requestId;
114                                 protoResult.set_request_id(requestIdForResult);
115                                 protoResult.set_status(
116                                         static_cast<proto::StatusCode>(aidlResult.status));
117                                 requestIds.erase(requestIdForResult);
118                             }
119                             if (requestIds.empty()) {
120                                 receivedAllResults = true;
121                                 *complete = true;
122                             }
123                         }
124                         if (receivedAllResults) {
125                             waitCV->notify_all();
126                         }
127                     }),
128             aidlRequests);
129     if (aidlStatus != aidlvhal::StatusCode::OK) {
130         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
131                               "The underlying hardware fails to set values, VHAL status: " +
132                                       toString(aidlStatus));
133     }
134     std::unique_lock lck(*waitMtx);
135     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
136     if (!success) {
137         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
138                               "The underlying hardware set values timeout.");
139     }
140     *results = std::move(*tmpResults);
141     return ::grpc::Status::OK;
142 }
143 
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)144 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
145                                                  const proto::VehiclePropValueRequests* requests,
146                                                  proto::GetValueResults* results) {
147     std::vector<aidlvhal::GetValueRequest> aidlRequests;
148     std::unordered_set<int64_t> requestIds;
149     for (const auto& protoRequest : requests->requests()) {
150         auto& aidlRequest = aidlRequests.emplace_back();
151         int64_t requestId = protoRequest.request_id();
152         aidlRequest.requestId = requestId;
153         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
154         requestIds.insert(requestId);
155     }
156     auto waitMtx = std::make_shared<std::mutex>();
157     auto waitCV = std::make_shared<std::condition_variable>();
158     auto complete = std::make_shared<bool>(false);
159     auto tmpResults = std::make_shared<proto::GetValueResults>();
160     auto aidlStatus = mHardware->getValues(
161             std::make_shared<const IVehicleHardware::GetValuesCallback>(
162                     [waitMtx, waitCV, complete, tmpResults,
163                      &requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
164                         bool receivedAllResults = false;
165                         {
166                             std::lock_guard lck(*waitMtx);
167                             for (const auto& aidlResult : getValueResults) {
168                                 auto& protoResult = *tmpResults->add_results();
169                                 int64_t requestIdForResult = aidlResult.requestId;
170                                 protoResult.set_request_id(requestIdForResult);
171                                 protoResult.set_status(
172                                         static_cast<proto::StatusCode>(aidlResult.status));
173                                 if (aidlResult.prop) {
174                                     auto* valuePtr = protoResult.mutable_value();
175                                     proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
176                                 }
177                                 requestIds.erase(requestIdForResult);
178                             }
179                             if (requestIds.empty()) {
180                                 receivedAllResults = true;
181                                 *complete = true;
182                             }
183                         }
184                         if (receivedAllResults) {
185                             waitCV->notify_all();
186                         }
187                     }),
188             aidlRequests);
189     if (aidlStatus != aidlvhal::StatusCode::OK) {
190         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
191                               "The underlying hardware fails to get values, VHAL status: " +
192                                       toString(aidlStatus));
193     }
194     std::unique_lock lck(*waitMtx);
195     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
196     if (!success) {
197         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
198                               "The underlying hardware get values timeout.");
199     }
200     *results = std::move(*tmpResults);
201     return ::grpc::Status::OK;
202 }
203 
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)204 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
205         ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
206         proto::VehicleHalCallStatus* status) {
207     const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
208                                                          request->sample_rate());
209     status->set_status_code(static_cast<proto::StatusCode>(status_code));
210     return ::grpc::Status::OK;
211 }
212 
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)213 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
214                                                  const proto::SubscribeRequest* request,
215                                                  proto::VehicleHalCallStatus* status) {
216     const auto& protoSubscribeOptions = request->options();
217     aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
218     proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
219     const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
220     status->set_status_code(static_cast<proto::StatusCode>(status_code));
221     return ::grpc::Status::OK;
222 }
223 
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)224 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
225                                                    const proto::UnsubscribeRequest* request,
226                                                    proto::VehicleHalCallStatus* status) {
227     int32_t propId = request->prop_id();
228     int32_t areaId = request->area_id();
229     const auto status_code = mHardware->unsubscribe(propId, areaId);
230     status->set_status_code(static_cast<proto::StatusCode>(status_code));
231     return ::grpc::Status::OK;
232 }
233 
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)234 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
235                                                    const ::google::protobuf::Empty*,
236                                                    proto::VehicleHalCallStatus* status) {
237     status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
238     return ::grpc::Status::OK;
239 }
240 
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)241 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
242                                             const proto::DumpOptions* options,
243                                             proto::DumpResult* result) {
244     std::vector<std::string> dumpOptionStrings(options->options().begin(),
245                                                options->options().end());
246     auto dumpResult = mHardware->dump(dumpOptionStrings);
247     result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
248     result->set_buffer(dumpResult.buffer);
249     result->set_refresh_property_configs(dumpResult.refreshPropertyConfigs);
250     return ::grpc::Status::OK;
251 }
252 
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)253 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
254         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
255         ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
256     auto conn = std::make_shared<ConnectionDescriptor>(stream);
257     {
258         std::lock_guard lck(mConnectionMutex);
259         mValueStreamingConnections.push_back(conn);
260     }
261     conn->Wait();
262     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
263     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
264 }
265 
GetMinMaxSupportedValues(::grpc::ServerContext * context,const proto::GetMinMaxSupportedValuesRequest * request,proto::GetMinMaxSupportedValuesResult * result)266 ::grpc::Status GrpcVehicleProxyServer::GetMinMaxSupportedValues(
267         ::grpc::ServerContext* context, const proto::GetMinMaxSupportedValuesRequest* request,
268         proto::GetMinMaxSupportedValuesResult* result) {
269     std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
270     std::vector<aidlvhal::MinMaxSupportedValueResult> minMaxSupportedValueResults =
271             mHardware->getMinMaxSupportedValues(propIdAreaIds);
272     aidlResultsToProtoResults(minMaxSupportedValueResults, result);
273     return ::grpc::Status::OK;
274 }
275 
GetSupportedValuesLists(::grpc::ServerContext * context,const proto::GetSupportedValuesListsRequest * request,proto::GetSupportedValuesListsResult * result)276 ::grpc::Status GrpcVehicleProxyServer::GetSupportedValuesLists(
277         ::grpc::ServerContext* context, const proto::GetSupportedValuesListsRequest* request,
278         proto::GetSupportedValuesListsResult* result) {
279     std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
280     std::vector<aidlvhal::SupportedValuesListResult> supportedValuesListResults =
281             mHardware->getSupportedValuesLists(propIdAreaIds);
282     aidlResultsToProtoResults(supportedValuesListResults, result);
283     return ::grpc::Status::OK;
284 }
285 
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)286 void GrpcVehicleProxyServer::OnVehiclePropChange(
287         const std::vector<aidlvhal::VehiclePropValue>& values) {
288     std::unordered_set<uint64_t> brokenConn;
289     proto::VehiclePropValues protoValues;
290     for (const auto& value : values) {
291         auto* protoValuePtr = protoValues.add_values();
292         proto_msg_converter::aidlToProto(value, protoValuePtr);
293     }
294     {
295         std::shared_lock read_lock(mConnectionMutex);
296         for (auto& connection : mValueStreamingConnections) {
297             auto writeOK = connection->Write(protoValues);
298             if (!writeOK) {
299                 LOG(ERROR) << __func__
300                            << ": Server Write failed, connection lost. ID: " << connection->ID();
301                 brokenConn.insert(connection->ID());
302             }
303         }
304     }
305     if (brokenConn.empty()) {
306         return;
307     }
308     std::unique_lock write_lock(mConnectionMutex);
309     mValueStreamingConnections.erase(
310             std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
311                            [&brokenConn](const auto& conn) {
312                                return brokenConn.find(conn->ID()) != brokenConn.end();
313                            }),
314             mValueStreamingConnections.end());
315 }
316 
Start()317 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
318     if (mServer) {
319         LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
320         return *this;
321     }
322     ::grpc::ServerBuilder builder;
323     builder.RegisterService(this);
324     for (const std::string& serviceAddr : mServiceAddrs) {
325         builder.AddListeningPort(serviceAddr, getServerCredentials());
326     }
327     mServer = builder.BuildAndStart();
328     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
329                    << "please make sure the configuration and permissions are correct";
330     return *this;
331 }
332 
Shutdown()333 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
334     std::shared_lock read_lock(mConnectionMutex);
335     for (auto& conn : mValueStreamingConnections) {
336         conn->Shutdown();
337     }
338     if (mServer) {
339         mServer->Shutdown();
340     }
341     return *this;
342 }
343 
Wait()344 void GrpcVehicleProxyServer::Wait() {
345     if (mServer) {
346         mServer->Wait();
347     }
348     mServer.reset();
349 }
350 
~ConnectionDescriptor()351 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
352     Shutdown();
353 }
354 
Write(const proto::VehiclePropValues & values)355 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
356     if (!mStream) {
357         LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
358         Shutdown();
359         return false;
360     }
361     {
362         std::lock_guard lck(*mMtx);
363         if (!mShutdownFlag && mStream->Write(values)) {
364             return true;
365         } else {
366             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
367         }
368     }
369     Shutdown();
370     return false;
371 }
372 
Wait()373 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
374     std::unique_lock lck(*mMtx);
375     mCV->wait(lck, [this] { return mShutdownFlag; });
376 }
377 
Shutdown()378 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
379     {
380         std::lock_guard lck(*mMtx);
381         mShutdownFlag = true;
382     }
383     mCV->notify_all();
384 }
385 
386 }  // namespace android::hardware::automotive::vehicle::virtualization
387