1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "backend/grpc_server.h"
16
17 #include <google/protobuf/util/json_util.h>
18 #include <stdlib.h>
19
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <unordered_map>
24
25 #include "common.pb.h"
26 #include "controller/controller.h"
27 #include "google/protobuf/empty.pb.h"
28 #include "grpcpp/server_context.h"
29 #include "grpcpp/support/status.h"
30 #include "packet_hub/packet_hub.h"
31 #include "packet_streamer.grpc.pb.h"
32 #include "packet_streamer.pb.h"
33 #include "util/log.h"
34
35 namespace netsim {
36 namespace backend {
37 namespace {
38
39 using netsim::common::ChipKind;
40
41 using Stream =
42 ::grpc::ServerReaderWriter<packet::PacketResponse, packet::PacketRequest>;
43
44 using netsim::startup::Chip;
45
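// Maps the "<chip kind>/<facade id>" key built by ChipFacade() to the stream
// that StreamPackets registered for that chip; the entry is erased again when
// the RPC finishes.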
47 std::unordered_map<std::string, Stream *> facade_to_stream;
48
ChipFacade(ChipKind chip_kind,uint32_t facade_id)49 std::string ChipFacade(ChipKind chip_kind, uint32_t facade_id) {
50 return std::to_string(chip_kind) + "/" + std::to_string(facade_id);
51 }
52
53 // Service handles the gRPC StreamPackets requests.
54
55 class ServiceImpl final : public packet::PacketStreamer::Service {
56 public:
StreamPackets(::grpc::ServerContext * context,Stream * stream)57 ::grpc::Status StreamPackets(::grpc::ServerContext *context,
58 Stream *stream) override {
59 // Now connected to a peer issuing a bi-directional streaming grpc
60 auto peer = context->peer();
61 BtsLog("grpc_server new packet_stream for peer %s", peer.c_str());
62
63 packet::PacketRequest request;
64
65 // First packet must have initial_info describing the peer
66 bool success = stream->Read(&request);
67 if (!success || !request.has_initial_info()) {
68 BtsLog("ServiceImpl no initial information or stream closed");
69 return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
70 "Missing initial_info in first packet.");
71 }
72
73 auto device_name = request.initial_info().name();
74 auto chip_kind = request.initial_info().chip().kind();
75 // multiple chips of the same chip_kind for a device have a name
76 auto chip_name = request.initial_info().chip().id();
77 auto manufacturer = request.initial_info().chip().manufacturer();
78 auto product_name = request.initial_info().chip().product_name();
79 // Add a new chip to the device
80 auto [device_id, chip_id, facade_id] = scene_controller::AddChip(
81 peer, device_name, chip_kind, chip_name, manufacturer, product_name);
82
83 BtsLog("grpc_server: adding chip %d with facade %d to %s", chip_id,
84 facade_id, device_name.c_str());
85 // connect packet responses from chip facade to the peer
86 facade_to_stream[ChipFacade(chip_kind, facade_id)] = stream;
87 this->ProcessRequests(stream, device_id, chip_kind, facade_id);
88
89 // no longer able to send responses to peer
90 facade_to_stream.erase(ChipFacade(chip_kind, facade_id));
91
92 // Remove the chip from the device
93 scene_controller::RemoveChip(device_id, chip_id);
94
95 BtsLog("grpc_server: removing chip %d from %s", chip_id,
96 device_name.c_str());
97
98 return ::grpc::Status::OK;
99 }
100
101 // Convert a protobuf bytes field into shared_ptr<<vec<uint8_t>>.
102 //
103 // Release ownership of the bytes field and convert it to a vector using move
104 // iterators. No copy when called with a mutable reference.
ToSharedVec(std::string * bytes_field)105 std::shared_ptr<std::vector<uint8_t>> ToSharedVec(std::string *bytes_field) {
106 return std::make_shared<std::vector<uint8_t>>(
107 std::make_move_iterator(bytes_field->begin()),
108 std::make_move_iterator(bytes_field->end()));
109 }
110
111 // Process requests in a loop forwarding packets to the packet_hub and
112 // returning when the channel is closed.
ProcessRequests(Stream * stream,uint32_t device_id,common::ChipKind chip_kind,uint32_t facade_id)113 void ProcessRequests(Stream *stream, uint32_t device_id,
114 common::ChipKind chip_kind, uint32_t facade_id) {
115 packet::PacketRequest request;
116 while (true) {
117 if (!stream->Read(&request)) {
118 BtsLog("grpc_server: reading stopped for %d", facade_id);
119 break;
120 }
121 // All kinds possible (bt, uwb, wifi), but each rpc only streames one.
122 if (chip_kind == common::ChipKind::BLUETOOTH) {
123 if (!request.has_hci_packet()) {
124 BtsLog("grpc_server: unknown packet type from %d", facade_id);
125 continue;
126 }
127 auto packet_type = request.hci_packet().packet_type();
128 auto packet =
129 ToSharedVec(request.mutable_hci_packet()->mutable_packet());
130 packet_hub::HandleRequest(chip_kind, facade_id, *packet, packet_type);
131 } else if (chip_kind == common::ChipKind::WIFI) {
132 if (!request.has_packet()) {
133 BtsLog("grpc_server: unknown packet type from %d", facade_id);
134 continue;
135 }
136 auto packet = ToSharedVec(request.mutable_packet());
137 packet_hub::HandleRequest(chip_kind, facade_id,
138
139 *packet,
140 packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
141 } else {
142 // TODO: add UWB here
143 BtsLog("grpc_server: unknown chip kind");
144 }
145 }
146 }
147 };
148 } // namespace
149
150 // handle_response is called by packet_hub to forward a response to the gRPC
151 // stream associated with chip_kind and facade_id.
152 //
153 // When writing, the packet is copied because is borrowed from a shared_ptr and
154 // grpc++ doesn't know about smart pointers.
HandleResponse(ChipKind kind,uint32_t facade_id,const std::vector<uint8_t> & packet,packet::HCIPacket_PacketType packet_type)155 void HandleResponse(ChipKind kind, uint32_t facade_id,
156 const std::vector<uint8_t> &packet,
157 packet::HCIPacket_PacketType packet_type) {
158 auto stream = facade_to_stream[ChipFacade(kind, facade_id)];
159 if (stream) {
160 // TODO: lock or caller here because gRPC does not allow overlapping writes.
161 packet::PacketResponse response;
162 // Copies the borrowed packet for output
163 auto str_packet = std::string(packet.begin(), packet.end());
164 if (kind == ChipKind::BLUETOOTH) {
165 response.mutable_hci_packet()->set_packet_type(packet_type);
166 response.mutable_hci_packet()->set_packet(str_packet);
167 } else {
168 response.set_packet(str_packet);
169 }
170 if (!stream->Write(response)) {
171 BtsLog("grpc_server: write failed %d", facade_id);
172 }
173 } else {
174 BtsLog("grpc_server: no stream for %d", facade_id);
175 }
176 }
177
178 } // namespace backend
179
GetBackendService()180 std::unique_ptr<packet::PacketStreamer::Service> GetBackendService() {
181 return std::make_unique<backend::ServiceImpl>();
182 }
183 } // namespace netsim
184