• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "GRPCVehicleProxyServer.h"
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <grpc++/grpc++.h>
22 
23 #include <android-base/logging.h>
24 
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31 
32 namespace android::hardware::automotive::vehicle::virtualization {
33 
34 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35 
getServerCredentials()36 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37     // TODO(chenhaosjtuacm): get secured credentials here
38     return ::grpc::InsecureServerCredentials();
39 }
40 
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)41 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42                                                std::unique_ptr<IVehicleHardware>&& hardware)
43     : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) {
44     mHardware->registerOnPropertyChangeEvent(
45             std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
46                     [this](std::vector<aidlvhal::VehiclePropValue> values) {
47                         OnVehiclePropChange(values);
48                     }));
49 }
50 
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)51 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
52         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
53         ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
54     for (const auto& config : mHardware->getAllPropertyConfigs()) {
55         proto::VehiclePropConfig protoConfig;
56         proto_msg_converter::aidlToProto(config, &protoConfig);
57         if (!stream->Write(protoConfig)) {
58             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
59         }
60     }
61     return ::grpc::Status::OK;
62 }
63 
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)64 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
65                                                  const proto::VehiclePropValueRequests* requests,
66                                                  proto::SetValueResults* results) {
67     std::vector<aidlvhal::SetValueRequest> aidlRequests;
68     for (const auto& protoRequest : requests->requests()) {
69         auto& aidlRequest = aidlRequests.emplace_back();
70         aidlRequest.requestId = protoRequest.request_id();
71         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
72     }
73     auto waitMtx = std::make_shared<std::mutex>();
74     auto waitCV = std::make_shared<std::condition_variable>();
75     auto complete = std::make_shared<bool>(false);
76     auto tmpResults = std::make_shared<proto::SetValueResults>();
77     auto aidlStatus = mHardware->setValues(
78             std::make_shared<const IVehicleHardware::SetValuesCallback>(
79                     [waitMtx, waitCV, complete,
80                      tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
81                         for (const auto& aidlResult : setValueResults) {
82                             auto& protoResult = *tmpResults->add_results();
83                             protoResult.set_request_id(aidlResult.requestId);
84                             protoResult.set_status(
85                                     static_cast<proto::StatusCode>(aidlResult.status));
86                         }
87                         {
88                             std::lock_guard lck(*waitMtx);
89                             *complete = true;
90                         }
91                         waitCV->notify_all();
92                     }),
93             aidlRequests);
94     if (aidlStatus != aidlvhal::StatusCode::OK) {
95         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
96                               "The underlying hardware fails to set values, VHAL status: " +
97                                       toString(aidlStatus));
98     }
99     std::unique_lock lck(*waitMtx);
100     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
101     if (!success) {
102         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
103                               "The underlying hardware set values timeout.");
104     }
105     *results = std::move(*tmpResults);
106     return ::grpc::Status::OK;
107 }
108 
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)109 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
110                                                  const proto::VehiclePropValueRequests* requests,
111                                                  proto::GetValueResults* results) {
112     std::vector<aidlvhal::GetValueRequest> aidlRequests;
113     for (const auto& protoRequest : requests->requests()) {
114         auto& aidlRequest = aidlRequests.emplace_back();
115         aidlRequest.requestId = protoRequest.request_id();
116         proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
117     }
118     auto waitMtx = std::make_shared<std::mutex>();
119     auto waitCV = std::make_shared<std::condition_variable>();
120     auto complete = std::make_shared<bool>(false);
121     auto tmpResults = std::make_shared<proto::GetValueResults>();
122     auto aidlStatus = mHardware->getValues(
123             std::make_shared<const IVehicleHardware::GetValuesCallback>(
124                     [waitMtx, waitCV, complete,
125                      tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
126                         for (const auto& aidlResult : getValueResults) {
127                             auto& protoResult = *tmpResults->add_results();
128                             protoResult.set_request_id(aidlResult.requestId);
129                             protoResult.set_status(
130                                     static_cast<proto::StatusCode>(aidlResult.status));
131                             if (aidlResult.prop) {
132                                 auto* valuePtr = protoResult.mutable_value();
133                                 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
134                             }
135                         }
136                         {
137                             std::lock_guard lck(*waitMtx);
138                             *complete = true;
139                         }
140                         waitCV->notify_all();
141                     }),
142             aidlRequests);
143     if (aidlStatus != aidlvhal::StatusCode::OK) {
144         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
145                               "The underlying hardware fails to get values, VHAL status: " +
146                                       toString(aidlStatus));
147     }
148     std::unique_lock lck(*waitMtx);
149     bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
150     if (!success) {
151         return ::grpc::Status(::grpc::StatusCode::INTERNAL,
152                               "The underlying hardware get values timeout.");
153     }
154     *results = std::move(*tmpResults);
155     return ::grpc::Status::OK;
156 }
157 
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)158 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
159         ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
160         proto::VehicleHalCallStatus* status) {
161     const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
162                                                          request->sample_rate());
163     status->set_status_code(static_cast<proto::StatusCode>(status_code));
164     return ::grpc::Status::OK;
165 }
166 
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)167 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
168                                                    const ::google::protobuf::Empty*,
169                                                    proto::VehicleHalCallStatus* status) {
170     status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
171     return ::grpc::Status::OK;
172 }
173 
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)174 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
175                                             const proto::DumpOptions* options,
176                                             proto::DumpResult* result) {
177     std::vector<std::string> dumpOptionStrings(options->options().begin(),
178                                                options->options().end());
179     auto dumpResult = mHardware->dump(dumpOptionStrings);
180     result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
181     result->set_buffer(dumpResult.buffer);
182     return ::grpc::Status::OK;
183 }
184 
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)185 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
186         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
187         ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
188     auto conn = std::make_shared<ConnectionDescriptor>(stream);
189     {
190         std::lock_guard lck(mConnectionMutex);
191         mValueStreamingConnections.push_back(conn);
192     }
193     conn->Wait();
194     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
195     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
196 }
197 
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)198 void GrpcVehicleProxyServer::OnVehiclePropChange(
199         const std::vector<aidlvhal::VehiclePropValue>& values) {
200     std::unordered_set<uint64_t> brokenConn;
201     proto::VehiclePropValues protoValues;
202     for (const auto& value : values) {
203         auto* protoValuePtr = protoValues.add_values();
204         proto_msg_converter::aidlToProto(value, protoValuePtr);
205     }
206     {
207         std::shared_lock read_lock(mConnectionMutex);
208         for (auto& connection : mValueStreamingConnections) {
209             auto writeOK = connection->Write(protoValues);
210             if (!writeOK) {
211                 LOG(ERROR) << __func__
212                            << ": Server Write failed, connection lost. ID: " << connection->ID();
213                 brokenConn.insert(connection->ID());
214             }
215         }
216     }
217     if (brokenConn.empty()) {
218         return;
219     }
220     std::unique_lock write_lock(mConnectionMutex);
221     mValueStreamingConnections.erase(
222             std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
223                            [&brokenConn](const auto& conn) {
224                                return brokenConn.find(conn->ID()) != brokenConn.end();
225                            }),
226             mValueStreamingConnections.end());
227 }
228 
Start()229 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
230     if (mServer) {
231         LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
232         return *this;
233     }
234     ::grpc::ServerBuilder builder;
235     builder.RegisterService(this);
236     builder.AddListeningPort(mServiceAddr, getServerCredentials());
237     mServer = builder.BuildAndStart();
238     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
239                    << "please make sure the configuration and permissions are correct";
240     return *this;
241 }
242 
Shutdown()243 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
244     std::shared_lock read_lock(mConnectionMutex);
245     for (auto& conn : mValueStreamingConnections) {
246         conn->Shutdown();
247     }
248     if (mServer) {
249         mServer->Shutdown();
250     }
251     return *this;
252 }
253 
Wait()254 void GrpcVehicleProxyServer::Wait() {
255     if (mServer) {
256         mServer->Wait();
257     }
258     mServer.reset();
259 }
260 
~ConnectionDescriptor()261 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
262     Shutdown();
263 }
264 
Write(const proto::VehiclePropValues & values)265 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
266     if (!mStream) {
267         LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
268         Shutdown();
269         return false;
270     }
271     {
272         std::lock_guard lck(*mMtx);
273         if (!mShutdownFlag && mStream->Write(values)) {
274             return true;
275         } else {
276             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
277         }
278     }
279     Shutdown();
280     return false;
281 }
282 
Wait()283 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
284     std::unique_lock lck(*mMtx);
285     mCV->wait(lck, [this] { return mShutdownFlag; });
286 }
287 
Shutdown()288 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
289     {
290         std::lock_guard lck(*mMtx);
291         mShutdownFlag = true;
292     }
293     mCV->notify_all();
294 }
295 
296 }  // namespace android::hardware::automotive::vehicle::virtualization
297