• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9 
/**
 * Suspends, without blocking a thread, until every one of the given [deferreds] has completed and then
 * returns their values as a list. If any of the computations completes exceptionally (cancellation included),
 * this function resumes with the first exception thrown instead.
 *
 * Note that this is **not** the same as `deferreds.map { it.await() }`: the sequential form only notices a failure
 * once it reaches the failing deferred, whereas `awaitAll` fails as soon as any of the deferreds fails.
 *
 * An empty argument list yields an empty list straight away.
 *
 * This suspending function is cancellable.
 * When the [Job] of the current coroutine is cancelled or completed while this function is waiting,
 * it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: if the job was cancelled while this function was suspended,
 * it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 */
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> {
    // Nothing to wait for -- skip the waiting machinery altogether.
    if (deferreds.isEmpty()) return emptyList()
    return AwaitAll(deferreds).await()
}
26 
/**
 * Suspends, without blocking a thread, until every deferred in this collection has completed and then
 * returns their values as a list. If any of the computations completes exceptionally (cancellation included),
 * this function resumes with the first exception thrown instead.
 *
 * Note that this is **not** the same as `this.map { it.await() }`: the sequential form only notices a failure
 * once it reaches the failing deferred, whereas `awaitAll` fails as soon as any of the deferreds fails.
 *
 * An empty collection yields an empty list straight away.
 *
 * This suspending function is cancellable.
 * When the [Job] of the current coroutine is cancelled or completed while this function is waiting,
 * it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: if the job was cancelled while this function was suspended,
 * it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 */
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> {
    // Nothing to wait for -- skip the waiting machinery altogether.
    if (isEmpty()) return emptyList()
    // The waiting machinery works on an array, so copy the collection's elements into one first.
    val pending = toTypedArray()
    return AwaitAll(pending).await()
}
43 
/**
 * Suspends the current coroutine until all of the given [jobs] are complete.
 * The jobs are joined one after another, in argument order, i.e. this is `jobs.forEach { it.join() }`.
 *
 * This suspending function is cancellable.
 * When the [Job] of the current coroutine is cancelled or completed while this function is waiting,
 * it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: if the job was cancelled while this function was suspended,
 * it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 */
public suspend fun joinAll(vararg jobs: Job) {
    for (job in jobs) {
        job.join()
    }
}
55 
/**
 * Suspends the current coroutine until all jobs in this collection are complete.
 * The jobs are joined one after another, in iteration order, i.e. this is `forEach { it.join() }`.
 *
 * This suspending function is cancellable.
 * When the [Job] of the current coroutine is cancelled or completed while this function is waiting,
 * it immediately resumes with [CancellationException].
 * There is a **prompt cancellation guarantee**: if the job was cancelled while this function was suspended,
 * it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
 */
public suspend fun Collection<Job>.joinAll() {
    for (job in this) {
        job.join()
    }
}
67 
/**
 * Waits for a whole array of [deferreds] at once.
 *
 * One [AwaitAllNode] is installed as a completion handler on every deferred. The first node that observes a
 * failure resumes the waiting continuation with that exception; the node that observes the last successful
 * completion resumes it with the list of results (in the order of [deferreds]).
 *
 * @param deferreds the deferred values to wait for; callers in this file only construct this class with a
 * non-empty array.
 */
private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
    // Counts deferreds that have not yet completed successfully; the node that brings it to zero resumes the caller.
    private val notCompletedCount = atomic(deferreds.size)

    /**
     * Suspends until all [deferreds] have completed.
     *
     * @return the completed values, one per deferred, in the order of [deferreds].
     * Resumes with an exception instead if a node is invoked with a non-null cause.
     */
    suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
        // Intricate dance here
        // Step 1: Create nodes and install them as completion handlers, they may fire!
        val nodes = Array(deferreds.size) { i ->
            val deferred = deferreds[i]
            deferred.start() // To properly await lazily started deferreds
            AwaitAllNode(cont).apply {
                // Remember the registration so that this node can be unregistered later by disposeAll()
                handle = deferred.invokeOnCompletion(asHandler)
            }
        }
        val disposer = DisposeHandlersOnCancel(nodes)
        // Step 2: Set disposer to each node
        nodes.forEach { it.disposer = disposer }
        // Here we know that if any of the nodes complete, it will dispose the rest
        // Step 3: Now we can check if continuation is complete
        if (cont.isCompleted) {
            // it is already complete while handlers were being installed -- dispose them all
            disposer.disposeAll()
        } else {
            // Still waiting: make sure the handlers get removed if the waiting coroutine is cancelled
            cont.invokeOnCancellation(handler = disposer.asHandler)
        }
    }

    /**
     * Cancellation handler that unregisters every node in [nodes] from the deferred it was installed on.
     */
    private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
        /** Disposes the completion-handler registration of every node. */
        fun disposeAll() {
            nodes.forEach { it.handle.dispose() }
        }

        /** Invoked with the cancellation [cause]; the cause itself is not used, the handlers are simply removed. */
        override fun invoke(cause: Throwable?) { disposeAll() }

        // An array interpolated directly into a string template renders as the array's identity string,
        // so list the nodes explicitly. How each node renders depends on its own toString (not defined in this class).
        override fun toString(): String = "DisposeHandlersOnCancel[${nodes.joinToString()}]"
    }

    /**
     * Completion handler installed on a single deferred; resumes [continuation] on failure of its deferred or
     * when it observes the last outstanding successful completion.
     */
    private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>) : JobNode() {
        // Registration handle of this node; assigned in await() right after the node is constructed.
        lateinit var handle: DisposableHandle

        // Stays null until await() has installed all handlers (Step 2); held in an atomic so that the read
        // in invoke() is a volatile read.
        private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
        var disposer: DisposeHandlersOnCancel?
            get() = _disposer.value
            set(value) { _disposer.value = value }

        /**
         * Called when the deferred this node is installed on completes.
         *
         * @param cause non-null when the deferred completed exceptionally, `null` otherwise.
         */
        override fun invoke(cause: Throwable?) {
            if (cause != null) {
                // A null token means the continuation could not be resumed by this node; nothing more to do then
                val token = continuation.tryResumeWithException(cause)
                if (token != null) {
                    continuation.completeResume(token)
                    // volatile read of disposer AFTER continuation is complete
                    // and if disposer was already set (all handlers were already installed, then dispose them all)
                    disposer?.disposeAll()
                }
            } else if (notCompletedCount.decrementAndGet() == 0) {
                continuation.resume(deferreds.map { it.getCompleted() })
                // Note that all deferreds are complete here, so we don't need to dispose their nodes
            }
        }
    }
}
127