Lines Matching refs:rx
51 let (tx, mut rx) = mpsc::channel::<i32>(16); in send_recv_with_buffer()
63 let val = rx.recv().await; in send_recv_with_buffer()
66 let val = rx.recv().await; in send_recv_with_buffer()
69 let val = rx.recv().await; in send_recv_with_buffer()
76 let (tx, mut rx) = mpsc::channel::<i32>(2); in reserve_disarm()
98 rx.recv().await.unwrap(); in reserve_disarm()
116 let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16); in send_recv_stream_with_buffer()
117 let mut rx = Box::pin(rx); in send_recv_stream_with_buffer() localVariable
124 assert_eq!(Some(1), rx.next().await); in send_recv_stream_with_buffer()
125 assert_eq!(Some(2), rx.next().await); in send_recv_stream_with_buffer()
126 assert_eq!(None, rx.next().await); in send_recv_stream_with_buffer()
132 let (tx, mut rx) = mpsc::channel(16); in async_send_recv_with_buffer()
139 assert_eq!(Some(1), rx.recv().await); in async_send_recv_with_buffer()
140 assert_eq!(Some(2), rx.recv().await); in async_send_recv_with_buffer()
141 assert_eq!(None, rx.recv().await); in async_send_recv_with_buffer()
147 let (tx, mut rx) = mpsc::channel(2); in async_send_recv_many_with_buffer()
151 assert_eq!(0, rx.recv_many(&mut buffer, 0).await); in async_send_recv_many_with_buffer()
163 recv_count += rx.recv_many(&mut buffer, limit).await; in async_send_recv_many_with_buffer()
168 assert_eq!(0, rx.recv_many(&mut buffer, limit).await); in async_send_recv_many_with_buffer()
179 let (tx1, mut rx) = mpsc::channel(1); in start_send_past_cap()
193 assert!(rx.recv().await.is_some()); in start_send_past_cap()
202 assert!(rx.recv().await.is_none()); in start_send_past_cap()
214 let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); in send_recv_unbounded()
220 assert_eq!(rx.recv().await, Some(1)); in send_recv_unbounded()
221 assert_eq!(rx.recv().await, Some(2)); in send_recv_unbounded()
225 assert!(rx.recv().await.is_none()); in send_recv_unbounded()
230 let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); in send_recv_many_unbounded()
235 rx.recv_many(&mut buffer, 0).await; in send_recv_many_unbounded()
243 rx.recv_many(&mut buffer, 0).await; in send_recv_many_unbounded()
248 count += rx.recv_many(&mut buffer, 1).await; in send_recv_many_unbounded()
263 count = rx.recv_many(&mut buffer, 32).await; in send_recv_many_unbounded()
273 assert_eq!(0, rx.recv_many(&mut buffer, 4).await); in send_recv_many_unbounded()
274 assert!(rx.recv().await.is_none()); in send_recv_many_unbounded()
282 let (tx, mut rx) = mpsc::channel(100); in send_recv_many_bounded_capacity()
295 assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await); in send_recv_many_bounded_capacity()
300 assert_eq!(1, rx.recv_many(&mut buffer, limit).await); in send_recv_many_bounded_capacity()
311 assert_eq!(1, rx.recv_many(&mut buffer, limit).await); in send_recv_many_bounded_capacity()
315 assert_eq!(0, rx.recv_many(&mut buffer, limit).await); in send_recv_many_bounded_capacity()
324 let (tx, mut rx) = mpsc::unbounded_channel(); in send_recv_many_unbounded_capacity()
337 assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await); in send_recv_many_unbounded_capacity()
342 assert_eq!(1, rx.recv_many(&mut buffer, limit).await); in send_recv_many_unbounded_capacity()
353 assert_eq!(1, rx.recv_many(&mut buffer, limit).await); in send_recv_many_unbounded_capacity()
357 assert_eq!(0, rx.recv_many(&mut buffer, limit).await); in send_recv_many_unbounded_capacity()
364 let (tx, mut rx) = mpsc::unbounded_channel(); in async_send_recv_unbounded()
371 assert_eq!(Some(1), rx.recv().await); in async_send_recv_unbounded()
372 assert_eq!(Some(2), rx.recv().await); in async_send_recv_unbounded()
373 assert_eq!(None, rx.recv().await); in async_send_recv_unbounded()
381 let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>(); in send_recv_stream_unbounded()
383 let mut rx = Box::pin(rx); in send_recv_stream_unbounded() localVariable
390 assert_eq!(Some(1), rx.next().await); in send_recv_stream_unbounded()
391 assert_eq!(Some(2), rx.next().await); in send_recv_stream_unbounded()
392 assert_eq!(None, rx.next().await); in send_recv_stream_unbounded()
399 let (tx, mut rx) = mpsc::channel(100); in no_t_bounds_buffer()
404 is_debug(&rx); in no_t_bounds_buffer()
408 assert!(rx.recv().await.is_some()); in no_t_bounds_buffer()
415 let (tx, mut rx) = mpsc::unbounded_channel(); in no_t_bounds_unbounded()
420 is_debug(&rx); in no_t_bounds_unbounded()
424 assert!(rx.recv().await.is_some()); in no_t_bounds_unbounded()
430 let (tx, mut rx) = mpsc::channel::<i32>(1); in send_recv_buffer_limited()
443 assert!(rx.recv().await.is_some()); in send_recv_buffer_limited()
455 assert!(rx.recv().await.is_some()); in send_recv_buffer_limited()
460 let (tx, mut rx) = mpsc::channel::<i32>(10); in recv_close_gets_none_idle()
462 rx.close(); in recv_close_gets_none_idle()
464 assert!(rx.recv().await.is_none()); in recv_close_gets_none_idle()
472 let (tx1, mut rx) = mpsc::channel::<i32>(1); in recv_close_gets_none_reserved()
479 rx.close(); in recv_close_gets_none_reserved()
485 let mut recv = tokio_test::task::spawn(rx.recv()); in recv_close_gets_none_reserved()
495 assert!(rx.recv().await.is_none()); in recv_close_gets_none_reserved()
500 let (_, mut rx) = mpsc::channel::<i32>(10); in tx_close_gets_none()
501 assert!(rx.recv().await.is_none()); in tx_close_gets_none()
506 let (tx, mut rx) = mpsc::channel(1); in try_send_fail()
516 assert_eq!(rx.recv().await, Some("hello")); in try_send_fail()
521 assert_eq!(rx.recv().await, Some("goodbye")); in try_send_fail()
522 assert!(rx.recv().await.is_none()); in try_send_fail()
527 let (tx, mut rx) = mpsc::channel(1); in try_send_fail_with_try_recv()
537 assert_eq!(rx.try_recv(), Ok("hello")); in try_send_fail_with_try_recv()
542 assert_eq!(rx.try_recv(), Ok("goodbye")); in try_send_fail_with_try_recv()
543 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); in try_send_fail_with_try_recv()
558 let (tx, rx) = mpsc::channel::<()>(1); in try_reserve_many_zero()
567 drop(rx); in try_reserve_many_zero()
578 let (tx, rx) = mpsc::channel::<()>(1); in reserve_many_zero()
587 drop(rx); in reserve_many_zero()
597 let (tx, rx) = mpsc::channel::<()>(1); in try_reserve_many_edge_cases()
615 drop(rx); in try_reserve_many_edge_cases()
621 let (tx, mut rx) = mpsc::channel(1); in try_reserve_fails()
633 assert_eq!(rx.recv().await, Some("foo")); in try_reserve_fails()
644 let (tx, mut rx) = mpsc::channel(100); in reserve_many_and_send()
648 assert_eq!(rx.recv().await, Some("foo")); in reserve_many_and_send()
650 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); in reserve_many_and_send()
655 let (tx, mut rx) = mpsc::channel(100); in try_reserve_many_and_send()
659 assert_eq!(rx.recv().await, Some("foo")); in try_reserve_many_and_send()
661 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); in try_reserve_many_and_send()
667 let (tx, rx) = mpsc::channel::<()>(100); in reserve_many_on_closed_channel()
668 drop(rx); in reserve_many_on_closed_channel()
674 let (tx, rx) = mpsc::channel::<usize>(100); in try_reserve_many_on_closed_channel()
675 drop(rx); in try_reserve_many_on_closed_channel()
688 let (tx, mut rx) = mpsc::channel::<usize>(n); in try_reserve_many_full()
715 assert_eq!(rx.recv().await, Some(0)); in try_reserve_many_full()
769 let (tx, rx) = mpsc::channel(100); in dropping_rx_closes_channel()
774 drop(rx); in dropping_rx_closes_channel()
782 let (tx, rx) = mpsc::channel(100); in dropping_rx_closes_channel_for_try()
787 drop(rx); in dropping_rx_closes_channel_for_try()
806 let (tx, rx) = mpsc::channel(100); in unconsumed_messages_are_dropped()
812 drop((tx, rx)); in unconsumed_messages_are_dropped()
820 let (tx, mut rx) = mpsc::channel::<u8>(1); in blocking_recv()
823 assert_eq!(Some(10), rx.blocking_recv()); in blocking_recv()
838 let (_tx, mut rx) = mpsc::channel::<()>(1); in blocking_recv_async()
839 let _ = rx.blocking_recv(); in blocking_recv_async()
845 let (tx, mut rx) = mpsc::channel::<u8>(1); in blocking_send()
854 assert_eq!(Some(10), rx.recv().await); in blocking_send()
870 let (tx, mut rx) = mpsc::channel::<()>(100); in ready_close_cancel_bounded()
875 rx.close(); in ready_close_cancel_bounded()
877 let mut recv = tokio_test::task::spawn(rx.recv()); in ready_close_cancel_bounded()
890 let (tx1, mut rx) = mpsc::channel::<()>(1); in permit_available_not_acquired_close()
898 rx.close(); in permit_available_not_acquired_close()
904 assert!(rx.recv().await.is_none()); in permit_available_not_acquired_close()
909 let (tx, mut rx) = mpsc::channel(5); in try_recv_bounded()
918 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
919 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
920 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
921 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
922 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
923 assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); in try_recv_bounded()
929 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
933 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
934 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
935 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
936 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
937 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
938 assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); in try_recv_bounded()
944 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
945 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
946 assert_eq!(Ok("hello"), rx.try_recv()); in try_recv_bounded()
947 assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); in try_recv_bounded()
953 let (tx, mut rx) = mpsc::unbounded_channel(); in try_recv_unbounded()
960 assert_eq!(rx.try_recv(), Ok(i)); in try_recv_unbounded()
963 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); in try_recv_unbounded()
965 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); in try_recv_unbounded()
971 let (tx, mut rx) = mpsc::channel::<()>(5); in try_recv_close_while_empty_bounded()
973 assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); in try_recv_close_while_empty_bounded()
975 assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); in try_recv_close_while_empty_bounded()
980 let (tx, mut rx) = mpsc::unbounded_channel::<()>(); in try_recv_close_while_empty_unbounded()
982 assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); in try_recv_close_while_empty_unbounded()
984 assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); in try_recv_close_while_empty_unbounded()
993 let (tx, rx) = mpsc::channel(5); in recv_timeout()
1005 drop(rx); in recv_timeout()
1045 let (_tx, mut rx) = mpsc::channel::<()>(10); in test_rx_is_closed_when_calling_close_with_sender()
1046 rx.close(); in test_rx_is_closed_when_calling_close_with_sender()
1048 assert!(rx.is_closed()); in test_rx_is_closed_when_calling_close_with_sender()
1054 let (tx, rx) = mpsc::channel::<()>(10); in test_rx_is_closed_when_dropping_all_senders()
1063 assert!(rx.is_closed()); in test_rx_is_closed_when_dropping_all_senders()
1069 let (_tx, rx) = mpsc::channel::<()>(10); in test_rx_is_not_closed_when_there_are_senders()
1070 assert!(!rx.is_closed()); in test_rx_is_not_closed_when_there_are_senders()
1076 let (tx, rx) = mpsc::channel(10); in test_rx_is_not_closed_when_there_are_senders_and_buffer_filled()
1080 assert!(!rx.is_closed()); in test_rx_is_not_closed_when_there_are_senders_and_buffer_filled()
1086 let (tx, rx) = mpsc::channel(10); in test_rx_is_closed_when_there_are_no_senders_and_there_are_messages()
1091 assert!(rx.is_closed()); in test_rx_is_closed_when_there_are_no_senders_and_there_are_messages()
1097 let (tx, mut rx) = mpsc::channel(10); in test_rx_is_closed_when_there_are_messages_and_close_is_called()
1101 rx.close(); in test_rx_is_closed_when_there_are_messages_and_close_is_called()
1102 assert!(rx.is_closed()); in test_rx_is_closed_when_there_are_messages_and_close_is_called()
1108 let (tx, rx) = mpsc::channel::<()>(10); in test_rx_is_not_closed_when_there_are_permits_but_not_senders()
1110 assert!(!rx.is_closed()); in test_rx_is_not_closed_when_there_are_permits_but_not_senders()
1115 let (_tx, rx) = mpsc::channel::<()>(10); in test_rx_is_empty_when_no_messages_were_sent()
1116 assert!(rx.is_empty()) in test_rx_is_empty_when_no_messages_were_sent()
1121 let (tx, rx) = mpsc::channel::<()>(10); in test_rx_is_not_empty_when_there_are_messages_in_the_buffer()
1123 assert!(!rx.is_empty()) in test_rx_is_not_empty_when_there_are_messages_in_the_buffer()
1128 let (tx, rx) = mpsc::channel(10); in test_rx_is_not_empty_when_the_buffer_is_full()
1132 assert!(!rx.is_empty()) in test_rx_is_not_empty_when_the_buffer_is_full()
1137 let (tx, mut rx) = mpsc::channel(10); in test_rx_is_not_empty_when_all_but_one_messages_are_consumed()
1143 assert!(rx.recv().await.is_some()); in test_rx_is_not_empty_when_all_but_one_messages_are_consumed()
1146 assert!(!rx.is_empty()) in test_rx_is_not_empty_when_all_but_one_messages_are_consumed()
1151 let (tx, mut rx) = mpsc::channel(10); in test_rx_is_empty_when_all_messages_are_consumed()
1155 while rx.try_recv().is_ok() {} in test_rx_is_empty_when_all_messages_are_consumed()
1156 assert!(rx.is_empty()) in test_rx_is_empty_when_all_messages_are_consumed()
1161 let (tx, mut rx) = mpsc::channel(10); in test_rx_is_empty_all_senders_are_dropped_and_messages_consumed()
1168 assert!(rx.recv().await.is_some()); in test_rx_is_empty_all_senders_are_dropped_and_messages_consumed()
1171 assert!(rx.is_empty()) in test_rx_is_empty_all_senders_are_dropped_and_messages_consumed()
1176 let (_tx, rx) = mpsc::channel::<()>(100); in test_rx_len_on_empty_channel()
1177 assert_eq!(rx.len(), 0); in test_rx_len_on_empty_channel()
1185 let (tx, rx) = mpsc::channel::<()>(100); in test_rx_len_on_empty_channel_without_senders()
1187 assert_eq!(rx.len(), 0); in test_rx_len_on_empty_channel_without_senders()
1192 let (tx, rx) = mpsc::channel(100); in test_rx_len_on_filled_channel()
1197 assert_eq!(rx.len(), 100); in test_rx_len_on_filled_channel()
1202 let (tx, rx) = mpsc::channel(100); in test_rx_len_on_filled_channel_without_senders()
1208 assert_eq!(rx.len(), 100); in test_rx_len_on_filled_channel_without_senders()
1213 let (tx, mut rx) = mpsc::channel(100); in test_rx_len_when_consuming_all_messages()
1217 assert_eq!(rx.len(), i + 1); in test_rx_len_when_consuming_all_messages()
1223 assert!(rx.recv().await.is_some()); in test_rx_len_when_consuming_all_messages()
1224 assert_eq!(rx.len(), i); in test_rx_len_when_consuming_all_messages()
1230 let (tx, mut rx) = mpsc::channel(100); in test_rx_len_when_close_is_called()
1232 rx.close(); in test_rx_len_when_close_is_called()
1234 assert_eq!(rx.len(), 1); in test_rx_len_when_close_is_called()
1239 let (tx, mut rx) = mpsc::channel(100); in test_rx_len_when_close_is_called_before_dropping_sender()
1241 rx.close(); in test_rx_len_when_close_is_called_before_dropping_sender()
1244 assert_eq!(rx.len(), 1); in test_rx_len_when_close_is_called_before_dropping_sender()
1249 let (tx, mut rx) = mpsc::channel(100); in test_rx_len_when_close_is_called_after_dropping_sender()
1252 rx.close(); in test_rx_len_when_close_is_called_after_dropping_sender()
1254 assert_eq!(rx.len(), 1); in test_rx_len_when_close_is_called_after_dropping_sender()
1260 let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_is_closed_when_calling_close_with_sender()
1261 rx.close(); in test_rx_unbounded_is_closed_when_calling_close_with_sender()
1263 assert!(rx.is_closed()); in test_rx_unbounded_is_closed_when_calling_close_with_sender()
1269 let (tx, rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_is_closed_when_dropping_all_senders()
1278 assert!(rx.is_closed()); in test_rx_unbounded_is_closed_when_dropping_all_senders()
1284 let (_tx, rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_is_not_closed_when_there_are_senders()
1285 assert!(!rx.is_closed()); in test_rx_unbounded_is_not_closed_when_there_are_senders()
1291 let (tx, rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages()
1296 assert!(rx.is_closed()); in test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages()
1302 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called()
1306 rx.close(); in test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called()
1307 assert!(rx.is_closed()); in test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called()
1312 let (_tx, rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_is_empty_when_no_messages_were_sent()
1313 assert!(rx.is_empty()) in test_rx_unbounded_is_empty_when_no_messages_were_sent()
1318 let (tx, rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer()
1320 assert!(!rx.is_empty()) in test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer()
1325 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed()
1331 assert!(rx.recv().await.is_some()); in test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed()
1334 assert!(!rx.is_empty()) in test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed()
1339 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_empty_when_all_messages_are_consumed()
1343 while rx.try_recv().is_ok() {} in test_rx_unbounded_is_empty_when_all_messages_are_consumed()
1344 assert!(rx.is_empty()) in test_rx_unbounded_is_empty_when_all_messages_are_consumed()
1349 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed()
1356 assert!(rx.recv().await.is_some()); in test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed()
1359 assert!(rx.is_empty()) in test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed()
1364 let (_tx, rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_len_on_empty_channel()
1365 assert_eq!(rx.len(), 0); in test_rx_unbounded_len_on_empty_channel()
1373 let (tx, rx) = mpsc::unbounded_channel::<()>(); in test_rx_unbounded_len_on_empty_channel_without_senders()
1375 assert_eq!(rx.len(), 0); in test_rx_unbounded_len_on_empty_channel_without_senders()
1380 let (tx, rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_with_multiple_messages()
1385 assert_eq!(rx.len(), 100); in test_rx_unbounded_len_with_multiple_messages()
1390 let (tx, rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_with_multiple_messages_and_dropped_senders()
1396 assert_eq!(rx.len(), 100); in test_rx_unbounded_len_with_multiple_messages_and_dropped_senders()
1401 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_when_consuming_all_messages()
1405 assert_eq!(rx.len(), i + 1); in test_rx_unbounded_len_when_consuming_all_messages()
1411 assert!(rx.recv().await.is_some()); in test_rx_unbounded_len_when_consuming_all_messages()
1412 assert_eq!(rx.len(), i); in test_rx_unbounded_len_when_consuming_all_messages()
1418 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_when_close_is_called()
1420 rx.close(); in test_rx_unbounded_len_when_close_is_called()
1422 assert_eq!(rx.len(), 1); in test_rx_unbounded_len_when_close_is_called()
1427 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_when_close_is_called_before_dropping_sender()
1429 rx.close(); in test_rx_unbounded_len_when_close_is_called_before_dropping_sender()
1432 assert_eq!(rx.len(), 1); in test_rx_unbounded_len_when_close_is_called_before_dropping_sender()
1437 let (tx, mut rx) = mpsc::unbounded_channel(); in test_rx_unbounded_len_when_close_is_called_after_dropping_sender()
1440 rx.close(); in test_rx_unbounded_len_when_close_is_called_after_dropping_sender()
1442 assert_eq!(rx.len(), 1); in test_rx_unbounded_len_when_close_is_called_after_dropping_sender()