1 use std::time::Duration;
2 use tokio_stream::{Stream, StreamExt};
3 use tonic::transport::Channel;
4
5 use echo_proto::echo::{echo_client::EchoClient, EchoRequest};
6
echo_requests_iter() -> impl Stream<Item = EchoRequest>7 fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
8 tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest {
9 message: format!("msg {:02}", i),
10 })
11 }
12
streaming_echo(client: &mut EchoClient<Channel>, num: usize)13 async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
14 let stream = client
15 .server_streaming_echo(EchoRequest {
16 message: "foo".into(),
17 })
18 .await
19 .unwrap()
20 .into_inner();
21
22 // stream is infinite - take just 5 elements and then disconnect
23 let mut stream = stream.take(num);
24 while let Some(item) = stream.next().await {
25 println!("\treceived: {}", item.unwrap().message);
26 }
27 // stream is droped here and the disconnect info is send to server
28 }
29
bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize)30 async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
31 let in_stream = echo_requests_iter().take(num);
32
33 let response = client
34 .bidirectional_streaming_echo(in_stream)
35 .await
36 .unwrap();
37
38 let mut resp_stream = response.into_inner();
39
40 while let Some(received) = resp_stream.next().await {
41 let received = received.unwrap();
42 println!("\treceived message: `{}`", received.message);
43 }
44 }
45
bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration)46 async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
47 let in_stream = echo_requests_iter().throttle(dur);
48
49 let response = client
50 .bidirectional_streaming_echo(in_stream)
51 .await
52 .unwrap();
53
54 let mut resp_stream = response.into_inner();
55
56 while let Some(received) = resp_stream.next().await {
57 let received = received.unwrap();
58 println!("\treceived message: `{}`", received.message);
59 }
60 }
61
62 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>63 async fn main() -> Result<(), Box<dyn std::error::Error>> {
64 let mut client = EchoClient::connect("http://[::1]:50051").await.unwrap();
65
66 println!("Streaming echo:");
67 streaming_echo(&mut client, 5).await;
68 tokio::time::sleep(Duration::from_secs(1)).await; //do not mess server println functions
69
70 // Echo stream that sends 17 requests then graceful end that connection
71 println!("\r\nBidirectional stream echo:");
72 bidirectional_streaming_echo(&mut client, 17).await;
73
74 // Echo stream that sends up to `usize::MAX` requests. One request each 2s.
75 // Exiting client with CTRL+C demonstrate how to distinguish broken pipe from
76 // graceful client disconnection (above example) on the server side.
77 println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
78 bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
79
80 Ok(())
81 }
82