1 //! gRPC frontend client library for netsim.
2 use anyhow::{anyhow, Result};
3 use futures_util::StreamExt;
4 use netsim_proto::frontend;
5 use netsim_proto::frontend_grpc::FrontendServiceClient;
6 use protobuf::well_known_types::empty;
7
8 /// Wrapper struct for application defined ClientResponseReader
9 pub struct ClientResponseReader {
10 /// Delegated handler for reading responses
11 pub handler: Box<dyn ClientResponseReadable>,
12 }
13
14 /// Delegating functions to handler
15 impl ClientResponseReader {
handle_chunk(&self, chunk: &[u8])16 fn handle_chunk(&self, chunk: &[u8]) {
17 self.handler.handle_chunk(chunk);
18 }
19 }
20
/// Callback interface for the handler stored in a `ClientResponseReader`.
pub trait ClientResponseReadable {
    /// Handles the bytes of one chunk of a streaming response.
    fn handle_chunk(&self, chunk: &[u8]);
}
26
27 // Enum of Grpc Requests holding the request proto as applicable
28 #[derive(Debug, PartialEq)]
29 pub enum GrpcRequest {
30 GetVersion,
31 ListDevice,
32 Reset,
33 ListCapture,
34 CreateDevice(frontend::CreateDeviceRequest),
35 DeleteChip(frontend::DeleteChipRequest),
36 PatchDevice(frontend::PatchDeviceRequest),
37 PatchCapture(frontend::PatchCaptureRequest),
38 GetCapture(frontend::GetCaptureRequest),
39 }
40
41 // Enum of Grpc Responses holding the response proto as applicable
42 #[derive(Debug, PartialEq)]
43 pub enum GrpcResponse {
44 GetVersion(frontend::VersionResponse),
45 ListDevice(frontend::ListDeviceResponse),
46 Reset,
47 ListCapture(frontend::ListCaptureResponse),
48 CreateDevice(frontend::CreateDeviceResponse),
49 DeleteChip,
50 PatchDevice,
51 PatchCapture,
52 Unknown,
53 }
54
get_capture( client: &FrontendServiceClient, req: &frontend::GetCaptureRequest, client_reader: &mut ClientResponseReader, ) -> Result<()>55 pub fn get_capture(
56 client: &FrontendServiceClient,
57 req: &frontend::GetCaptureRequest,
58 client_reader: &mut ClientResponseReader,
59 ) -> Result<()> {
60 let mut stream = client.get_capture(req)?;
61 // Use block_on to run the async block handling all chunks
62 futures::executor::block_on(async {
63 // Read every available chunk from gRPC stream
64 while let Some(Ok(chunk)) = stream.next().await {
65 let bytes = chunk.capture_stream;
66 client_reader.handle_chunk(&bytes);
67 }
68 });
69
70 Ok(())
71 }
72
send_grpc( client: &FrontendServiceClient, grpc_request: &GrpcRequest, ) -> Result<GrpcResponse>73 pub fn send_grpc(
74 client: &FrontendServiceClient,
75 grpc_request: &GrpcRequest,
76 ) -> Result<GrpcResponse> {
77 match grpc_request {
78 GrpcRequest::GetVersion => {
79 Ok(GrpcResponse::GetVersion(client.get_version(&empty::Empty::new())?))
80 }
81 GrpcRequest::ListDevice => {
82 Ok(GrpcResponse::ListDevice(client.list_device(&empty::Empty::new())?))
83 }
84 GrpcRequest::Reset => {
85 client.reset(&empty::Empty::new())?;
86 Ok(GrpcResponse::Reset)
87 }
88 GrpcRequest::ListCapture => {
89 Ok(GrpcResponse::ListCapture(client.list_capture(&empty::Empty::new())?))
90 }
91 GrpcRequest::CreateDevice(req) => {
92 Ok(GrpcResponse::CreateDevice(client.create_device(req)?))
93 }
94 GrpcRequest::DeleteChip(req) => {
95 client.delete_chip(req)?;
96 Ok(GrpcResponse::DeleteChip)
97 }
98 GrpcRequest::PatchDevice(req) => {
99 client.patch_device(req)?;
100 Ok(GrpcResponse::PatchDevice)
101 }
102 GrpcRequest::PatchCapture(req) => {
103 client.patch_capture(req)?;
104 Ok(GrpcResponse::PatchCapture)
105 }
106 _ => Err(anyhow!(grpcio::RpcStatus::new(grpcio::RpcStatusCode::INVALID_ARGUMENT,))),
107 }
108 }
109