• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5 
6 package kotlinx.coroutines.flow.internal
7 
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.internal.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14 
15 private typealias Update = IndexedValue<Any?>
16 
17 @PublishedApi
18 internal suspend fun <R, T> FlowCollector<R>.combineInternal(
19     flows: Array<out Flow<T>>,
20     arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
21     transform: suspend FlowCollector<R>.(Array<T>) -> Unit
22 ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
23     val size = flows.size
24     if (size == 0) return@flowScope // bail-out for empty input
25     val latestValues = arrayOfNulls<Any?>(size)
26     latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
27     val resultChannel = Channel<Update>(size)
28     val nonClosed = LocalAtomicInt(size)
29     var remainingAbsentValues = size
30     for (i in 0 until size) {
31         // Coroutine per flow that keeps track of its value and sends result to downstream
32         launch {
33             try {
34                 flows[i].collect { value ->
35                     resultChannel.send(Update(i, value))
36                     yield() // Emulate fairness, giving each flow chance to emit
37                 }
38             } finally {
39                 // Close the channel when there is no more flows
40                 if (nonClosed.decrementAndGet() == 0) {
41                     resultChannel.close()
42                 }
43             }
44         }
45     }
46 
47     /*
48      * Batch-receive optimization: read updates in batches, but bail-out
49      * as soon as we encountered two values from the same source
50      */
51     val lastReceivedEpoch = ByteArray(size)
52     var currentEpoch: Byte = 0
53     while (true) {
54         ++currentEpoch
55         // Start batch
56         // The very first receive in epoch should be suspending
57         var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here
58         while (true) {
59             val index = element.index
60             // Update values
61             val previous = latestValues[index]
62             latestValues[index] = element.value
63             if (previous === UNINITIALIZED) --remainingAbsentValues
64             // Check epoch
65             // Received the second value from the same flow in the same epoch -- bail out
66             if (lastReceivedEpoch[index] == currentEpoch) break
67             lastReceivedEpoch[index] = currentEpoch
68             element = resultChannel.poll() ?: break
69         }
70 
71         // Process batch result if there is enough data
72         if (remainingAbsentValues == 0) {
73             /*
74              * If arrayFactory returns null, then we can avoid array copy because
75              * it's our own safe transformer that immediately deconstructs the array
76              */
77             val results = arrayFactory()
78             if (results == null) {
79                 transform(latestValues as Array<T>)
80             } else {
81                 (latestValues as Array<T?>).copyInto(results)
82                 transform(results as Array<T>)
83             }
84         }
85     }
86 }
87 
zipImplnull88 internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
89     unsafeFlow {
90         coroutineScope {
91             val second = produce<Any> {
92                 flow2.collect { value ->
93                     return@collect channel.send(value ?: NULL)
94                 }
95             }
96 
97             /*
98              * This approach only works with rendezvous channel and is required to enforce correctness
99              * in the following scenario:
100              * ```
101              * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
102              * val f2 = flowOf(1)
103              * f1.zip(f2) { ... }
104              * ```
105              *
106              * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
107              */
108             val collectJob = Job()
109             (second as SendChannel<*>).invokeOnClose {
110                 // Optimization to avoid AFE allocation when the other flow is done
111                 if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
112             }
113 
114             try {
115                 /*
116                  * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
117                  * hierarchy:
118                  * -Outer coroutineScope that owns the whole zip process
119                  * - First flow is collected by the child of coroutineScope, collectJob.
120                  *    So it can be safely cancelled as soon as the second flow is done
121                  * - **But** the downstream MUST NOT be cancelled when the second flow is done,
122                  *    so we emit to downstream from coroutineScope job.
123                  * Typically, such hierarchy requires coroutine for collector that communicates
124                  * with coroutines scope via a channel, but it's way too expensive, so
125                  * we are using this trick instead.
126                  */
127                 val scopeContext = coroutineContext
128                 val cnt = threadContextElements(scopeContext)
129                 withContextUndispatched(coroutineContext + collectJob, Unit) {
130                     flow.collect { value ->
131                         withContextUndispatched(scopeContext, Unit, cnt) {
132                             val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
133                             emit(transform(value, NULL.unbox(otherValue)))
134                         }
135                     }
136                 }
137             } catch (e: AbortFlowException) {
138                 e.checkOwnership(owner = this@unsafeFlow)
139             } finally {
140                 if (!second.isClosedForReceive) second.cancel()
141             }
142         }
143     }
144