• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines
2 
3 import kotlinx.atomicfu.*
4 import kotlin.coroutines.*
5 
6 /**
7  * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
8  * when all deferred computations are complete or resumes with the first thrown exception if any of computations
9  * complete exceptionally including cancellation.
10  *
11  * This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when it sequentially
12  * gets to wait for the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
13  *
14  * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
15  * suspending function is waiting, this function immediately resumes with [CancellationException].
16  * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
17  * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
18  */
19 public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
20     if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
21 
22 /**
23  * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
24  * when all deferred computations are complete or resumes with the first thrown exception if any of computations
25  * complete exceptionally including cancellation.
26  *
27  * This function is **not** equivalent to `this.map { it.await() }` which fails only when it sequentially
28  * gets to wait for the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
29  *
30  * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
31  * suspending function is waiting, this function immediately resumes with [CancellationException].
32  * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
33  * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
34  */
35 public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
36     if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
37 
38 /**
39  * Suspends current coroutine until all given jobs are complete.
40  * This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
41  *
42  * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
43  * suspending function is waiting, this function immediately resumes with [CancellationException].
44  * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
45  * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
46  */
47 public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
48 
49 /**
50  * Suspends current coroutine until all given jobs are complete.
51  * This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
52  *
53  * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
54  * suspending function is waiting, this function immediately resumes with [CancellationException].
55  * There is a **prompt cancellation guarantee**: even if this function is ready to return the result, but was cancelled
56  * while suspended, [CancellationException] will be thrown. See [suspendCancellableCoroutine] for low-level details.
57  */
<lambda>null58 public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
59 
60 private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
61     private val notCompletedCount = atomic(deferreds.size)
62 
awaitnull63     suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
64         // Intricate dance here
65         // Step 1: Create nodes and install them as completion handlers, they may fire!
66         val nodes = Array(deferreds.size) { i ->
67             val deferred = deferreds[i]
68             deferred.start() // To properly await lazily started deferreds
69             AwaitAllNode(cont).apply {
70                 handle = deferred.invokeOnCompletion(handler = this)
71             }
72         }
73         val disposer = DisposeHandlersOnCancel(nodes)
74         // Step 2: Set disposer to each node
75         nodes.forEach { it.disposer = disposer }
76         // Here we know that if any code the nodes complete, it will dispose the rest
77         // Step 3: Now we can check if continuation is complete
78         if (cont.isCompleted) {
79             // it is already complete while handlers were being installed -- dispose them all
80             disposer.disposeAll()
81         } else {
82             cont.invokeOnCancellation(handler = disposer)
83         }
84     }
85 
86     private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler {
disposeAllnull87         fun disposeAll() {
88             nodes.forEach { it.handle.dispose() }
89         }
90 
invokenull91         override fun invoke(cause: Throwable?) { disposeAll() }
toStringnull92         override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
93     }
94 
95     private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>) : JobNode() {
96         lateinit var handle: DisposableHandle
97 
98         private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
99         var disposer: DisposeHandlersOnCancel?
100             get() = _disposer.value
101             set(value) { _disposer.value = value }
102 
103         override val onCancelling get() = false
104 
105         override fun invoke(cause: Throwable?) {
106             if (cause != null) {
107                 val token = continuation.tryResumeWithException(cause)
108                 if (token != null) {
109                     continuation.completeResume(token)
110                     // volatile read of disposer AFTER continuation is complete
111                     // and if disposer was already set (all handlers where already installed, then dispose them all)
112                     disposer?.disposeAll()
113                 }
114             } else if (notCompletedCount.decrementAndGet() == 0) {
115                 continuation.resume(deferreds.map { it.getCompleted() })
116                 // Note that all deferreds are complete here, so we don't need to dispose their nodes
117             }
118         }
119     }
120 }
121