1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:Suppress("UNCHECKED_CAST") // KT-32203
5
6 package kotlinx.coroutines.flow.internal
7
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.internal.*
12 private typealias Update = IndexedValue<Any?>
13
14 @PublishedApi
15 internal suspend fun <R, T> FlowCollector<R>.combineInternal(
16 flows: Array<out Flow<T>>,
17 arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
18 transform: suspend FlowCollector<R>.(Array<T>) -> Unit
19 ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
20 val size = flows.size
21 if (size == 0) return@flowScope // bail-out for empty input
22 val latestValues = arrayOfNulls<Any?>(size)
23 latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster than Array(size) { UNINITIALIZED }
24 val resultChannel = Channel<Update>(size)
25 val nonClosed = LocalAtomicInt(size)
26 var remainingAbsentValues = size
27 for (i in 0 until size) {
28 // Coroutine per flow that keeps track of its value and sends result to downstream
29 launch {
30 try {
31 flows[i].collect { value ->
32 resultChannel.send(Update(i, value))
33 yield() // Emulate fairness, giving each flow chance to emit
34 }
35 } finally {
36 // Close the channel when there is no more flows
37 if (nonClosed.decrementAndGet() == 0) {
38 resultChannel.close()
39 }
40 }
41 }
42 }
43
44 /*
45 * Batch-receive optimization: read updates in batches, but bail-out
46 * as soon as we encountered two values from the same source
47 */
48 val lastReceivedEpoch = ByteArray(size)
49 var currentEpoch: Byte = 0
50 while (true) {
51 ++currentEpoch
52 // Start batch
53 // The very first receive in epoch should be suspending
54 var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here
55 while (true) {
56 val index = element.index
57 // Update values
58 val previous = latestValues[index]
59 latestValues[index] = element.value
60 if (previous === UNINITIALIZED) --remainingAbsentValues
61 // Check epoch
62 // Received the second value from the same flow in the same epoch -- bail out
63 if (lastReceivedEpoch[index] == currentEpoch) break
64 lastReceivedEpoch[index] = currentEpoch
65 element = resultChannel.tryReceive().getOrNull() ?: break
66 }
67
68 // Process batch result if there is enough data
69 if (remainingAbsentValues == 0) {
70 /*
71 * If arrayFactory returns null, then we can avoid array copy because
72 * it's our own safe transformer that immediately deconstructs the array
73 */
74 val results = arrayFactory()
75 if (results == null) {
76 transform(latestValues as Array<T>)
77 } else {
78 (latestValues as Array<T?>).copyInto(results)
79 transform(results as Array<T>)
80 }
81 }
82 }
83 }
84
zipImplnull85 internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
86 unsafeFlow {
87 coroutineScope {
88 val second = produce<Any> {
89 flow2.collect { value ->
90 return@collect channel.send(value ?: NULL)
91 }
92 }
93
94 /*
95 * This approach only works with rendezvous channel and is required to enforce correctness
96 * in the following scenario:
97 * ```
98 * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
99 * val f2 = flowOf(1)
100 * f1.zip(f2) { ... }
101 * ```
102 *
103 * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
104 */
105 val collectJob = Job()
106 (second as SendChannel<*>).invokeOnClose {
107 // Optimization to avoid AFE allocation when the other flow is done
108 if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
109 }
110
111 try {
112 /*
113 * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
114 * hierarchy:
115 * -Outer coroutineScope that owns the whole zip process
116 * - First flow is collected by the child of coroutineScope, collectJob.
117 * So it can be safely cancelled as soon as the second flow is done
118 * - **But** the downstream MUST NOT be cancelled when the second flow is done,
119 * so we emit to downstream from coroutineScope job.
120 * Typically, such hierarchy requires coroutine for collector that communicates
121 * with coroutines scope via a channel, but it's way too expensive, so
122 * we are using this trick instead.
123 */
124 val scopeContext = coroutineContext
125 val cnt = threadContextElements(scopeContext)
126 withContextUndispatched(coroutineContext + collectJob, Unit) {
127 flow.collect { value ->
128 withContextUndispatched(scopeContext, Unit, cnt) {
129 val otherValue = second.receiveCatching().getOrElse {
130 throw it ?:AbortFlowException(this@unsafeFlow)
131 }
132 emit(transform(value, NULL.unbox(otherValue)))
133 }
134 }
135 }
136 } catch (e: AbortFlowException) {
137 e.checkOwnership(owner = this@unsafeFlow)
138 } finally {
139 second.cancel()
140 }
141 }
142 }
143