• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::time::Duration;
2 use tokio_stream::{Stream, StreamExt};
3 use tonic::transport::Channel;
4 
5 use echo_proto::echo::{echo_client::EchoClient, EchoRequest};
6 
echo_requests_iter() -> impl Stream<Item = EchoRequest>7 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
8     tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
9         message: format!("msg {:02}", i),
10     })
11 }
12 
streaming_echo(client: &mut EchoClient<Channel>, num: usize)13 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
14     let stream = client
15         .server_streaming_echo(EchoRequest {
16             message: "foo".into(),
17         })
18         .await
19         .unwrap()
20         .into_inner();
21 
22     // stream is infinite - take just 5 elements and then disconnect
23     let mut stream = stream.take(num);
24     while let Some(item) = stream.next().await {
25         println!("\treceived: {}", item.unwrap().message);
26     }
27     // stream is droped here and the disconnect info is send to server
28 }
29 
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)30 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
31     let in_stream = echo_requests_iter().take(num);
32 
33     let response = client
34         .bidirectional_streaming_echo(in_stream)
35         .await
36         .unwrap();
37 
38     let mut resp_stream = response.into_inner();
39 
40     while let Some(received) = resp_stream.next().await {
41         let received = received.unwrap();
42         println!("\treceived message: `{}`", received.message);
43     }
44 }
45 
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)46 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
47     let in_stream = echo_requests_iter().throttle(dur);
48 
49     let response = client
50         .bidirectional_streaming_echo(in_stream)
51         .await
52         .unwrap();
53 
54     let mut resp_stream = response.into_inner();
55 
56     while let Some(received) = resp_stream.next().await {
57         let received = received.unwrap();
58         println!("\treceived message: `{}`", received.message);
59     }
60 }
61 
62 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>63 async fn main() -> Result<(), Box<dyn std::error::Error>> {
64     let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
65 
66     println!("Streaming echo:");
67     streaming_echo(&mut client, 5).await;
68     tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
69 
70     // Echo stream that sends 17 requests then graceful end that connection
71     println!("\r\nBidirectional stream echo:");
72     bidirectional_streaming_echo(&mut client, 17).await;
73 
74     // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
75     // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
76     // graceful client disconnection (above example) on the server side.
77     println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
78     bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
79 
80     Ok(())
81 }
82