1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.jdk9
6
7 import kotlinx.coroutines.*
8 import java.util.concurrent.*
9 import org.reactivestreams.FlowAdapters
10 import kotlinx.coroutines.reactive.*
11
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value.
 * If the publisher signals an error instead, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitFirst` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 */
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitFirst()
}
24
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value,
 * or returns [default] if the publisher emits nothing.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitFirstOrDefault` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 *
 * @param default the value to return when no value is emitted
 */
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitFirstOrDefault(default)
}
36
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value,
 * or returns `null` if the publisher emits nothing.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitFirstOrNull` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 */
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitFirstOrNull()
}
47
/**
 * Suspends, without blocking the thread, until this publisher emits its first value and returns that value,
 * or returns the result of calling [defaultValue] if the publisher emits nothing.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitFirstOrElse` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 *
 * @param defaultValue supplier of the fallback value used when no value is emitted
 */
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitFirstOrElse(defaultValue)
}
59
/**
 * Suspends, without blocking the thread, until this publisher has produced its last value and returns
 * that value.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitLast` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 */
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitLast()
}
72
/**
 * Suspends, without blocking the thread, until this publisher has produced its single value and returns
 * that value.
 * If the publisher signals an error, the corresponding exception is thrown.
 *
 * The receiver is adapted to a Reactive Streams publisher through [FlowAdapters.toPublisher], and the
 * waiting itself is delegated to the `awaitSingle` extension of the adapted publisher.
 *
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this function is waiting, it
 * immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
 *
 * @throws NoSuchElementException if the publisher does not emit any value
 * @throws IllegalArgumentException if the publisher emits more than one value
 */
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T {
    // Bridge java.util.concurrent.Flow to Reactive Streams so the existing await extension can be reused.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.awaitSingle()
}
86