• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::mem::MaybeUninit;
6 use std::sync::atomic::AtomicUsize;
7 use std::sync::atomic::Ordering;
8 
9 use async_trait::async_trait;
10 use audio_streams::capture::AsyncCaptureBuffer;
11 use audio_streams::capture::AsyncCaptureBufferStream;
12 use audio_streams::AsyncBufferCommit;
13 use audio_streams::AsyncPlaybackBufferStream;
14 use audio_streams::AudioStreamsExecutor;
15 use audio_streams::BoxError;
16 use audio_streams::NoopStream;
17 use audio_streams::NoopStreamControl;
18 use audio_streams::SampleFormat;
19 use audio_streams::StreamControl;
20 use audio_streams::StreamSource;
21 use audio_streams::StreamSourceGenerator;
22 use base::error;
23 use base::warn;
24 use metrics::MetricEventType;
25 
26 use super::NoopBufferCommit;
27 use crate::intermediate_resampler_buffer::CaptureResamplerBuffer;
28 use crate::intermediate_resampler_buffer::PlaybackResamplerBuffer;
29 use crate::CaptureError;
30 use crate::CapturerStream;
31 use crate::DeviceCapturerWrapper;
32 use crate::DeviceRenderer;
33 use crate::DeviceRendererWrapper;
34 use crate::RenderError;
35 use crate::RendererStream;
36 use crate::WinAudio;
37 use crate::WinAudioCapturer;
38 use crate::WinAudioError;
39 use crate::WinAudioRenderer;
40 
// These global values are used to prevent metrics upload spam.
//
// `ERROR_METRICS_LOG_LIMIT` is the threshold that the counters below are compared against
// before a metric is uploaded. Each counter tracks how many metrics of its category (stream
// initialization errors and playback errors, respectively) have been uploaded so far.
const ERROR_METRICS_LOG_LIMIT: usize = 5;
static INIT_ERRORS_LOGGED_COUNT: AtomicUsize = AtomicUsize::new(0);
static PLAYBACK_ERRORS_LOGGED_COUNT: AtomicUsize = AtomicUsize::new(0);
45 
46 pub struct WinAudioStreamSourceGenerator {}
47 
48 impl StreamSourceGenerator for WinAudioStreamSourceGenerator {
generate(&self) -> std::result::Result<Box<dyn StreamSource>, BoxError>49     fn generate(&self) -> std::result::Result<Box<dyn StreamSource>, BoxError> {
50         Ok(Box::new(WinAudio::new()?))
51     }
52 }
53 
54 impl WinAudio {
new_async_playback_stream_helper( num_channels: usize, format: SampleFormat, frame_rate: u32, buffer_size: usize, ex: &dyn audio_streams::AudioStreamsExecutor, ) -> Result< ( Box<dyn StreamControl>, Box<dyn audio_streams::AsyncPlaybackBufferStream>, ), BoxError, >55     pub(super) fn new_async_playback_stream_helper(
56         num_channels: usize,
57         format: SampleFormat,
58         frame_rate: u32,
59         buffer_size: usize,
60         ex: &dyn audio_streams::AudioStreamsExecutor,
61     ) -> Result<
62         (
63             Box<dyn StreamControl>,
64             Box<dyn audio_streams::AsyncPlaybackBufferStream>,
65         ),
66         BoxError,
67     > {
68         let hr = WinAudio::co_init_once_per_thread();
69         let _ = check_hresult!(hr, WinAudioError::from(hr), "Co Initialized failed");
70 
71         let playback_buffer_stream: Box<dyn AsyncPlaybackBufferStream> =
72             match WinAudioRenderer::new_async(num_channels, format, frame_rate, buffer_size, ex) {
73                 Ok(renderer) => Box::new(renderer),
74                 Err(e) => {
75                     warn!(
76                         "Failed to create WinAudioRenderer. Fallback to NoopStream with error: {}",
77                         e
78                     );
79                     Box::new(NoopStream::new(
80                         num_channels,
81                         SampleFormat::S16LE,
82                         frame_rate,
83                         buffer_size,
84                     ))
85                 }
86             };
87 
88         Ok((Box::new(NoopStreamControl::new()), playback_buffer_stream))
89     }
90 }
91 
92 impl WinAudioRenderer {
93     /// Constructor to allow for async audio backend.
new_async( num_channels: usize, guest_bit_depth: SampleFormat, frame_rate: u32, incoming_buffer_size_in_frames: usize, ex: &dyn audio_streams::AudioStreamsExecutor, ) -> Result<Self, RenderError>94     pub fn new_async(
95         num_channels: usize,
96         guest_bit_depth: SampleFormat,
97         frame_rate: u32,
98         incoming_buffer_size_in_frames: usize,
99         ex: &dyn audio_streams::AudioStreamsExecutor,
100     ) -> Result<Self, RenderError> {
101         let device = DeviceRendererWrapper::new(
102             num_channels,
103             guest_bit_depth,
104             frame_rate,
105             incoming_buffer_size_in_frames,
106             Some(ex),
107         )
108         .map_err(|e| {
109             match &e {
110                 RenderError::WinAudioError(win_audio_error) => {
111                     log_init_error_with_limit(win_audio_error.into());
112                 }
113                 _ => {
114                     log_init_error_with_limit((&WinAudioError::Unknown).into());
115                     error!(
116                         "Unhandled NoopStream forced error. These errors should not have been \
117                      returned: {}",
118                         e
119                     );
120                 }
121             }
122             e
123         })?;
124 
125         Ok(Self { device })
126     }
127 
unregister_notification_client_and_make_new_device_renderer( &mut self, ex: &dyn audio_streams::AudioStreamsExecutor, ) -> Result<(), BoxError>128     fn unregister_notification_client_and_make_new_device_renderer(
129         &mut self,
130         ex: &dyn audio_streams::AudioStreamsExecutor,
131     ) -> Result<(), BoxError> {
132         base::info!("Device found. Will attempt to make a DeviceRenderer");
133         let device_renderer = DeviceRendererWrapper::create_device_renderer_and_log_time(
134             self.device.num_channels,
135             self.device.guest_frame_rate,
136             self.device.incoming_buffer_size_in_frames,
137             Some(ex),
138         )
139         .map_err(|e| {
140             match &e {
141                 RenderError::WinAudioError(win_audio_error) => {
142                     log_playback_error_with_limit(win_audio_error.into())
143                 }
144                 _ => log_playback_error_with_limit((&WinAudioError::Unknown).into()),
145             }
146             Box::new(e)
147         })?;
148 
149         let audio_shared_format = device_renderer.audio_shared_format;
150 
151         let playback_resampler_buffer = PlaybackResamplerBuffer::new(
152             self.device.guest_frame_rate as usize,
153             audio_shared_format.frame_rate,
154             self.device.incoming_buffer_size_in_frames,
155             audio_shared_format.shared_audio_engine_period_in_frames,
156             audio_shared_format.channels,
157             audio_shared_format.channel_mask,
158         )
159         .expect("Failed to create PlaybackResamplerBuffer");
160 
161         self.device.renderer_stream =
162             RendererStream::Device((device_renderer, playback_resampler_buffer));
163 
164         Ok(())
165     }
166 }
167 
168 /// Attach `descriptor` to the event code `AudioNoopStreamForced` and upload to clearcut.
169 ///
170 /// This method will stop uploading after `ERRO_METRICS_LOG_LIMIT` uploads in order to prevent
171 /// metrics upload spam.
log_init_error_with_limit(descriptor: i64)172 pub(crate) fn log_init_error_with_limit(descriptor: i64) {
173     if INIT_ERRORS_LOGGED_COUNT.load(Ordering::SeqCst) <= ERROR_METRICS_LOG_LIMIT {
174         metrics::log_descriptor(MetricEventType::AudioNoopStreamForced, descriptor);
175         INIT_ERRORS_LOGGED_COUNT.fetch_add(1, Ordering::SeqCst);
176     }
177 }
178 
179 #[async_trait(?Send)]
180 impl AsyncPlaybackBufferStream for WinAudioRenderer {
next_playback_buffer<'a>( &'a mut self, ex: &dyn audio_streams::AudioStreamsExecutor, ) -> Result<audio_streams::AsyncPlaybackBuffer<'a>, BoxError>181     async fn next_playback_buffer<'a>(
182         &'a mut self,
183         ex: &dyn audio_streams::AudioStreamsExecutor,
184     ) -> Result<audio_streams::AsyncPlaybackBuffer<'a>, BoxError> {
185         // Check to see if a new device is available, if so, create a new `DeviceRenderer`.
186         if let RendererStream::Noop(noop_renderer) = &self.device.renderer_stream {
187             if noop_renderer
188                 .is_device_available
189                 .fetch_and(false, Ordering::SeqCst)
190             {
191                 match self.unregister_notification_client_and_make_new_device_renderer(ex) {
192                     Ok(()) => {}
193                     Err(e) => warn!(
194                         "Making a new DeviceRenderer failed in the middle of playback. \
195                             Will continue using NoopStream and listening for new devices: {}",
196                         e
197                     ),
198                 };
199             }
200         }
201 
202         if let RendererStream::Device((device_renderer, _)) = &mut self.device.renderer_stream {
203             if device_renderer.should_get_next_win_buffer {
204                 if let Err(e) = device_renderer.async_next_win_buffer().await {
205                     Self::handle_playback_logging_on_error(&e);
206                     // At this point, the `DeviceRenderer` doesn't exist, so we assume that
207                     // there were no available audio devices.
208                     base::info!(
209                         "async_next_win_buffer failed. Starting NoopStream and start \
210                         listening for a new default device"
211                     );
212                     self.device.renderer_stream =
213                         DeviceRendererWrapper::create_noop_stream_with_device_notification(
214                             self.device.num_channels,
215                             self.device.guest_frame_rate,
216                             self.device.incoming_buffer_size_in_frames,
217                         )
218                         .map_err(|e| {
219                             match &e {
220                                 RenderError::WinAudioError(win_audio_error) => {
221                                     log_playback_error_with_limit(win_audio_error.into())
222                                 }
223                                 _ => {
224                                     log_playback_error_with_limit((&WinAudioError::Unknown).into())
225                                 }
226                             }
227                             e
228                         })?;
229                 }
230             }
231         }
232 
233         if let RendererStream::Noop(noop_renderer) = &mut self.device.renderer_stream {
234             // This will trigger the sleep so that virtio sound doesn't write to win_audio too
235             // quickly, which will cause underruns. No audio samples will actually be written to
236             // this buffer, but it doesn't matter becuase those samples are meant to be dropped
237             // anyways.
238             AsyncPlaybackBufferStream::next_playback_buffer(&mut noop_renderer.noop_stream, ex)
239                 .await?;
240         }
241 
242         self.device
243             .get_intermediate_async_buffer()
244             .map_err(|e| Box::new(e) as _)
245     }
246 }
247 
248 #[async_trait(?Send)]
249 impl AsyncBufferCommit for DeviceRendererWrapper {
commit(&mut self, nframes: usize)250     async fn commit(&mut self, nframes: usize) {
251         if nframes != self.incoming_buffer_size_in_frames {
252             warn!(
253                 "AsyncBufferCommit commited {} frames, instead of a full period of {}",
254                 nframes, self.incoming_buffer_size_in_frames
255             );
256         }
257 
258         match &mut self.renderer_stream {
259             RendererStream::Device((device_renderer, playback_resampler_buffer)) => {
260                 // `intermediate_buffer` will contain audio samples from CrosVm's emulated audio
261                 // device (ie. Virtio Sound). First, we will add the audio samples to the resampler
262                 // buffer.
263                 playback_resampler_buffer.convert_and_add(self.intermediate_buffer.as_slice());
264 
265                 if playback_resampler_buffer.is_priming {
266                     if device_renderer.win_buffer.is_null() {
267                         error!("AsyncBufferCommit: win_buffer is null");
268                         return;
269                     }
270 
271                     let format = device_renderer.audio_shared_format;
272                     let shared_audio_engine_period_bytes =
273                         format.get_shared_audio_engine_period_in_bytes();
274                     Self::write_slice_to_wasapi_buffer_and_release_buffer(
275                         device_renderer,
276                         &vec![0; shared_audio_engine_period_bytes],
277                     );
278 
279                     // WASAPI's `GetBuffer` should be called next because we either wrote to the
280                     // Windows endpoint buffer or the audio samples were dropped.
281                     device_renderer.should_get_next_win_buffer = true;
282                     return;
283                 }
284 
285                 if let Some(next_period) = playback_resampler_buffer.get_next_period() {
286                     if device_renderer.win_buffer.is_null() {
287                         error!("AsyncBufferCommit: win_buffer is null");
288                         return;
289                     }
290                     Self::write_slice_to_wasapi_buffer_and_release_buffer(
291                         device_renderer,
292                         next_period,
293                     );
294                     device_renderer.should_get_next_win_buffer = true;
295                 } else {
296                     // Don't call WASAPI's `GetBuffer` because the resampler didn't have enough
297                     // audio samples write a full period in the Windows endpoint buffer.
298                     device_renderer.should_get_next_win_buffer = false;
299                 }
300             }
301             // For the `Noop` case, we can just drop the incoming audio samples.
302             RendererStream::Noop(_) => {}
303         }
304     }
305 }
306 
307 impl DeviceRendererWrapper {
write_slice_to_wasapi_buffer_and_release_buffer( device_renderer: &DeviceRenderer, slice_to_write: &[u8], )308     fn write_slice_to_wasapi_buffer_and_release_buffer(
309         device_renderer: &DeviceRenderer,
310         slice_to_write: &[u8],
311     ) {
312         let format = device_renderer.audio_shared_format;
313         let shared_audio_engine_period_bytes = format.get_shared_audio_engine_period_in_bytes();
314 
315         // SAFETY: win_buffer is a valid pointer to shared_audio_engine_period_bytes of data
316         let win_buffer_slice = unsafe {
317             std::slice::from_raw_parts_mut(
318                 device_renderer.win_buffer,
319                 shared_audio_engine_period_bytes,
320             )
321         };
322 
323         win_buffer_slice.copy_from_slice(slice_to_write);
324         // SAFETY: We own the buffer
325         unsafe {
326             let hr = device_renderer
327                 .audio_render_client
328                 .ReleaseBuffer(format.shared_audio_engine_period_in_frames as u32, 0);
329             if let Err(e) = check_hresult!(
330                 hr,
331                 WinAudioError::ReleaseBufferError(hr),
332                 "Audio Render Client ReleaseBuffer() failed"
333             ) {
334                 log_playback_error_with_limit((&e).into());
335             }
336         }
337     }
338 }
339 
log_playback_error_with_limit(descriptor: i64)340 pub(crate) fn log_playback_error_with_limit(descriptor: i64) {
341     if PLAYBACK_ERRORS_LOGGED_COUNT.load(Ordering::SeqCst) <= ERROR_METRICS_LOG_LIMIT {
342         metrics::log_descriptor(MetricEventType::AudioPlaybackError, descriptor);
343         PLAYBACK_ERRORS_LOGGED_COUNT.fetch_add(1, Ordering::SeqCst);
344     }
345 }
346 
347 impl DeviceRenderer {
348     /// Similiar to `next_win_buffer`, this is the async version that will return a wrapper
349     /// the WASAPI buffer.
350     ///
351     /// Unlike `next_win_buffer`, there is no timeout if `async_ready_to_read_event` doesn't fire.
352     /// This should be fine, since the end result with or without the timeout will be no audio.
async_next_win_buffer(&mut self) -> Result<(), RenderError>353     async fn async_next_win_buffer(&mut self) -> Result<(), RenderError> {
354         self.win_buffer = MaybeUninit::uninit().as_mut_ptr();
355 
356         // We will wait for windows to tell us when it is ready to take in the next set of
357         // audio samples from the guest
358         loop {
359             let async_ready_to_read_event = self
360                 .async_ready_to_read_event
361                 .as_ref()
362                 .ok_or(RenderError::WinAudioError(WinAudioError::MissingEventAsync))?;
363             async_ready_to_read_event.wait().await.map_err(|e| {
364                 RenderError::WinAudioError(WinAudioError::AsyncError(
365                     e,
366                     "Failed to wait for async event to get next playback buffer.".to_string(),
367                 ))
368             })?;
369 
370             if self.enough_available_frames()? {
371                 break;
372             }
373         }
374 
375         self.get_buffer()?;
376 
377         Ok(())
378     }
379 }
380 
381 impl WinAudioCapturer {
new_async( num_channels: usize, guest_bit_depth: SampleFormat, frame_rate: u32, outgoing_buffer_size_in_frames: usize, ex: &dyn audio_streams::AudioStreamsExecutor, ) -> Result<Self, CaptureError>382     pub fn new_async(
383         num_channels: usize,
384         guest_bit_depth: SampleFormat,
385         frame_rate: u32,
386         outgoing_buffer_size_in_frames: usize,
387         ex: &dyn audio_streams::AudioStreamsExecutor,
388     ) -> Result<Self, CaptureError> {
389         let device = DeviceCapturerWrapper::new(
390             num_channels,
391             guest_bit_depth,
392             frame_rate,
393             outgoing_buffer_size_in_frames,
394             Some(ex),
395         )?;
396 
397         Ok(Self { device })
398     }
399 
unregister_notification_client_and_make_new_device_capturer( &mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<(), BoxError>400     fn unregister_notification_client_and_make_new_device_capturer(
401         &mut self,
402         ex: &dyn AudioStreamsExecutor,
403     ) -> Result<(), BoxError> {
404         let device_capturer = DeviceCapturerWrapper::create_device_capturer_and_log_time(
405             self.device.num_channels,
406             self.device.guest_frame_rate,
407             self.device.outgoing_buffer_size_in_frames,
408             Some(ex),
409         )
410         .map_err(Box::new)?;
411 
412         let audio_shared_format = device_capturer.audio_shared_format;
413 
414         let capture_resampler_buffer = CaptureResamplerBuffer::new_input_resampler(
415             audio_shared_format.frame_rate,
416             self.device.guest_frame_rate as usize,
417             self.device.outgoing_buffer_size_in_frames,
418             audio_shared_format.channels,
419             audio_shared_format.channel_mask,
420         )
421         .expect("Failed to create CaptureResamplerBuffer.");
422 
423         self.device.capturer_stream =
424             CapturerStream::Device((device_capturer, capture_resampler_buffer, NoopBufferCommit));
425 
426         Ok(())
427     }
428 }
429 
430 #[async_trait(?Send)]
431 impl AsyncCaptureBufferStream for WinAudioCapturer {
next_capture_buffer<'a>( &'a mut self, ex: &dyn AudioStreamsExecutor, ) -> Result<AsyncCaptureBuffer<'a>, BoxError>432     async fn next_capture_buffer<'a>(
433         &'a mut self,
434         ex: &dyn AudioStreamsExecutor,
435     ) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
436         // In the `Noop` state, check to see if there is a new device connected. If so, create a
437         // `DeviceCapturer`.
438         if let CapturerStream::Noop(noop_capturer) = &self.device.capturer_stream {
439             if noop_capturer
440                 .is_device_available
441                 .fetch_and(false, Ordering::SeqCst)
442             {
443                 match self.unregister_notification_client_and_make_new_device_capturer(ex) {
444                     Ok(()) => {}
445                     Err(e) => warn!(
446                         "Making a new DeviceCapturer failed in the middle of capture \
447                     Will continue using NoopCaptureStream and listening for new devices: {}",
448                         e
449                     ),
450                 }
451             }
452         }
453 
454         // Try to drain bytes from the Windows buffer into the capture resample buffer, which acts
455         // as a sink. If any part fails, the `Noop` state is set.
456         if let CapturerStream::Device((device_capturer, capture_resampler_buffer, _)) =
457             &mut self.device.capturer_stream
458         {
459             match DeviceCapturerWrapper::drain_until_bytes_avaialable(
460                 device_capturer,
461                 capture_resampler_buffer,
462                 self.device.outgoing_buffer_size_in_frames,
463             )
464             .await
465             {
466                 Ok(()) => {}
467                 Err(e) => {
468                     warn!(
469                         "Making a new DeviceCapturer failed in the middle of capture. \
470                         Will continue using NoopStream and listening for new devices: {}",
471                         e
472                     );
473                     self.device.capturer_stream =
474                         DeviceCapturerWrapper::create_noop_capture_stream_with_device_notification(
475                             self.device.num_channels,
476                             self.device.guest_bit_depth,
477                             self.device.guest_frame_rate,
478                             self.device.outgoing_buffer_size_in_frames,
479                         )
480                         .map_err(Box::new)?;
481                 }
482             };
483         }
484 
485         // Return the buffer to be written to shared memory.
486         match &mut self.device.capturer_stream {
487             CapturerStream::Device((_, capture_resampler_buffer, noop_buffer_commit)) => {
488                 DeviceCapturerWrapper::get_async_capture_buffer(
489                     capture_resampler_buffer,
490                     noop_buffer_commit,
491                 )
492                 .map_err(|e| Box::new(e) as _)
493             }
494             CapturerStream::Noop(noop_capturer) => {
495                 AsyncCaptureBufferStream::next_capture_buffer(
496                     &mut noop_capturer.noop_capture_stream,
497                     ex,
498                 )
499                 .await
500             }
501         }
502     }
503 }
504 
#[cfg(test)]
mod tests {
    use cros_async::Executor;

