1 #![cfg(feature = "spawn-ready")]
2 #[path = "../support.rs"]
3 mod support;
4
5 use tokio::time;
6 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
7 use tower::spawn_ready::{SpawnReady, SpawnReadyLayer};
8 use tower::util::ServiceExt;
9 use tower_test::mock;
10
11 #[tokio::test(flavor = "current_thread")]
when_inner_is_not_ready()12 async fn when_inner_is_not_ready() {
13 time::pause();
14
15 let _t = support::trace_init();
16
17 let layer = SpawnReadyLayer::new();
18 let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);
19
20 // Make the service NotReady
21 handle.allow(0);
22
23 assert_pending!(service.poll_ready());
24
25 // Make the service is Ready
26 handle.allow(1);
27 time::sleep(time::Duration::from_millis(100)).await;
28 assert_ready_ok!(service.poll_ready());
29 }
30
31 #[tokio::test(flavor = "current_thread")]
when_inner_fails()32 async fn when_inner_fails() {
33 let _t = support::trace_init();
34
35 let layer = SpawnReadyLayer::new();
36 let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer);
37
38 // Make the service NotReady
39 handle.allow(0);
40 handle.send_error("foobar");
41
42 assert_eq!(
43 assert_ready_err!(service.poll_ready()).to_string(),
44 "foobar"
45 );
46 }
47
48 #[tokio::test(flavor = "current_thread")]
propagates_trace_spans()49 async fn propagates_trace_spans() {
50 use tracing::Instrument;
51
52 let _t = support::trace_init();
53
54 let span = tracing::info_span!("my_span");
55
56 let service = support::AssertSpanSvc::new(span.clone());
57 let service = SpawnReady::new(service);
58 let result = tokio::spawn(service.oneshot(()).instrument(span));
59
60 result.await.expect("service panicked").expect("failed");
61 }
62
63 #[cfg(test)]
64 #[tokio::test(flavor = "current_thread")]
abort_on_drop()65 async fn abort_on_drop() {
66 let (mock, mut handle) = mock::pair::<(), ()>();
67 let mut svc = SpawnReady::new(mock);
68 handle.allow(0);
69
70 // Drive the service to readiness until we signal a drop.
71 let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
72 let mut task = tokio_test::task::spawn(async move {
73 tokio::select! {
74 _ = drop_rx => {}
75 _ = svc.ready() => unreachable!("Service must not become ready"),
76 }
77 });
78 assert_pending!(task.poll());
79 assert_pending!(handle.poll_request());
80
81 // End the task and ensure that the inner service has been dropped.
82 assert!(drop_tx.send(()).is_ok());
83 tokio_test::assert_ready!(task.poll());
84 tokio::task::yield_now().await;
85 assert!(tokio_test::assert_ready!(handle.poll_request()).is_none());
86 }
87