1 /*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "GrpcVehicleServer.h"
17
18 #include <condition_variable>
19 #include <mutex>
20 #include <shared_mutex>
21
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24
25 #include "GarageModeServerSideHandler.h"
26 #include "PowerStateListener.h"
27 #include "VehicleServer.grpc.pb.h"
28 #include "VehicleServer.pb.h"
29 #include "vhal_v2_0/DefaultConfig.h"
30 #include "vhal_v2_0/ProtoMessageConverter.h"
31
32 namespace android {
33 namespace hardware {
34 namespace automotive {
35 namespace vehicle {
36 namespace V2_0 {
37
38 namespace impl {
39
40 class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
41 public:
GrpcVehicleServerImpl(const VirtualizedVhalServerInfo & serverInfo)42 explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
43 : mServiceAddr(serverInfo.getServerUri()),
44 mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
45 serverInfo.powerStateMarkerFilePath)),
46 mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
47 setValuePool(&mValueObjectPool);
48 }
49
50 // method from GrpcVehicleServer
51 GrpcVehicleServer& Start() override;
52
53 void Wait() override;
54
55 GrpcVehicleServer& Stop() override;
56
57 uint32_t NumOfActivePropertyValueStream() override;
58
59 // methods from IVehicleServer
60 void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
61
62 StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
63
64 // methods from vhal_proto::VehicleServer::Service
65
66 ::grpc::Status GetAllPropertyConfig(
67 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
68 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
69
70 ::grpc::Status SetProperty(::grpc::ServerContext* context,
71 const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
72 vhal_proto::VehicleHalCallStatus* status) override;
73
74 ::grpc::Status StartPropertyValuesStream(
75 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
76 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
77
78 private:
79 // We keep long-lasting connection for streaming the prop values.
80 // For us, each connection can be represented as a function to send the new value, and
81 // an ID to identify this connection
82 struct ConnectionDescriptor {
83 using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
84
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor85 explicit ConnectionDescriptor(ValueWriterType&& value_writer)
86 : mValueWriter(std::move(value_writer)),
87 mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
88
89 ConnectionDescriptor(const ConnectionDescriptor&) = delete;
90
91 ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
92
93 // This move constructor is NOT THREAD-SAFE, which means it cannot be moved
94 // while using. Since the connection descriptors are pretected by mConnectionMutex
95 // then we are fine here
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor96 ConnectionDescriptor(ConnectionDescriptor&& cd)
97 : mValueWriter(std::move(cd.mValueWriter)),
98 mConnectionID(cd.mConnectionID),
99 mIsAlive(cd.mIsAlive.load()) {
100 cd.mIsAlive.store(false);
101 }
102
103 ValueWriterType mValueWriter;
104 uint64_t mConnectionID;
105 std::atomic<bool> mIsAlive{true};
106
107 static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
108 };
109
110 std::string mServiceAddr;
111 std::unique_ptr<::grpc::Server> mServer{nullptr};
112 VehiclePropValuePool mValueObjectPool;
113 std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
114 PowerStateListener mPowerStateListener;
115 std::thread mPowerStateListenerThread{};
116 mutable std::shared_mutex mConnectionMutex;
117 mutable std::shared_mutex mWriterMutex;
118 std::list<ConnectionDescriptor> mValueStreamingConnections;
119 };
120
121 std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
122
getServerCredentials()123 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
124 // TODO(chenhaosjtuacm): get secured credentials here
125 return ::grpc::InsecureServerCredentials();
126 }
127
makeGrpcVehicleServer(const VirtualizedVhalServerInfo & serverInfo)128 GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
129 return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
130 }
131
Start()132 GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
133 if (mServer) {
134 LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
135 return *this;
136 }
137
138 ::grpc::ServerBuilder builder;
139 builder.RegisterService(this);
140 builder.AddListeningPort(mServiceAddr, getServerCredentials());
141 mServer = builder.BuildAndStart();
142
143 CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
144 << "please make sure the configuration and permissions are correct";
145
146 mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
147 return *this;
148 }
149
Wait()150 void GrpcVehicleServerImpl::Wait() {
151 if (mServer) {
152 mServer->Wait();
153 }
154
155 if (mPowerStateListenerThread.joinable()) {
156 mPowerStateListenerThread.join();
157 }
158
159 mPowerStateListenerThread = {};
160 mServer.reset();
161 }
162
Stop()163 GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
164 if (!mServer) {
165 LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
166 return *this;
167 }
168
169 mServer->Shutdown();
170 mPowerStateListener.Stop();
171 return *this;
172 }
173
NumOfActivePropertyValueStream()174 uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
175 std::shared_lock read_lock(mConnectionMutex);
176 return mValueStreamingConnections.size();
177 }
178
onPropertyValueFromCar(const VehiclePropValue & value,bool updateStatus)179 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
180 bool updateStatus) {
181 vhal_proto::WrappedVehiclePropValue wrappedPropValue;
182 proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
183 wrappedPropValue.set_update_status(updateStatus);
184 std::shared_lock read_lock(mConnectionMutex);
185
186 bool has_terminated_connections = 0;
187
188 for (auto& connection : mValueStreamingConnections) {
189 auto writeOK = connection.mValueWriter(wrappedPropValue);
190 if (!writeOK) {
191 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
192 << connection.mConnectionID;
193 has_terminated_connections = true;
194 connection.mIsAlive.store(false);
195 }
196 }
197
198 if (!has_terminated_connections) {
199 return;
200 }
201
202 read_lock.unlock();
203
204 std::unique_lock write_lock(mConnectionMutex);
205
206 for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
207 if (!itr->mIsAlive.load()) {
208 itr = mValueStreamingConnections.erase(itr);
209 } else {
210 ++itr;
211 }
212 }
213 }
214
onSetProperty(const VehiclePropValue & value,bool updateStatus)215 StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
216 if (value.prop == AP_POWER_STATE_REPORT &&
217 value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) {
218 mGarageModeHandler->HandleHeartbeat();
219 }
220 return GrpcVehicleServer::onSetProperty(value, updateStatus);
221 }
222
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::VehiclePropConfig> * stream)223 ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
224 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
225 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
226 auto configs = onGetAllPropertyConfig();
227 for (auto& config : configs) {
228 vhal_proto::VehiclePropConfig protoConfig;
229 proto_msg_converter::toProto(&protoConfig, config);
230 if (!stream->Write(protoConfig)) {
231 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
232 }
233 }
234
235 return ::grpc::Status::OK;
236 }
237
SetProperty(::grpc::ServerContext * context,const vhal_proto::WrappedVehiclePropValue * wrappedPropValue,vhal_proto::VehicleHalCallStatus * status)238 ::grpc::Status GrpcVehicleServerImpl::SetProperty(
239 ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
240 vhal_proto::VehicleHalCallStatus* status) {
241 VehiclePropValue value;
242 proto_msg_converter::fromProto(&value, wrappedPropValue->value());
243
244 auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status()));
245 if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) {
246 return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
247 }
248
249 status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status));
250
251 return ::grpc::Status::OK;
252 }
253
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue> * stream)254 ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
255 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
256 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
257 std::mutex terminateMutex;
258 std::condition_variable terminateCV;
259 std::unique_lock<std::mutex> terminateLock(terminateMutex);
260 bool terminated{false};
261
262 auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
263 this](const vhal_proto::WrappedVehiclePropValue& value) {
264 std::unique_lock lock(mWriterMutex);
265 if (!stream->Write(value)) {
266 std::unique_lock<std::mutex> terminateLock(terminateMutex);
267 terminated = true;
268 terminateLock.unlock();
269 terminateCV.notify_all();
270 return false;
271 }
272 return true;
273 };
274
275 // Register connection
276 std::unique_lock lock(mConnectionMutex);
277 auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack));
278 lock.unlock();
279
280 // Never stop until connection lost
281 terminateCV.wait(terminateLock, [&terminated]() { return terminated; });
282
283 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID;
284
285 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
286 }
287
288 } // namespace impl
289
290 } // namespace V2_0
291 } // namespace vehicle
292 } // namespace automotive
293 } // namespace hardware
294 } // namespace android
295