    use super::*;
    use crate::WinStreamSourceGenerator;

    // This test is meant to run through the normal audio playback procedure in order to make
    // debugging easier. The test needs to be ran with a playback device format of 48KHz,
    // stereo, 16bit. This test might not pass on AMD, since its period is 513 instead of 480.
    #[ignore]
    #[test]
    fn test_async() {
        async fn test(ex: &Executor) {
            let stream_source_generator: Box<dyn StreamSourceGenerator> =
                Box::new(WinAudioStreamSourceGenerator {});
            let mut stream_source = stream_source_generator
                .generate()
                .expect("Failed to create stream source.");

            let (_, mut async_pb_stream) = stream_source
                .new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
                .expect("Failed to create async playback stream.");

            let mut async_pb_buffer = async_pb_stream
                .next_playback_buffer(ex)
                .await
                .expect("Failed to get next playback buffer");

            // The buffer size is calculated by "period * channels * bit depth". The actual buffer
            // from `next_playback_buffer` may vary, depending on the device format and the user's
            // machine.
            let buffer = [1u8; 480 * 2 * 2];

            async_pb_buffer
                .copy_cb(buffer.len(), |out| out.copy_from_slice(&buffer))
                .unwrap();

            async_pb_buffer.commit().await;

            // Run a second period through the same stream.
            let mut async_pb_buffer = async_pb_stream
                .next_playback_buffer(ex)
                .await
                .expect("Failed to get next playback buffer");

            let buffer = [1u8; 480 * 2 * 2];

            async_pb_buffer
                .copy_cb(buffer.len(), |out| out.copy_from_slice(&buffer))
                .expect("Failed to copy samples from buffer to win_buffer");

            async_pb_buffer.commit().await;
        }

        let ex = Executor::new().expect("Failed to create executor.");

        ex.run_until(test(&ex)).unwrap();
    }

