• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! gRPC frontend client library for netsim.
2 use anyhow::{anyhow, Result};
3 use futures_util::StreamExt;
4 use netsim_proto::frontend;
5 use netsim_proto::frontend_grpc::FrontendServiceClient;
6 use protobuf::well_known_types::empty;
7 
8 /// Wrapper struct for application defined ClientResponseReader
9 pub struct ClientResponseReader {
10     /// Delegated handler for reading responses
11     pub handler: Box<dyn ClientResponseReadable>,
12 }
13 
14 /// Delegating functions to handler
15 impl ClientResponseReader {
handle_chunk(&self, chunk: &[u8])16     fn handle_chunk(&self, chunk: &[u8]) {
17         self.handler.handle_chunk(chunk);
18     }
19 }
20 
21 /// Trait for ClientResponseReader handler functions
22 pub trait ClientResponseReadable {
23     /// Process each chunk of streaming response
handle_chunk(&self, chunk: &[u8])24     fn handle_chunk(&self, chunk: &[u8]);
25 }
26 
27 // Enum of Grpc Requests holding the request proto as applicable
28 #[derive(Debug, PartialEq)]
29 pub enum GrpcRequest {
30     GetVersion,
31     ListDevice,
32     Reset,
33     ListCapture,
34     CreateDevice(frontend::CreateDeviceRequest),
35     DeleteChip(frontend::DeleteChipRequest),
36     PatchDevice(frontend::PatchDeviceRequest),
37     PatchCapture(frontend::PatchCaptureRequest),
38     GetCapture(frontend::GetCaptureRequest),
39 }
40 
41 // Enum of Grpc Responses holding the response proto as applicable
42 #[derive(Debug, PartialEq)]
43 pub enum GrpcResponse {
44     GetVersion(frontend::VersionResponse),
45     ListDevice(frontend::ListDeviceResponse),
46     Reset,
47     ListCapture(frontend::ListCaptureResponse),
48     CreateDevice(frontend::CreateDeviceResponse),
49     DeleteChip,
50     PatchDevice,
51     PatchCapture,
52     Unknown,
53 }
54 
get_capture( client: &FrontendServiceClient, req: &frontend::GetCaptureRequest, client_reader: &mut ClientResponseReader, ) -> Result<()>55 pub fn get_capture(
56     client: &FrontendServiceClient,
57     req: &frontend::GetCaptureRequest,
58     client_reader: &mut ClientResponseReader,
59 ) -> Result<()> {
60     let mut stream = client.get_capture(req)?;
61     // Use block_on to run the async block handling all chunks
62     futures::executor::block_on(async {
63         // Read every available chunk from gRPC stream
64         while let Some(Ok(chunk)) = stream.next().await {
65             let bytes = chunk.capture_stream;
66             client_reader.handle_chunk(&bytes);
67         }
68     });
69 
70     Ok(())
71 }
72 
send_grpc( client: &FrontendServiceClient, grpc_request: &GrpcRequest, ) -> Result<GrpcResponse>73 pub fn send_grpc(
74     client: &FrontendServiceClient,
75     grpc_request: &GrpcRequest,
76 ) -> Result<GrpcResponse> {
77     match grpc_request {
78         GrpcRequest::GetVersion => {
79             Ok(GrpcResponse::GetVersion(client.get_version(&empty::Empty::new())?))
80         }
81         GrpcRequest::ListDevice => {
82             Ok(GrpcResponse::ListDevice(client.list_device(&empty::Empty::new())?))
83         }
84         GrpcRequest::Reset => {
85             client.reset(&empty::Empty::new())?;
86             Ok(GrpcResponse::Reset)
87         }
88         GrpcRequest::ListCapture => {
89             Ok(GrpcResponse::ListCapture(client.list_capture(&empty::Empty::new())?))
90         }
91         GrpcRequest::CreateDevice(req) => {
92             Ok(GrpcResponse::CreateDevice(client.create_device(req)?))
93         }
94         GrpcRequest::DeleteChip(req) => {
95             client.delete_chip(req)?;
96             Ok(GrpcResponse::DeleteChip)
97         }
98         GrpcRequest::PatchDevice(req) => {
99             client.patch_device(req)?;
100             Ok(GrpcResponse::PatchDevice)
101         }
102         GrpcRequest::PatchCapture(req) => {
103             client.patch_capture(req)?;
104             Ok(GrpcResponse::PatchCapture)
105         }
106         _ => Err(anyhow!(grpcio::RpcStatus::new(grpcio::RpcStatusCode::INVALID_ARGUMENT,))),
107     }
108 }
109