/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "GRPCVehicleProxyServer.h"

#include "ProtoMessageConverter.h"

#include <grpc++/grpc++.h>

#include <android-base/logging.h>

#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <unordered_set>
#include <utility>
#include <vector>

namespace android::hardware::automotive::vehicle::virtualization {

namespace {

// Credentials applied to every listening port added in Start(). The server uses gRPC's
// insecure (unauthenticated, unencrypted) credentials.
std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
    return ::grpc::InsecureServerCredentials();
}

// Converts each entry of request->prop_id_area_id() to its AIDL PropIdAreaId form and returns
// them in request order.
template <class ProtoRequestType>
std::vector<PropIdAreaId> getPropIdAreaIdsFromProtoRequest(const ProtoRequestType* request) {
    std::vector<PropIdAreaId> propIdAreaIds;
    for (const proto::PropIdAreaId& protoPropIdAreaId : request->prop_id_area_id()) {
        PropIdAreaId aidlPropIdAreaId = {};
        proto_msg_converter::protoToAidl(protoPropIdAreaId, &aidlPropIdAreaId);
        propIdAreaIds.push_back(aidlPropIdAreaId);
    }
    return propIdAreaIds;
}

// Appends one proto result to `result` (via add_result()) for each element of `aidlResults`,
// converting it with proto_msg_converter::aidlToProto.
template <class AidlResultType, class ProtoResultType>
void aidlResultsToProtoResults(const AidlResultType& aidlResults, ProtoResultType* result) {
    for (const auto& aidlResult : aidlResults) {
        auto* protoResult = result->add_result();
        proto_msg_converter::aidlToProto(aidlResult, protoResult);
    }
}

}  // namespace

std::atomic<uint64_t> GrpcVehicleProxyServer::ConnectionDescriptor::connection_id_counter_{0};

// Convenience constructor for a single listening address; delegates to the multi-address one.
GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::string serverAddr,
                                               std::unique_ptr<IVehicleHardware>&& hardware)
    : GrpcVehicleProxyServer(std::vector<std::string>({serverAddr}), std::move(hardware)) {}

// Takes ownership of `hardware` and registers a property-change callback on it that forwards
// every batch of changed values to OnVehiclePropChange().
GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAddrs,
                                               std::unique_ptr<IVehicleHardware>&& hardware)
    : mServiceAddrs(std::move(serverAddrs)), mHardware(std::move(hardware)) {
    mHardware->registerOnPropertyChangeEvent(
            std::make_unique<const IVehicleHardware::PropertyChangeCallback>(
                    [this](std::vector<aidlvhal::VehiclePropValue> values) {
                        OnVehiclePropChange(values);
                    }));
}

// Streams every property config reported by the hardware to the client, one message per config.
// Returns ABORTED as soon as a write to the stream fails.
::grpc::Status GrpcVehicleProxyServer::GetAllPropertyConfig(
        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
        ::grpc::ServerWriter<proto::VehiclePropConfig>* stream) {
    for (const auto& config : mHardware->getAllPropertyConfigs()) {
        proto::VehiclePropConfig protoConfig;
        proto_msg_converter::aidlToProto(config, &protoConfig);
        if (!stream->Write(protoConfig)) {
            return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
        }
    }
    return ::grpc::Status::OK;
}

// Forwards a batch of set-value requests to the hardware and blocks until a result has been
// received for every request ID or kHardwareOpTimeout elapses.
//
// Returns INTERNAL if the hardware rejects the call or the wait times out; otherwise fills
// `results` with the collected per-request statuses and returns OK.
::grpc::Status GrpcVehicleProxyServer::SetValues(::grpc::ServerContext* context,
                                                 const proto::VehiclePropValueRequests* requests,
                                                 proto::SetValueResults* results) {
    std::vector<aidlvhal::SetValueRequest> aidlRequests;
    // The set of still-pending request IDs is shared with the hardware callback. It is held
    // through a shared_ptr (like the rest of the wait state below) because this function may
    // return, on error or timeout, before the hardware invokes the callback; capturing a stack
    // local by reference would then leave the callback with a dangling reference.
    auto requestIds = std::make_shared<std::unordered_set<int64_t>>();
    for (const auto& protoRequest : requests->requests()) {
        auto& aidlRequest = aidlRequests.emplace_back();
        int64_t requestId = protoRequest.request_id();
        aidlRequest.requestId = requestId;
        proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
        requestIds->insert(requestId);
    }
    auto waitMtx = std::make_shared<std::mutex>();
    auto waitCV = std::make_shared<std::condition_variable>();
    auto complete = std::make_shared<bool>(false);
    auto tmpResults = std::make_shared<proto::SetValueResults>();
    auto aidlStatus = mHardware->setValues(
            std::make_shared<const IVehicleHardware::SetValuesCallback>(
                    [waitMtx, waitCV, complete, tmpResults,
                     requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
                        bool receivedAllResults = false;
                        {
                            std::lock_guard lck(*waitMtx);
                            for (const auto& aidlResult : setValueResults) {
                                auto& protoResult = *tmpResults->add_results();
                                int64_t requestIdForResult = aidlResult.requestId;
                                protoResult.set_request_id(requestIdForResult);
                                protoResult.set_status(
                                        static_cast<proto::StatusCode>(aidlResult.status));
                                requestIds->erase(requestIdForResult);
                            }
                            // Results may arrive across several callback invocations; only the
                            // one that drains the pending set completes the wait.
                            if (requestIds->empty()) {
                                receivedAllResults = true;
                                *complete = true;
                            }
                        }
                        // Notify outside the lock so the woken waiter can acquire it at once.
                        if (receivedAllResults) {
                            waitCV->notify_all();
                        }
                    }),
            aidlRequests);
    if (aidlStatus != aidlvhal::StatusCode::OK) {
        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
                              "The underlying hardware fails to set values, VHAL status: " +
                                      toString(aidlStatus));
    }
    std::unique_lock lck(*waitMtx);
    bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
    if (!success) {
        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
                              "The underlying hardware set values timeout.");
    }
    *results = std::move(*tmpResults);
    return ::grpc::Status::OK;
}