    // This test is meant to run through the normal audio capture procedure in order to make
    // debugging easier.
    #[ignore]
    #[test]
    fn test_async_capture() {
        async fn test(ex: &Executor) {
            let stream_source_generator: Box<dyn WinStreamSourceGenerator> =
                Box::new(WinAudioStreamSourceGenerator {});
            let mut stream_source = stream_source_generator
                .generate()
                .expect("Failed to create stream source.");

            let (mut async_cp_stream, _shared_format) = stream_source
                .new_async_capture_stream_and_get_shared_format(
                    2,
                    SampleFormat::S16LE,
                    48000,
                    480,
                    ex,
                )
                .expect("Failed to create async capture stream.");

            let mut async_cp_buffer = async_cp_stream
                .next_capture_buffer(ex)
                .await
                .expect("Failed to get next capture buffer");

            // Room for 480 frames, where there are 2 channels and 2 bytes per sample. The vector
            // must be filled rather than merely reserved: a `Vec::with_capacity` has a length of
            // zero, which would request a zero byte copy and leave the capture path untested.
            let mut buffer_to_send_to_guest = vec![0u8; 480 * 2 * 2];

            async_cp_buffer
                .copy_cb(buffer_to_send_to_guest.len(), |win_buffer| {
                    buffer_to_send_to_guest.copy_from_slice(win_buffer);
                })
                .expect("Failed to copy samples from win_buffer to buffer");
        }

        let ex = Executor::new().expect("Failed to create executor.");

        ex.run_until(test(&ex)).unwrap();
    }
}
606