• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "GrpcVehicleServer.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <shared_mutex>
21 
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24 
25 #include "GarageModeServerSideHandler.h"
26 #include "PowerStateListener.h"
27 #include "VehicleServer.grpc.pb.h"
28 #include "VehicleServer.pb.h"
29 #include "vhal_v2_0/DefaultConfig.h"
30 #include "vhal_v2_0/ProtoMessageConverter.h"
31 
32 namespace android {
33 namespace hardware {
34 namespace automotive {
35 namespace vehicle {
36 namespace V2_0 {
37 
38 namespace impl {
39 
40 class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
41   public:
GrpcVehicleServerImpl(const VirtualizedVhalServerInfo & serverInfo)42     explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
43         : mServiceAddr(serverInfo.getServerUri()),
44           mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
45                                                              serverInfo.powerStateMarkerFilePath)),
46           mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
47         setValuePool(&mValueObjectPool);
48     }
49 
50     // method from GrpcVehicleServer
51     GrpcVehicleServer& Start() override;
52 
53     void Wait() override;
54 
55     GrpcVehicleServer& Stop() override;
56 
57     uint32_t NumOfActivePropertyValueStream() override;
58 
59     // methods from IVehicleServer
60     void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
61 
62     StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
63 
64     // methods from vhal_proto::VehicleServer::Service
65 
66     ::grpc::Status GetAllPropertyConfig(
67             ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
68             ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
69 
70     ::grpc::Status SetProperty(::grpc::ServerContext* context,
71                                const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
72                                vhal_proto::VehicleHalCallStatus* status) override;
73 
74     ::grpc::Status StartPropertyValuesStream(
75             ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
76             ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
77 
78   private:
79     // We keep long-lasting connection for streaming the prop values.
80     // For us, each connection can be represented as a function to send the new value, and
81     // an ID to identify this connection
82     struct ConnectionDescriptor {
83         using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
84 
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor85         explicit ConnectionDescriptor(ValueWriterType&& value_writer)
86             : mValueWriter(std::move(value_writer)),
87               mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
88 
89         ConnectionDescriptor(const ConnectionDescriptor&) = delete;
90 
91         ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
92 
93         // This move constructor is NOT THREAD-SAFE, which means it cannot be moved
94         // while using. Since the connection descriptors are pretected by mConnectionMutex
95         // then we are fine here
ConnectionDescriptorandroid::hardware::automotive::vehicle::V2_0::impl::GrpcVehicleServerImpl::ConnectionDescriptor96         ConnectionDescriptor(ConnectionDescriptor&& cd)
97             : mValueWriter(std::move(cd.mValueWriter)),
98               mConnectionID(cd.mConnectionID),
99               mIsAlive(cd.mIsAlive.load()) {
100             cd.mIsAlive.store(false);
101         }
102 
103         ValueWriterType mValueWriter;
104         uint64_t mConnectionID;
105         std::atomic<bool> mIsAlive{true};
106 
107         static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
108     };
109 
110     std::string mServiceAddr;
111     std::unique_ptr<::grpc::Server> mServer{nullptr};
112     VehiclePropValuePool mValueObjectPool;
113     std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
114     PowerStateListener mPowerStateListener;
115     std::thread mPowerStateListenerThread{};
116     mutable std::shared_mutex mConnectionMutex;
117     mutable std::shared_mutex mWriterMutex;
118     std::list<ConnectionDescriptor> mValueStreamingConnections;
119 };
120 
121 std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
122 
getServerCredentials()123 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
124     // TODO(chenhaosjtuacm): get secured credentials here
125     return ::grpc::InsecureServerCredentials();
126 }
127 
makeGrpcVehicleServer(const VirtualizedVhalServerInfo & serverInfo)128 GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
129     return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
130 }
131 
Start()132 GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
133     if (mServer) {
134         LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
135         return *this;
136     }
137 
138     ::grpc::ServerBuilder builder;
139     builder.RegisterService(this);
140     builder.AddListeningPort(mServiceAddr, getServerCredentials());
141     mServer = builder.BuildAndStart();
142 
143     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
144                    << "please make sure the configuration and permissions are correct";
145 
146     mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
147     return *this;
148 }
149 
Wait()150 void GrpcVehicleServerImpl::Wait() {
151     if (mServer) {
152         mServer->Wait();
153     }
154 
155     if (mPowerStateListenerThread.joinable()) {
156         mPowerStateListenerThread.join();
157     }
158 
159     mPowerStateListenerThread = {};
160     mServer.reset();
161 }
162 
Stop()163 GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
164     if (!mServer) {
165         LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
166         return *this;
167     }
168 
169     mServer->Shutdown();
170     mPowerStateListener.Stop();
171     return *this;
172 }
173 
NumOfActivePropertyValueStream()174 uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
175     std::shared_lock read_lock(mConnectionMutex);
176     return mValueStreamingConnections.size();
177 }
178 
onPropertyValueFromCar(const VehiclePropValue & value,bool updateStatus)179 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
180                                                    bool updateStatus) {
181     vhal_proto::WrappedVehiclePropValue wrappedPropValue;
182     proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
183     wrappedPropValue.set_update_status(updateStatus);
184     std::shared_lock read_lock(mConnectionMutex);
185 
186     bool has_terminated_connections = 0;
187 
188     for (auto& connection : mValueStreamingConnections) {
189         auto writeOK = connection.mValueWriter(wrappedPropValue);
190         if (!writeOK) {
191             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
192                        << connection.mConnectionID;
193             has_terminated_connections = true;
194             connection.mIsAlive.store(false);
195         }
196     }
197 
198     if (!has_terminated_connections) {
199         return;
200     }
201 
202     read_lock.unlock();
203 
204     std::unique_lock write_lock(mConnectionMutex);
205 
206     for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
207         if (!itr->mIsAlive.load()) {
208             itr = mValueStreamingConnections.erase(itr);
209         } else {
210             ++itr;
211         }
212     }
213 }
214 
onSetProperty(const VehiclePropValue & value,bool updateStatus)215 StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
216     if (value.prop == AP_POWER_STATE_REPORT &&
217         value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) {
218         mGarageModeHandler->HandleHeartbeat();
219     }
220     return GrpcVehicleServer::onSetProperty(value, updateStatus);
221 }
222 
GetAllPropertyConfig(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::VehiclePropConfig> * stream)223 ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
224         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
225         ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
226     auto configs = onGetAllPropertyConfig();
227     for (auto& config : configs) {
228         vhal_proto::VehiclePropConfig protoConfig;
229         proto_msg_converter::toProto(&protoConfig, config);
230         if (!stream->Write(protoConfig)) {
231             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
232         }
233     }
234 
235     return ::grpc::Status::OK;
236 }
237 
SetProperty(::grpc::ServerContext * context,const vhal_proto::WrappedVehiclePropValue * wrappedPropValue,vhal_proto::VehicleHalCallStatus * status)238 ::grpc::Status GrpcVehicleServerImpl::SetProperty(
239         ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
240         vhal_proto::VehicleHalCallStatus* status) {
241     VehiclePropValue value;
242     proto_msg_converter::fromProto(&value, wrappedPropValue->value());
243 
244     auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status()));
245     if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) {
246         return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
247     }
248 
249     status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status));
250 
251     return ::grpc::Status::OK;
252 }
253 
StartPropertyValuesStream(::grpc::ServerContext * context,const::google::protobuf::Empty * request,::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue> * stream)254 ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
255         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
256         ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
257     std::mutex terminateMutex;
258     std::condition_variable terminateCV;
259     std::unique_lock<std::mutex> terminateLock(terminateMutex);
260     bool terminated{false};
261 
262     auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
263                      this](const vhal_proto::WrappedVehiclePropValue& value) {
264         std::unique_lock lock(mWriterMutex);
265         if (!stream->Write(value)) {
266             std::unique_lock<std::mutex> terminateLock(terminateMutex);
267             terminated = true;
268             terminateLock.unlock();
269             terminateCV.notify_all();
270             return false;
271         }
272         return true;
273     };
274 
275     // Register connection
276     std::unique_lock lock(mConnectionMutex);
277     auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack));
278     lock.unlock();
279 
280     // Never stop until connection lost
281     terminateCV.wait(terminateLock, [&terminated]() { return terminated; });
282 
283     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID;
284 
285     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
286 }
287 
288 }  // namespace impl
289 
290 }  // namespace V2_0
291 }  // namespace vehicle
292 }  // namespace automotive
293 }  // namespace hardware
294 }  // namespace android
295