• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "GrpcVehicleClient.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <thread>
21 
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24 
25 #include "VehicleServer.grpc.pb.h"
26 #include "VehicleServer.pb.h"
27 #include "vhal_v2_0/DefaultConfig.h"
28 #include "vhal_v2_0/ProtoMessageConverter.h"
29 
30 namespace android {
31 namespace hardware {
32 namespace automotive {
33 namespace vehicle {
34 namespace V2_0 {
35 
36 namespace impl {
37 
getChannelCredentials()38 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
39     // TODO(chenhaosjtuacm): get secured credentials here
40     return ::grpc::InsecureChannelCredentials();
41 }
42 
43 class GrpcVehicleClientImpl : public VehicleHalClient {
44   public:
GrpcVehicleClientImpl(const std::string & addr)45     explicit GrpcVehicleClientImpl(const std::string& addr)
46         : mServiceAddr(addr),
47           mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
48           mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) {
49         StartValuePollingThread();
50     }
51 
~GrpcVehicleClientImpl()52     ~GrpcVehicleClientImpl() {
53         mShuttingDownFlag.store(true);
54         mShutdownCV.notify_all();
55 
56         if (mPollingThread.joinable()) {
57             mPollingThread.join();
58         }
59     }
60 
61     // methods from IVehicleClient
62 
63     std::vector<VehiclePropConfig> getAllPropertyConfig() const override;
64 
65     StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override;
66 
67   private:
68     void StartValuePollingThread();
69 
70     // private data members
71 
72     std::string mServiceAddr;
73     std::shared_ptr<::grpc::Channel> mGrpcChannel;
74     std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub;
75     std::thread mPollingThread;
76 
77     std::mutex mShutdownMutex;
78     std::condition_variable mShutdownCV;
79     std::atomic<bool> mShuttingDownFlag{false};
80 };
81 
makeGrpcVehicleClient(const std::string & addr)82 std::unique_ptr<VehicleHalClient> makeGrpcVehicleClient(const std::string& addr) {
83     return std::make_unique<GrpcVehicleClientImpl>(addr);
84 }
85 
getAllPropertyConfig() const86 std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const {
87     std::vector<VehiclePropConfig> configs;
88     ::grpc::ClientContext context;
89     auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
90     vhal_proto::VehiclePropConfig protoConfig;
91     while (config_stream->Read(&protoConfig)) {
92         VehiclePropConfig config;
93         proto_msg_converter::fromProto(&config, protoConfig);
94         configs.emplace_back(std::move(config));
95     }
96     auto grpc_status = config_stream->Finish();
97     if (!grpc_status.ok()) {
98         LOG(ERROR) << __func__
99                    << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
100         configs.clear();
101     }
102 
103     return configs;
104 }
105 
setProperty(const VehiclePropValue & value,bool updateStatus)106 StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) {
107     ::grpc::ClientContext context;
108     vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
109     vhal_proto::VehicleHalCallStatus vhal_status;
110     proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value);
111     wrappedProtoValue.set_update_status(updateStatus);
112 
113     auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status);
114     if (!grpc_status.ok()) {
115         LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message();
116         return StatusCode::INTERNAL_ERROR;
117     }
118 
119     return static_cast<StatusCode>(vhal_status.status_code());
120 }
121 
StartValuePollingThread()122 void GrpcVehicleClientImpl::StartValuePollingThread() {
123     mPollingThread = std::thread([this]() {
124         while (!mShuttingDownFlag.load()) {
125             ::grpc::ClientContext context;
126 
127             std::atomic<bool> rpc_ok{true};
128             std::thread shuttingdown_watcher([this, &rpc_ok, &context]() {
129                 std::unique_lock<std::mutex> shutdownLock(mShutdownMutex);
130                 mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() {
131                     return !rpc_ok.load() || mShuttingDownFlag.load();
132                 });
133                 context.TryCancel();
134             });
135 
136             auto value_stream =
137                     mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
138             LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
139             vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
140             while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) {
141                 VehiclePropValue value;
142                 proto_msg_converter::fromProto(&value, wrappedProtoValue.value());
143                 onPropertyValue(value, wrappedProtoValue.update_status());
144             }
145 
146             rpc_ok.store(false);
147             mShutdownCV.notify_all();
148             shuttingdown_watcher.join();
149 
150             auto grpc_status = value_stream->Finish();
151             // never reach here until connection lost
152             LOG(ERROR) << __func__
153                        << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
154 
155             // try to reconnect
156         }
157     });
158 }
159 
160 }  // namespace impl
161 
162 }  // namespace V2_0
163 }  // namespace vehicle
164 }  // namespace automotive
165 }  // namespace hardware
166 }  // namespace android
167