• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "backend/grpc_server.h"
16 
17 #include <google/protobuf/util/json_util.h>
18 #include <stdlib.h>
19 
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <unordered_map>
24 
25 #include "common.pb.h"
26 #include "controller/controller.h"
27 #include "google/protobuf/empty.pb.h"
28 #include "grpcpp/server_context.h"
29 #include "grpcpp/support/status.h"
30 #include "packet_hub/packet_hub.h"
31 #include "packet_streamer.grpc.pb.h"
32 #include "packet_streamer.pb.h"
33 #include "util/log.h"
34 
35 namespace netsim {
36 namespace backend {
37 namespace {
38 
39 using netsim::common::ChipKind;
40 
41 using Stream =
42     ::grpc::ServerReaderWriter<packet::PacketResponse, packet::PacketRequest>;
43 
44 using netsim::startup::Chip;
45 
46 // Mapping from <chip kind, facade id> to streams.
47 std::unordered_map<std::string, Stream *> facade_to_stream;
48 
ChipFacade(ChipKind chip_kind,uint32_t facade_id)49 std::string ChipFacade(ChipKind chip_kind, uint32_t facade_id) {
50   return std::to_string(chip_kind) + "/" + std::to_string(facade_id);
51 }
52 
53 // Service handles the gRPC StreamPackets requests.
54 
55 class ServiceImpl final : public packet::PacketStreamer::Service {
56  public:
StreamPackets(::grpc::ServerContext * context,Stream * stream)57   ::grpc::Status StreamPackets(::grpc::ServerContext *context,
58                                Stream *stream) override {
59     // Now connected to a peer issuing a bi-directional streaming grpc
60     auto peer = context->peer();
61     BtsLog("grpc_server new packet_stream for peer %s", peer.c_str());
62 
63     packet::PacketRequest request;
64 
65     // First packet must have initial_info describing the peer
66     bool success = stream->Read(&request);
67     if (!success || !request.has_initial_info()) {
68       BtsLog("ServiceImpl no initial information or stream closed");
69       return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT,
70                             "Missing initial_info in first packet.");
71     }
72 
73     auto device_name = request.initial_info().name();
74     auto chip_kind = request.initial_info().chip().kind();
75     // multiple chips of the same chip_kind for a device have a name
76     auto chip_name = request.initial_info().chip().id();
77     auto manufacturer = request.initial_info().chip().manufacturer();
78     auto product_name = request.initial_info().chip().product_name();
79     // Add a new chip to the device
80     auto [device_id, chip_id, facade_id] = scene_controller::AddChip(
81         peer, device_name, chip_kind, chip_name, manufacturer, product_name);
82 
83     BtsLog("grpc_server: adding chip %d with facade %d to %s", chip_id,
84            facade_id, device_name.c_str());
85     // connect packet responses from chip facade to the peer
86     facade_to_stream[ChipFacade(chip_kind, facade_id)] = stream;
87     this->ProcessRequests(stream, device_id, chip_kind, facade_id);
88 
89     // no longer able to send responses to peer
90     facade_to_stream.erase(ChipFacade(chip_kind, facade_id));
91 
92     // Remove the chip from the device
93     scene_controller::RemoveChip(device_id, chip_id);
94 
95     BtsLog("grpc_server: removing chip %d from %s", chip_id,
96            device_name.c_str());
97 
98     return ::grpc::Status::OK;
99   }
100 
101   // Convert a protobuf bytes field into shared_ptr<<vec<uint8_t>>.
102   //
103   // Release ownership of the bytes field and convert it to a vector using move
104   // iterators. No copy when called with a mutable reference.
ToSharedVec(std::string * bytes_field)105   std::shared_ptr<std::vector<uint8_t>> ToSharedVec(std::string *bytes_field) {
106     return std::make_shared<std::vector<uint8_t>>(
107         std::make_move_iterator(bytes_field->begin()),
108         std::make_move_iterator(bytes_field->end()));
109   }
110 
111   // Process requests in a loop forwarding packets to the packet_hub and
112   // returning when the channel is closed.
ProcessRequests(Stream * stream,uint32_t device_id,common::ChipKind chip_kind,uint32_t facade_id)113   void ProcessRequests(Stream *stream, uint32_t device_id,
114                        common::ChipKind chip_kind, uint32_t facade_id) {
115     packet::PacketRequest request;
116     while (true) {
117       if (!stream->Read(&request)) {
118         BtsLog("grpc_server: reading stopped for %d", facade_id);
119         break;
120       }
121       // All kinds possible (bt, uwb, wifi), but each rpc only streames one.
122       if (chip_kind == common::ChipKind::BLUETOOTH) {
123         if (!request.has_hci_packet()) {
124           BtsLog("grpc_server: unknown packet type from %d", facade_id);
125           continue;
126         }
127         auto packet_type = request.hci_packet().packet_type();
128         auto packet =
129             ToSharedVec(request.mutable_hci_packet()->mutable_packet());
130         packet_hub::HandleRequest(chip_kind, facade_id, *packet, packet_type);
131       } else if (chip_kind == common::ChipKind::WIFI) {
132         if (!request.has_packet()) {
133           BtsLog("grpc_server: unknown packet type from %d", facade_id);
134           continue;
135         }
136         auto packet = ToSharedVec(request.mutable_packet());
137         packet_hub::HandleRequest(chip_kind, facade_id,
138 
139                                   *packet,
140                                   packet::HCIPacket::HCI_PACKET_UNSPECIFIED);
141       } else {
142         // TODO: add UWB here
143         BtsLog("grpc_server: unknown chip kind");
144       }
145     }
146   }
147 };
148 }  // namespace
149 
150 // handle_response is called by packet_hub to forward a response to the gRPC
151 // stream associated with chip_kind and facade_id.
152 //
153 // When writing, the packet is copied because is borrowed from a shared_ptr and
154 // grpc++ doesn't know about smart pointers.
HandleResponse(ChipKind kind,uint32_t facade_id,const std::vector<uint8_t> & packet,packet::HCIPacket_PacketType packet_type)155 void HandleResponse(ChipKind kind, uint32_t facade_id,
156                     const std::vector<uint8_t> &packet,
157                     packet::HCIPacket_PacketType packet_type) {
158   auto stream = facade_to_stream[ChipFacade(kind, facade_id)];
159   if (stream) {
160     // TODO: lock or caller here because gRPC does not allow overlapping writes.
161     packet::PacketResponse response;
162     // Copies the borrowed packet for output
163     auto str_packet = std::string(packet.begin(), packet.end());
164     if (kind == ChipKind::BLUETOOTH) {
165       response.mutable_hci_packet()->set_packet_type(packet_type);
166       response.mutable_hci_packet()->set_packet(str_packet);
167     } else {
168       response.set_packet(str_packet);
169     }
170     if (!stream->Write(response)) {
171       BtsLog("grpc_server: write failed %d", facade_id);
172     }
173   } else {
174     BtsLog("grpc_server: no stream for %d", facade_id);
175   }
176 }
177 
178 }  // namespace backend
179 
GetBackendService()180 std::unique_ptr<packet::PacketStreamer::Service> GetBackendService() {
181   return std::make_unique<backend::ServiceImpl>();
182 }
183 }  // namespace netsim
184