// Forwards a batch of get-value requests to the hardware and blocks until a result has been
// received for every request ID or kHardwareOpTimeout elapses.
//
// Returns INTERNAL if the hardware rejects the call or the wait times out; otherwise fills
// `results` with the collected per-request statuses (and values, when present) and returns OK.
::grpc::Status GrpcVehicleProxyServer::GetValues(::grpc::ServerContext* context,
                                                 const proto::VehiclePropValueRequests* requests,
                                                 proto::GetValueResults* results) {
    std::vector<aidlvhal::GetValueRequest> aidlRequests;
    // Shared with the hardware callback through a shared_ptr so that a callback arriving after
    // this function has returned (error or timeout) never touches a destroyed stack object.
    auto requestIds = std::make_shared<std::unordered_set<int64_t>>();
    for (const auto& protoRequest : requests->requests()) {
        auto& aidlRequest = aidlRequests.emplace_back();
        int64_t requestId = protoRequest.request_id();
        aidlRequest.requestId = requestId;
        proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
        requestIds->insert(requestId);
    }
    auto waitMtx = std::make_shared<std::mutex>();
    auto waitCV = std::make_shared<std::condition_variable>();
    auto complete = std::make_shared<bool>(false);
    auto tmpResults = std::make_shared<proto::GetValueResults>();
    auto aidlStatus = mHardware->getValues(
            std::make_shared<const IVehicleHardware::GetValuesCallback>(
                    [waitMtx, waitCV, complete, tmpResults,
                     requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
                        bool receivedAllResults = false;
                        {
                            std::lock_guard lck(*waitMtx);
                            for (const auto& aidlResult : getValueResults) {
                                auto& protoResult = *tmpResults->add_results();
                                int64_t requestIdForResult = aidlResult.requestId;
                                protoResult.set_request_id(requestIdForResult);
                                protoResult.set_status(
                                        static_cast<proto::StatusCode>(aidlResult.status));
                                // The value is only copied over when the result carries one.
                                if (aidlResult.prop) {
                                    auto* valuePtr = protoResult.mutable_value();
                                    proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
                                }
                                requestIds->erase(requestIdForResult);
                            }
                            // Results may arrive across several callback invocations; only the
                            // one that drains the pending set completes the wait.
                            if (requestIds->empty()) {
                                receivedAllResults = true;
                                *complete = true;
                            }
                        }
                        // Notify outside the lock so the woken waiter can acquire it at once.
                        if (receivedAllResults) {
                            waitCV->notify_all();
                        }
                    }),
            aidlRequests);
    if (aidlStatus != aidlvhal::StatusCode::OK) {
        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
                              "The underlying hardware fails to get values, VHAL status: " +
                                      toString(aidlStatus));
    }
    std::unique_lock lck(*waitMtx);
    bool success = waitCV->wait_for(lck, kHardwareOpTimeout, [complete] { return *complete; });
    if (!success) {
        return ::grpc::Status(::grpc::StatusCode::INTERNAL,
                              "The underlying hardware get values timeout.");
    }
    *results = std::move(*tmpResults);
    return ::grpc::Status::OK;
}

