1 package kotlinx.coroutines.jdk9
2
3 import kotlinx.coroutines.*
4 import java.util.concurrent.*
5 import org.reactivestreams.FlowAdapters
6 import kotlinx.coroutines.reactive.*
7
8 /**
9 * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
10 * the publisher has produced an error, throws the corresponding exception.
11 *
12 * This suspending function is cancellable.
13 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
14 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
15 *
16 * @throws NoSuchElementException if the publisher does not emit any value
17 */
awaitFirstnull18 public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T =
19 FlowAdapters.toPublisher(this).awaitFirst()
20
21 /**
22 * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
23 * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
24 * exception.
25 *
26 * This suspending function is cancellable.
27 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
28 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
29 */
30 public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
31 FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
32
33 /**
34 * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
35 * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
36 *
37 * This suspending function is cancellable.
38 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
39 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
40 */
41 public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
42 FlowAdapters.toPublisher(this).awaitFirstOrNull()
43
44 /**
45 * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
46 * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
47 * corresponding exception.
48 *
49 * This suspending function is cancellable.
50 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
51 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
52 */
53 public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
54 FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
55
56 /**
57 * Awaits the last value from the given publisher without blocking the thread and
58 * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
59 *
60 * This suspending function is cancellable.
61 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
62 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
63 *
64 * @throws NoSuchElementException if the publisher does not emit any value
65 */
66 public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
67 FlowAdapters.toPublisher(this).awaitLast()
68
69 /**
70 * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
71 * if this publisher has produced an error, throws the corresponding exception.
72 *
73 * This suspending function is cancellable.
74 * If the [Job] of the current coroutine is cancelled while the suspending function is waiting, this
75 * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
76 *
77 * @throws NoSuchElementException if the publisher does not emit any value
78 * @throws IllegalArgumentException if the publisher emits more than one value
79 */
80 public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
81 FlowAdapters.toPublisher(this).awaitSingle()
82