1 /*
2 * Copyright (C) 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "GRPCVehicleProxyServer.h"
18
19 #include "ProtoMessageConverter.h"
20
21 #include <grpc++/grpc++.h>
22
23 #include <android-base/logging.h>
24
25 #include <algorithm>
26 #include <condition_variable>
27 #include <mutex>
28 #include <unordered_set>
29 #include <utility>
30 #include <vector>
31
32 namespace android::hardware::automotive::vehicle::virtualization {
33
34 std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};
35
getServerCredentials()36 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
37 // TODO(chenhaosjtuacm): get secured credentials here
38 return ::grpc::InsecureServerCredentials();
39 }
40
GrpcVehicleProxyServer(std::string serverAddr,std::unique_ptr<IVehicleHardware> && hardware)41 GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
42 std::unique_ptr<IVehicleHardware>&& hardware)
43 : mServiceAddr(std::move(serverAddr)), mHardware(std::move(hardware)) {
44 mHardware->registerOnPropertyChangeEvent(
45 std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
46 [this](std::vector<aidlvhal::VehiclePropValue> values) {
47 OnVehiclePropChange(values);
48 }));
49 }
50
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropConfig> * stream)51 ::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
52 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
53 ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
54 for (const auto& config : mHardware->getAllPropertyConfigs()) {
55 proto::VehiclePropConfig protoConfig;
56 proto_msg_converter::aidlToProto(config, &protoConfig);
57 if (!stream->Write(protoConfig)) {
58 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
59 }
60 }
61 return ::grpc::Status::OK;
62 }
63
SetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::SetValueResults * results)64 ::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
65 const proto::VehiclePropValueRequests* requests,
66 proto::SetValueResults* results) {
67 std::vector<aidlvhal::SetValueRequest> aidlRequests;
68 for (const auto& protoRequest : requests->requests()) {
69 auto& aidlRequest = aidlRequests.emplace_back();
70 aidlRequest.requestId = protoRequest.request_id();
71 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
72 }
73 auto waitMtx = std::make_shared<std::mutex>();
74 auto waitCV = std::make_shared<std::condition_variable>();
75 auto complete = std::make_shared<bool>(false);
76 auto tmpResults = std::make_shared<proto::SetValueResults>();
77 auto aidlStatus = mHardware->setValues(
78 std::make_shared<const IVehicleHardware::SetValuesCallback>(
79 [waitMtx, waitCV, complete,
80 tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
81 for (const auto& aidlResult : setValueResults) {
82 auto& protoResult = *tmpResults->add_results();
83 protoResult.set_request_id(aidlResult.requestId);
84 protoResult.set_status(
85 static_cast<proto::StatusCode>(aidlResult.status));
86 }
87 {
88 std::lock_guard lck(*waitMtx);
89 *complete = true;
90 }
91 waitCV->notify_all();
92 }),
93 aidlRequests);
94 if (aidlStatus != aidlvhal::StatusCode::OK) {
95 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
96 "The underlying hardware fails to set values, VHAL status: " +
97 toString(aidlStatus));
98 }
99 std::unique_lock lck(*waitMtx);
100 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
101 if (!success) {
102 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
103 "The underlying hardware set values timeout.");
104 }
105 *results = std::move(*tmpResults);
106 return ::grpc::Status::OK;
107 }
108
GetValues(::grpc::ServerContext * context,const proto::VehiclePropValueRequests * requests,proto::GetValueResults * results)109 ::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
110 const proto::VehiclePropValueRequests* requests,
111 proto::GetValueResults* results) {
112 std::vector<aidlvhal::GetValueRequest> aidlRequests;
113 for (const auto& protoRequest : requests->requests()) {
114 auto& aidlRequest = aidlRequests.emplace_back();
115 aidlRequest.requestId = protoRequest.request_id();
116 proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
117 }
118 auto waitMtx = std::make_shared<std::mutex>();
119 auto waitCV = std::make_shared<std::condition_variable>();
120 auto complete = std::make_shared<bool>(false);
121 auto tmpResults = std::make_shared<proto::GetValueResults>();
122 auto aidlStatus = mHardware->getValues(
123 std::make_shared<const IVehicleHardware::GetValuesCallback>(
124 [waitMtx, waitCV, complete,
125 tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
126 for (const auto& aidlResult : getValueResults) {
127 auto& protoResult = *tmpResults->add_results();
128 protoResult.set_request_id(aidlResult.requestId);
129 protoResult.set_status(
130 static_cast<proto::StatusCode>(aidlResult.status));
131 if (aidlResult.prop) {
132 auto* valuePtr = protoResult.mutable_value();
133 proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
134 }
135 }
136 {
137 std::lock_guard lck(*waitMtx);
138 *complete = true;
139 }
140 waitCV->notify_all();
141 }),
142 aidlRequests);
143 if (aidlStatus != aidlvhal::StatusCode::OK) {
144 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
145 "The underlying hardware fails to get values, VHAL status: " +
146 toString(aidlStatus));
147 }
148 std::unique_lock lck(*waitMtx);
149 bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
150 if (!success) {
151 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
152 "The underlying hardware get values timeout.");
153 }
154 *results = std::move(*tmpResults);
155 return ::grpc::Status::OK;
156 }
157
UpdateSampleRate(::grpc::ServerContext * context,const proto::UpdateSampleRateRequest * request,proto::VehicleHalCallStatus * status)158 ::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
159 ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
160 proto::VehicleHalCallStatus* status) {
161 const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
162 request->sample_rate());
163 status->set_status_code(static_cast<proto::StatusCode>(status_code));
164 return ::grpc::Status::OK;
165 }
166
Subscribe(::grpc::ServerContext * context,const proto::SubscribeRequest * request,proto::VehicleHalCallStatus * status)167 ::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
168 const proto::SubscribeRequest* request,
169 proto::VehicleHalCallStatus* status) {
170 const auto& protoSubscribeOptions = request->options();
171 aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
172 proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
173 const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
174 status->set_status_code(static_cast<proto::StatusCode>(status_code));
175 return ::grpc::Status::OK;
176 }
177
Unsubscribe(::grpc::ServerContext * context,const proto::UnsubscribeRequest * request,proto::VehicleHalCallStatus * status)178 ::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
179 const proto::UnsubscribeRequest* request,
180 proto::VehicleHalCallStatus* status) {
181 int32_t propId = request->prop_id();
182 int32_t areaId = request->area_id();
183 const auto status_code = mHardware->unsubscribe(propId, areaId);
184 status->set_status_code(static_cast<proto::StatusCode>(status_code));
185 return ::grpc::Status::OK;
186 }
187
CheckHealth(::grpc::ServerContext * context,const::google::protobuf::Empty *,proto::VehicleHalCallStatus * status)188 ::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
189 const ::google::protobuf::Empty*,
190 proto::VehicleHalCallStatus* status) {
191 status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
192 return ::grpc::Status::OK;
193 }
194
Dump(::grpc::ServerContext * context,const proto::DumpOptions * options,proto::DumpResult * result)195 ::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
196 const proto::DumpOptions* options,
197 proto::DumpResult* result) {
198 std::vector<std::string> dumpOptionStrings(options->options().begin(),
199 options->options().end());
200 auto dumpResult = mHardware->dump(dumpOptionStrings);
201 result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
202 result->set_buffer(dumpResult.buffer);
203 return ::grpc::Status::OK;
204 }
205
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<proto::VehiclePropValues> * stream)206 ::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
207 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
208 ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
209 auto conn = std::make_shared<ConnectionDescriptor>(stream);
210 {
211 std::lock_guard lck(mConnectionMutex);
212 mValueStreamingConnections.push_back(conn);
213 }
214 conn->Wait();
215 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
216 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
217 }
218
OnVehiclePropChange(const std::vector<aidlvhal::VehiclePropValue> & values)219 void GrpcVehicleProxyServer::OnVehiclePropChange(
220 const std::vector<aidlvhal::VehiclePropValue>& values) {
221 std::unordered_set<uint64_t> brokenConn;
222 proto::VehiclePropValues protoValues;
223 for (const auto& value : values) {
224 auto* protoValuePtr = protoValues.add_values();
225 proto_msg_converter::aidlToProto(value, protoValuePtr);
226 }
227 {
228 std::shared_lock read_lock(mConnectionMutex);
229 for (auto& connection : mValueStreamingConnections) {
230 auto writeOK = connection->Write(protoValues);
231 if (!writeOK) {
232 LOG(ERROR) << __func__
233 << ": Server Write failed, connection lost. ID: " << connection->ID();
234 brokenConn.insert(connection->ID());
235 }
236 }
237 }
238 if (brokenConn.empty()) {
239 return;
240 }
241 std::unique_lock write_lock(mConnectionMutex);
242 mValueStreamingConnections.erase(
243 std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
244 [&brokenConn](const auto& conn) {
245 return brokenConn.find(conn->ID()) != brokenConn.end();
246 }),
247 mValueStreamingConnections.end());
248 }
249
Start()250 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
251 if (mServer) {
252 LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
253 return *this;
254 }
255 ::grpc::ServerBuilder builder;
256 builder.RegisterService(this);
257 builder.AddListeningPort(mServiceAddr, getServerCredentials());
258 mServer = builder.BuildAndStart();
259 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
260 << "please make sure the configuration and permissions are correct";
261 return *this;
262 }
263
Shutdown()264 GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
265 std::shared_lock read_lock(mConnectionMutex);
266 for (auto& conn : mValueStreamingConnections) {
267 conn->Shutdown();
268 }
269 if (mServer) {
270 mServer->Shutdown();
271 }
272 return *this;
273 }
274
Wait()275 void GrpcVehicleProxyServer::Wait() {
276 if (mServer) {
277 mServer->Wait();
278 }
279 mServer.reset();
280 }
281
~ConnectionDescriptor()282 GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
283 Shutdown();
284 }
285
Write(const proto::VehiclePropValues & values)286 bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
287 if (!mStream) {
288 LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
289 Shutdown();
290 return false;
291 }
292 {
293 std::lock_guard lck(*mMtx);
294 if (!mShutdownFlag && mStream->Write(values)) {
295 return true;
296 } else {
297 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
298 }
299 }
300 Shutdown();
301 return false;
302 }
303
Wait()304 void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
305 std::unique_lock lck(*mMtx);
306 mCV->wait(lck, [this] { return mShutdownFlag; });
307 }
308
Shutdown()309 void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
310 {
311 std::lock_guard lck(*mMtx);
312 mShutdownFlag = true;
313 }
314 mCV->notify_all();
315 }
316
317 } // namespace android::hardware::automotive::vehicle::virtualization
318