// Forwards the requested sample rate for (prop, area_id) to the hardware. The hardware's status
// code is reported in `status`; the gRPC status itself is always OK.
::grpc::Status GrpcVehicleProxyServer::UpdateSampleRate(
        ::grpc::ServerContext* context, const proto::UpdateSampleRateRequest* request,
        proto::VehicleHalCallStatus* status) {
    const auto status_code = mHardware->updateSampleRate(request->prop(), request->area_id(),
                                                         request->sample_rate());
    status->set_status_code(static_cast<proto::StatusCode>(status_code));
    return ::grpc::Status::OK;
}

// Converts the request's subscribe options to AIDL and forwards them to the hardware. The
// hardware's status code is reported in `status`; the gRPC status itself is always OK.
::grpc::Status GrpcVehicleProxyServer::Subscribe(::grpc::ServerContext* context,
                                                 const proto::SubscribeRequest* request,
                                                 proto::VehicleHalCallStatus* status) {
    const auto& protoSubscribeOptions = request->options();
    aidlvhal::SubscribeOptions aidlSubscribeOptions = {};
    proto_msg_converter::protoToAidl(protoSubscribeOptions, &aidlSubscribeOptions);
    const auto status_code = mHardware->subscribe(aidlSubscribeOptions);
    status->set_status_code(static_cast<proto::StatusCode>(status_code));
    return ::grpc::Status::OK;
}

// Forwards an unsubscribe for (prop_id, area_id) to the hardware. The hardware's status code is
// reported in `status`; the gRPC status itself is always OK.
::grpc::Status GrpcVehicleProxyServer::Unsubscribe(::grpc::ServerContext* context,
                                                   const proto::UnsubscribeRequest* request,
                                                   proto::VehicleHalCallStatus* status) {
    int32_t propId = request->prop_id();
    int32_t areaId = request->area_id();
    const auto status_code = mHardware->unsubscribe(propId, areaId);
    status->set_status_code(static_cast<proto::StatusCode>(status_code));
    return ::grpc::Status::OK;
}

// Reports the result of the hardware's checkHealth() in `status`; always returns OK.
::grpc::Status GrpcVehicleProxyServer::CheckHealth(::grpc::ServerContext* context,
                                                   const ::google::protobuf::Empty*,
                                                   proto::VehicleHalCallStatus* status) {
    status->set_status_code(static_cast<proto::StatusCode>(mHardware->checkHealth()));
    return ::grpc::Status::OK;
}

// Passes the dump option strings to the hardware and copies the three fields of its dump result
// into `result`; always returns OK.
::grpc::Status GrpcVehicleProxyServer::Dump(::grpc::ServerContext* context,
                                            const proto::DumpOptions* options,
                                            proto::DumpResult* result) {
    std::vector<std::string> dumpOptionStrings(options->options().begin(),
                                               options->options().end());
    auto dumpResult = mHardware->dump(dumpOptionStrings);
    result->set_caller_should_dump_state(dumpResult.callerShouldDumpState);
    result->set_buffer(dumpResult.buffer);
    result->set_refresh_property_configs(dumpResult.refreshPropertyConfigs);
    return ::grpc::Status::OK;
}

// Registers the client's stream as a property-value subscriber and blocks the RPC until the
// connection descriptor is shut down. Always returns ABORTED once the wait ends.
::grpc::Status GrpcVehicleProxyServer::StartPropertyValuesStream(
        ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
        ::grpc::ServerWriter<proto::VehiclePropValues>* stream) {
    auto conn = std::make_shared<ConnectionDescriptor>(stream);
    {
        std::lock_guard lck(mConnectionMutex);
        mValueStreamingConnections.push_back(conn);
    }
    // Returns only after ConnectionDescriptor::Shutdown() has been called on this connection.
    conn->Wait();
    LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn->ID();
    return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
}

// Queries the hardware for the min/max supported values of the requested [propId, areaId] pairs
// and writes the converted results into `result`; always returns OK.
::grpc::Status GrpcVehicleProxyServer::GetMinMaxSupportedValues(
        ::grpc::ServerContext* context, const proto::GetMinMaxSupportedValuesRequest* request,
        proto::GetMinMaxSupportedValuesResult* result) {
    std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
    const auto minMaxSupportedValueResults = mHardware->getMinMaxSupportedValues(propIdAreaIds);
    aidlResultsToProtoResults(minMaxSupportedValueResults, result);
    return ::grpc::Status::OK;
}

// Queries the hardware for the supported-values lists of the requested [propId, areaId] pairs
// and writes the converted results into `result`; always returns OK.
::grpc::Status GrpcVehicleProxyServer::GetSupportedValuesLists(
        ::grpc::ServerContext* context, const proto::GetSupportedValuesListsRequest* request,
        proto::GetSupportedValuesListsResult* result) {
    std::vector<PropIdAreaId> propIdAreaIds = getPropIdAreaIdsFromProtoRequest(request);
    const auto supportedValuesListResults = mHardware->getSupportedValuesLists(propIdAreaIds);
    aidlResultsToProtoResults(supportedValuesListResults, result);
    return ::grpc::Status::OK;
}

