• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <GRPCVehicleHardware.h>
18 
19 #include "ProtoMessageConverter.h"
20 
21 #include <android-base/logging.h>
22 #include <grpc++/grpc++.h>
23 
24 #include <cstdlib>
25 #include <mutex>
26 #include <shared_mutex>
27 #include <utility>
28 
29 namespace android::hardware::automotive::vehicle::virtualization {
30 
getChannelCredentials()31 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
32     // TODO(chenhaosjtuacm): get secured credentials here
33     return ::grpc::InsecureChannelCredentials();
34 }
35 
GRPCVehicleHardware(std::string service_addr)36 GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr)
37     : mServiceAddr(std::move(service_addr)),
38       mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
39       mGrpcStub(proto::VehicleServer::NewStub(mGrpcChannel)),
40       mValuePollingThread([this] { ValuePollingLoop(); }) {}
41 
42 // Only used for unit testing.
GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)43 GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr<proto::VehicleServer::StubInterface> stub)
44     : mServiceAddr(""),
45       mGrpcChannel(nullptr),
46       mGrpcStub(std::move(stub)),
47       mValuePollingThread([] {}) {}
48 
~GRPCVehicleHardware()49 GRPCVehicleHardware::~GRPCVehicleHardware() {
50     {
51         std::lock_guard lck(mShutdownMutex);
52         mShuttingDownFlag.store(true);
53     }
54     mShutdownCV.notify_all();
55     mValuePollingThread.join();
56 }
57 
getAllPropertyConfigs() const58 std::vector<aidlvhal::VehiclePropConfig> GRPCVehicleHardware::getAllPropertyConfigs() const {
59     std::vector<aidlvhal::VehiclePropConfig> configs;
60     ::grpc::ClientContext context;
61     auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
62     proto::VehiclePropConfig protoConfig;
63     while (config_stream->Read(&protoConfig)) {
64         aidlvhal::VehiclePropConfig config;
65         proto_msg_converter::protoToAidl(protoConfig, &config);
66         configs.push_back(std::move(config));
67     }
68     auto grpc_status = config_stream->Finish();
69     if (!grpc_status.ok()) {
70         LOG(ERROR) << __func__
71                    << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
72     }
73     return configs;
74 }
75 
setValues(std::shared_ptr<const SetValuesCallback> callback,const std::vector<aidlvhal::SetValueRequest> & requests)76 aidlvhal::StatusCode GRPCVehicleHardware::setValues(
77         std::shared_ptr<const SetValuesCallback> callback,
78         const std::vector<aidlvhal::SetValueRequest>& requests) {
79     ::grpc::ClientContext context;
80     proto::VehiclePropValueRequests protoRequests;
81     proto::SetValueResults protoResults;
82     for (const auto& request : requests) {
83         auto& protoRequest = *protoRequests.add_requests();
84         protoRequest.set_request_id(request.requestId);
85         proto_msg_converter::aidlToProto(request.value, protoRequest.mutable_value());
86     }
87     // TODO(chenhaosjtuacm): Make it Async.
88     auto grpc_status = mGrpcStub->SetValues(&context, protoRequests, &protoResults);
89     if (!grpc_status.ok()) {
90         LOG(ERROR) << __func__ << ": GRPC SetValues Failed: " << grpc_status.error_message();
91         {
92             std::shared_lock lck(mCallbackMutex);
93             // TODO(chenhaosjtuacm): call on-set-error callback.
94         }
95         return aidlvhal::StatusCode::INTERNAL_ERROR;
96     }
97     std::vector<aidlvhal::SetValueResult> results;
98     for (const auto& protoResult : protoResults.results()) {
99         auto& result = results.emplace_back();
100         result.requestId = protoResult.request_id();
101         result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
102         // TODO(chenhaosjtuacm): call on-set-error callback.
103     }
104     (*callback)(std::move(results));
105 
106     return aidlvhal::StatusCode::OK;
107 }
108 
getValues(std::shared_ptr<const GetValuesCallback> callback,const std::vector<aidlvhal::GetValueRequest> & requests) const109 aidlvhal::StatusCode GRPCVehicleHardware::getValues(
110         std::shared_ptr<const GetValuesCallback> callback,
111         const std::vector<aidlvhal::GetValueRequest>& requests) const {
112     ::grpc::ClientContext context;
113     proto::VehiclePropValueRequests protoRequests;
114     proto::GetValueResults protoResults;
115     for (const auto& request : requests) {
116         auto& protoRequest = *protoRequests.add_requests();
117         protoRequest.set_request_id(request.requestId);
118         proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value());
119     }
120     // TODO(chenhaosjtuacm): Make it Async.
121     auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults);
122     if (!grpc_status.ok()) {
123         LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message();
124         return aidlvhal::StatusCode::INTERNAL_ERROR;
125     }
126     std::vector<aidlvhal::GetValueResult> results;
127     for (const auto& protoResult : protoResults.results()) {
128         auto& result = results.emplace_back();
129         result.requestId = protoResult.request_id();
130         result.status = static_cast<aidlvhal::StatusCode>(protoResult.status());
131         if (protoResult.has_value()) {
132             aidlvhal::VehiclePropValue value;
133             proto_msg_converter::protoToAidl(protoResult.value(), &value);
134             result.prop = std::move(value);
135         }
136     }
137     (*callback)(std::move(results));
138 
139     return aidlvhal::StatusCode::OK;
140 }
141 
registerOnPropertyChangeEvent(std::unique_ptr<const PropertyChangeCallback> callback)142 void GRPCVehicleHardware::registerOnPropertyChangeEvent(
143         std::unique_ptr<const PropertyChangeCallback> callback) {
144     std::lock_guard lck(mCallbackMutex);
145     if (mOnPropChange) {
146         LOG(ERROR) << __func__ << " must only be called once.";
147         return;
148     }
149     mOnPropChange = std::move(callback);
150 }
151 
registerOnPropertySetErrorEvent(std::unique_ptr<const PropertySetErrorCallback> callback)152 void GRPCVehicleHardware::registerOnPropertySetErrorEvent(
153         std::unique_ptr<const PropertySetErrorCallback> callback) {
154     std::lock_guard lck(mCallbackMutex);
155     if (mOnSetErr) {
156         LOG(ERROR) << __func__ << " must only be called once.";
157         return;
158     }
159     mOnSetErr = std::move(callback);
160 }
161 
dump(const std::vector<std::string> & options)162 DumpResult GRPCVehicleHardware::dump(const std::vector<std::string>& options) {
163     ::grpc::ClientContext context;
164     proto::DumpOptions protoDumpOptions;
165     proto::DumpResult protoDumpResult;
166     for (const auto& option : options) {
167         protoDumpOptions.add_options(option);
168     }
169     auto grpc_status = mGrpcStub->Dump(&context, protoDumpOptions, &protoDumpResult);
170     if (!grpc_status.ok()) {
171         LOG(ERROR) << __func__ << ": GRPC Dump Failed: " << grpc_status.error_message();
172         return {};
173     }
174     return {
175             .callerShouldDumpState = protoDumpResult.caller_should_dump_state(),
176             .buffer = protoDumpResult.buffer(),
177     };
178 }
179 
checkHealth()180 aidlvhal::StatusCode GRPCVehicleHardware::checkHealth() {
181     ::grpc::ClientContext context;
182     proto::VehicleHalCallStatus protoStatus;
183     auto grpc_status = mGrpcStub->CheckHealth(&context, ::google::protobuf::Empty(), &protoStatus);
184     if (!grpc_status.ok()) {
185         LOG(ERROR) << __func__ << ": GRPC CheckHealth Failed: " << grpc_status.error_message();
186         return aidlvhal::StatusCode::INTERNAL_ERROR;
187     }
188     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
189 }
190 
subscribe(aidlvhal::SubscribeOptions options)191 aidlvhal::StatusCode GRPCVehicleHardware::subscribe(aidlvhal::SubscribeOptions options) {
192     proto::SubscribeRequest request;
193     ::grpc::ClientContext context;
194     proto::VehicleHalCallStatus protoStatus;
195     proto_msg_converter::aidlToProto(options, request.mutable_options());
196     auto grpc_status = mGrpcStub->Subscribe(&context, request, &protoStatus);
197     if (!grpc_status.ok()) {
198         if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
199             // This is a legacy sever. It should handle updateSampleRate.
200             LOG(INFO) << __func__ << ": GRPC Subscribe is not supported by the server";
201             return aidlvhal::StatusCode::OK;
202         }
203         LOG(ERROR) << __func__ << ": GRPC Subscribe Failed: " << grpc_status.error_message();
204         return aidlvhal::StatusCode::INTERNAL_ERROR;
205     }
206     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
207 }
208 
unsubscribe(int32_t propId,int32_t areaId)209 aidlvhal::StatusCode GRPCVehicleHardware::unsubscribe(int32_t propId, int32_t areaId) {
210     proto::UnsubscribeRequest request;
211     ::grpc::ClientContext context;
212     proto::VehicleHalCallStatus protoStatus;
213     request.set_prop_id(propId);
214     request.set_area_id(areaId);
215     auto grpc_status = mGrpcStub->Unsubscribe(&context, request, &protoStatus);
216     if (!grpc_status.ok()) {
217         if (grpc_status.error_code() == ::grpc::StatusCode::UNIMPLEMENTED) {
218             // This is a legacy sever. Ignore unsubscribe request.
219             LOG(INFO) << __func__ << ": GRPC Unsubscribe is not supported by the server";
220             return aidlvhal::StatusCode::OK;
221         }
222         LOG(ERROR) << __func__ << ": GRPC Unsubscribe Failed: " << grpc_status.error_message();
223         return aidlvhal::StatusCode::INTERNAL_ERROR;
224     }
225     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
226 }
227 
updateSampleRate(int32_t propId,int32_t areaId,float sampleRate)228 aidlvhal::StatusCode GRPCVehicleHardware::updateSampleRate(int32_t propId, int32_t areaId,
229                                                            float sampleRate) {
230     ::grpc::ClientContext context;
231     proto::UpdateSampleRateRequest request;
232     proto::VehicleHalCallStatus protoStatus;
233     request.set_prop(propId);
234     request.set_area_id(areaId);
235     request.set_sample_rate(sampleRate);
236     auto grpc_status = mGrpcStub->UpdateSampleRate(&context, request, &protoStatus);
237     if (!grpc_status.ok()) {
238         LOG(ERROR) << __func__ << ": GRPC UpdateSampleRate Failed: " << grpc_status.error_message();
239         return aidlvhal::StatusCode::INTERNAL_ERROR;
240     }
241     return static_cast<aidlvhal::StatusCode>(protoStatus.status_code());
242 }
243 
waitForConnected(std::chrono::milliseconds waitTime)244 bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) {
245     return mGrpcChannel->WaitForConnected(gpr_time_add(
246             gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(waitTime.count(), GPR_TIMESPAN)));
247 }
248 
ValuePollingLoop()249 void GRPCVehicleHardware::ValuePollingLoop() {
250     while (!mShuttingDownFlag.load()) {
251         ::grpc::ClientContext context;
252 
253         bool rpc_stopped{false};
254         std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() {
255             std::unique_lock<std::mutex> lck(mShutdownMutex);
256             mShutdownCV.wait(lck, [this, &rpc_stopped]() {
257                 return rpc_stopped || mShuttingDownFlag.load();
258             });
259             context.TryCancel();
260         });
261 
262         auto value_stream =
263                 mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
264         LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
265         proto::VehiclePropValues protoValues;
266         while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) {
267             std::vector<aidlvhal::VehiclePropValue> values;
268             for (const auto protoValue : protoValues.values()) {
269                 values.push_back(aidlvhal::VehiclePropValue());
270                 proto_msg_converter::protoToAidl(protoValue, &values.back());
271             }
272             std::shared_lock lck(mCallbackMutex);
273             if (mOnPropChange) {
274                 (*mOnPropChange)(values);
275             }
276         }
277 
278         {
279             std::lock_guard lck(mShutdownMutex);
280             rpc_stopped = true;
281         }
282         mShutdownCV.notify_all();
283         shuttingdown_watcher.join();
284 
285         auto grpc_status = value_stream->Finish();
286         // never reach here until connection lost
287         LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
288 
289         // try to reconnect
290     }
291 }
292 
293 }  // namespace android::hardware::automotive::vehicle::virtualization
294