• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(feature = "spawn-ready")]
2 #[path = "../support.rs"]
3 mod support;
4 
5 use tokio::time;
6 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
7 use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
8 use tower::util::ServiceExt;
9 use tower_test::mock;
10 
11 #[tokio::test(flavor = "current_thread")]
when_inner_is_not_ready()12 async fn when_inner_is_not_ready() {
13     time::pause();
14 
15     let _t = support::trace_init();
16 
17     let layer = SpawnReadyLayer::new();
18     let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);
19 
20     // Make the service NotReady
21     handle.allow(0);
22 
23     assert_pending!(service.poll_ready());
24 
25     // Make the service is Ready
26     handle.allow(1);
27     time::sleep(time::Duration::from_millis(100)).await;
28     assert_ready_ok!(service.poll_ready());
29 }
30 
31 #[tokio::test(flavor = "current_thread")]
when_inner_fails()32 async fn when_inner_fails() {
33     let _t = support::trace_init();
34 
35     let layer = SpawnReadyLayer::new();
36     let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);
37 
38     // Make the service NotReady
39     handle.allow(0);
40     handle.send_error("foobar");
41 
42     assert_eq!(
43         assert_ready_err!(service.poll_ready()).to_string(),
44         "foobar"
45     );
46 }
47 
48 #[tokio::test(flavor = "current_thread")]
propagates_trace_spans()49 async fn propagates_trace_spans() {
50     use tracing::Instrument;
51 
52     let _t = support::trace_init();
53 
54     let span = tracing::info_span!("my_span");
55 
56     let service = support::AssertSpanSvc::new(span.clone());
57     let service = SpawnReady::new(service);
58     let result = tokio::spawn(service.oneshot(()).instrument(span));
59 
60     result.await.expect("service panicked").expect("failed");
61 }
62 
63 #[cfg(test)]
64 #[tokio::test(flavor = "current_thread")]
abort_on_drop()65 async fn abort_on_drop() {
66     let (mock, mut handle) = mock::pair::<(), ()>();
67     let mut svc = SpawnReady::new(mock);
68     handle.allow(0);
69 
70     // Drive the service to readiness until we signal a drop.
71     let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
72     let mut task = tokio_test::task::spawn(async move {
73         tokio::select! {
74             _ = drop_rx => {}
75             _ = svc.ready() => unreachable!("Service must not become ready"),
76         }
77     });
78     assert_pending!(task.poll());
79     assert_pending!(handle.poll_request());
80 
81     // End the task and ensure that the inner service has been dropped.
82     assert!(drop_tx.send(()).is_ok());
83     tokio_test::assert_ready!(task.poll());
84     tokio::task::yield_now().await;
85     assert!(tokio_test::assert_ready!(handle.poll_request()).is_none());
86 }
87