// Broadcasts a batch of changed property values to every registered streaming connection and
// then drops the connections whose write failed.
void GrpcVehicleProxyServer::OnVehiclePropChange(
        const std::vector<aidlvhal::VehiclePropValue>& values) {
    std::unordered_set<uint64_t> brokenConn;
    proto::VehiclePropValues protoValues;
    for (const auto& value : values) {
        auto* protoValuePtr = protoValues.add_values();
        proto_msg_converter::aidlToProto(value, protoValuePtr);
    }
    {
        // Writing only needs shared access to the connection list; failures are recorded by ID
        // and removed afterwards under the exclusive lock.
        std::shared_lock read_lock(mConnectionMutex);
        for (auto& connection : mValueStreamingConnections) {
            auto writeOK = connection->Write(protoValues);
            if (!writeOK) {
                LOG(ERROR) << __func__
                           << ": Server Write failed, connection lost. ID: " << connection->ID();
                brokenConn.insert(connection->ID());
            }
        }
    }
    if (brokenConn.empty()) {
        return;
    }
    std::unique_lock write_lock(mConnectionMutex);
    mValueStreamingConnections.erase(
            std::remove_if(mValueStreamingConnections.begin(), mValueStreamingConnections.end(),
                           [&brokenConn](const auto& conn) {
                               return brokenConn.find(conn->ID()) != brokenConn.end();
                           }),
            mValueStreamingConnections.end());
}

// Builds and starts the gRPC server listening on every configured address. A second call while
// the server exists only logs a warning. Aborts (CHECK) if the server cannot be created.
GrpcVehicleProxyServer& GrpcVehicleProxyServer::Start() {
    if (mServer) {
        LOG(WARNING) << __func__ << ": GrpcVehicleProxyServer has already started.";
        return *this;
    }
    ::grpc::ServerBuilder builder;
    builder.RegisterService(this);
    for (const std::string& serviceAddr : mServiceAddrs) {
        builder.AddListeningPort(serviceAddr, getServerCredentials());
    }
    mServer = builder.BuildAndStart();
    CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
                   << "please make sure the configuration and permissions are correct";
    return *this;
}

// Shuts down every streaming connection (unblocking their StartPropertyValuesStream handlers)
// and then shuts down the gRPC server if it is running.
GrpcVehicleProxyServer& GrpcVehicleProxyServer::Shutdown() {
    // NOTE(review): the shared lock stays held while mServer->Shutdown() runs; confirm that no
    // in-flight handler can be blocked on the exclusive lock of mConnectionMutex at that point.
    std::shared_lock read_lock(mConnectionMutex);
    for (auto& conn : mValueStreamingConnections) {
        conn->Shutdown();
    }
    if (mServer) {
        mServer->Shutdown();
    }
    return *this;
}

// Blocks until the running server (if any) finishes, then releases it.
void GrpcVehicleProxyServer::Wait() {
    if (mServer) {
        mServer->Wait();
    }
    mServer.reset();
}

// Marks the connection as shut down and wakes any waiter before the descriptor goes away.
GrpcVehicleProxyServer::ConnectionDescriptor::~ConnectionDescriptor() {
    Shutdown();
}

// Writes `values` to the client stream. Returns true on success. On a null stream, a connection
// already shut down, or a failed write, shuts the connection down and returns false.
bool GrpcVehicleProxyServer::ConnectionDescriptor::Write(const proto::VehiclePropValues& values) {
    if (!mStream) {
        LOG(ERROR) << __func__ << ": Empty stream. ID: " << ID();
        Shutdown();
        return false;
    }
    {
        // The shutdown flag is checked under the same mutex Shutdown() uses to set it, so no
        // write is attempted once the connection has been marked as shut down.
        std::lock_guard lck(*mMtx);
        if (!mShutdownFlag && mStream->Write(values)) {
            return true;
        } else {
            LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " << ID();
        }
    }
    // Shutdown() takes mMtx itself, so it must be called after the lock above is released.
    Shutdown();
    return false;
}

// Blocks the caller until Shutdown() has been called on this connection.
void GrpcVehicleProxyServer::ConnectionDescriptor::Wait() {
    std::unique_lock lck(*mMtx);
    mCV->wait(lck, [this] { return mShutdownFlag; });
}

// Sets the shutdown flag under the mutex and wakes every thread blocked in Wait().
void GrpcVehicleProxyServer::ConnectionDescriptor::Shutdown() {
    {
        std::lock_guard lck(*mMtx);
        mShutdownFlag = true;
    }
    mCV->notify_all();
}

}  // namespace android::hardware::automotive::vehicle::virtualization