• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlin.coroutines.*
9 import kotlin.jvm.*
10 
/**
 * Suspends, without blocking a thread, until every one of the given [deferreds] has completed, and then returns
 * their values as a list. If any of the deferreds completes exceptionally (cancellation included), this function
 * resumes with the first such exception instead.
 *
 * Unlike `deferreds.map { it.await() }`, which notices a failure only once it sequentially reaches the failing
 * deferred, `awaitAll` fails as soon as any of the deferreds fails.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled or completed while
 * this function is waiting, it immediately resumes with [CancellationException].
 */
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> {
    // Nothing to wait for: answer right away with an empty list.
    if (deferreds.isEmpty()) return emptyList()
    return AwaitAll(deferreds).await()
}
25 
/**
 * Suspends, without blocking a thread, until every deferred in this collection has completed, and then returns
 * their values as a list. If any of the deferreds completes exceptionally (cancellation included), this function
 * resumes with the first such exception instead.
 *
 * Unlike `this.map { it.await() }`, which notices a failure only once it sequentially reaches the failing
 * deferred, `awaitAll` fails as soon as any of the deferreds fails.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled or completed while
 * this function is waiting, it immediately resumes with [CancellationException].
 */
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> {
    // Nothing to wait for: answer right away with an empty list.
    if (isEmpty()) return emptyList()
    // The waiter works on an array, so take a snapshot of the collection first.
    return AwaitAll(toTypedArray()).await()
}
40 
/**
 * Suspends the current coroutine until all of the given [jobs] are complete.
 * The jobs are joined one after another, exactly as `jobs.forEach { it.join() }` would do.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled or completed while
 * this function is waiting, it immediately resumes with [CancellationException].
 */
public suspend fun joinAll(vararg jobs: Job): Unit {
    for (job in jobs) {
        job.join()
    }
}
49 
/**
 * Suspends the current coroutine until all jobs in this collection are complete.
 * The jobs are joined one after another, exactly as `forEach { it.join() }` would do.
 *
 * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled or completed while
 * this function is waiting, it immediately resumes with [CancellationException].
 */
public suspend fun Collection<Job>.joinAll(): Unit {
    for (job in this) {
        job.join()
    }
}
58 
/**
 * Waiting side of `awaitAll`: suspends until every one of [deferreds] has completed successfully, or resumes
 * with the cause reported by the first completion handler that is invoked with a non-null cause.
 *
 * [notCompletedCount] starts at the number of deferreds and is decremented by each node that observes a
 * completion without a cause; the node that brings it down to zero resumes the continuation with the results.
 */
private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
    private val notCompletedCount = atomic(deferreds.size)

    /**
     * Suspends the caller until all [deferreds] are complete and returns their completed values in the same
     * order as [deferreds]. Each deferred is started before its completion handler is installed.
     */
    suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
        // Intricate dance here
        // Step 1: Create nodes and install them as completion handlers, they may fire!
        val nodes = Array<AwaitAllNode>(deferreds.size) { i ->
            val deferred = deferreds[i]
            deferred.start() // To properly await lazily started deferreds
            AwaitAllNode(cont, deferred).apply {
                handle = deferred.invokeOnCompletion(asHandler)
            }
        }
        val disposer = DisposeHandlersOnCancel(nodes)
        // Step 2: Set disposer to each node
        nodes.forEach { it.disposer = disposer }
        // Here we know that if any of the nodes completes with a cause, it will dispose the rest
        // Step 3: Now we can check if continuation is complete
        if (cont.isCompleted) {
            // it is already complete while handlers were being installed -- dispose them all
            disposer.disposeAll()
        } else {
            cont.invokeOnCancellation(handler = disposer.asHandler)
        }
    }

    /**
     * Cancellation handler that disposes the completion-handler registration ([AwaitAllNode.handle]) of every
     * node in [nodes]. It is also called directly through [disposeAll] once the continuation has been completed.
     */
    private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
        /** Disposes the handle of each node; must only be called after every node's handle has been assigned. */
        fun disposeAll() {
            nodes.forEach { it.handle.dispose() }
        }

        override fun invoke(cause: Throwable?) { disposeAll() }

        // An array interpolated directly renders as its type and identity hash, so render its elements instead.
        override fun toString(): String = "DisposeHandlersOnCancel[${nodes.contentToString()}]"
    }

    /**
     * Completion handler installed on a single deferred. On a non-null cause it tries to resume [continuation]
     * with that cause; otherwise it counts the completion and, when it is the last one, resumes with all values.
     */
    private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
        // Registration returned by invokeOnCompletion; assigned right after this node is installed.
        lateinit var handle: DisposableHandle

        // Stays null until all nodes have been installed (step 2 of await).
        @Volatile
        var disposer: DisposeHandlersOnCancel? = null

        override fun invoke(cause: Throwable?) {
            if (cause != null) {
                val token = continuation.tryResumeWithException(cause)
                if (token != null) {
                    continuation.completeResume(token)
                    // volatile read of disposer AFTER continuation is complete
                    val disposer = this.disposer
                    // and if disposer was already set (all handlers were already installed), then dispose them all
                    disposer?.disposeAll()
                }
            } else if (notCompletedCount.decrementAndGet() == 0) {
                continuation.resume(deferreds.map { it.getCompleted() })
                // Note that all deferreds are complete here, so we don't need to dispose their nodes
            }
        }
    }
}
117