1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4 @file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
5
6 package kotlinx.coroutines.flow.internal
7
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.internal.*
12 import kotlin.coroutines.*
13 import kotlin.coroutines.intrinsics.*
14
15 private typealias Update = IndexedValue<Any?>
16
17 @PublishedApi
18 internal suspend fun <R, T> FlowCollector<R>.combineInternal(
19 flows: Array<out Flow<T>>,
20 arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
21 transform: suspend FlowCollector<R>.(Array<T>) -> Unit
22 ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
23 val size = flows.size
24 if (size == 0) return@flowScope // bail-out for empty input
25 val latestValues = arrayOfNulls<Any?>(size)
26 latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster that Array(size) { UNINITIALIZED }
27 val resultChannel = Channel<Update>(size)
28 val nonClosed = LocalAtomicInt(size)
29 var remainingAbsentValues = size
30 for (i in 0 until size) {
31 // Coroutine per flow that keeps track of its value and sends result to downstream
32 launch {
33 try {
34 flows[i].collect { value ->
35 resultChannel.send(Update(i, value))
36 yield() // Emulate fairness, giving each flow chance to emit
37 }
38 } finally {
39 // Close the channel when there is no more flows
40 if (nonClosed.decrementAndGet() == 0) {
41 resultChannel.close()
42 }
43 }
44 }
45 }
46
47 /*
48 * Batch-receive optimization: read updates in batches, but bail-out
49 * as soon as we encountered two values from the same source
50 */
51 val lastReceivedEpoch = ByteArray(size)
52 var currentEpoch: Byte = 0
53 while (true) {
54 ++currentEpoch
55 // Start batch
56 // The very first receive in epoch should be suspending
57 var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here
58 while (true) {
59 val index = element.index
60 // Update values
61 val previous = latestValues[index]
62 latestValues[index] = element.value
63 if (previous === UNINITIALIZED) --remainingAbsentValues
64 // Check epoch
65 // Received the second value from the same flow in the same epoch -- bail out
66 if (lastReceivedEpoch[index] == currentEpoch) break
67 lastReceivedEpoch[index] = currentEpoch
68 element = resultChannel.poll() ?: break
69 }
70
71 // Process batch result if there is enough data
72 if (remainingAbsentValues == 0) {
73 /*
74 * If arrayFactory returns null, then we can avoid array copy because
75 * it's our own safe transformer that immediately deconstructs the array
76 */
77 val results = arrayFactory()
78 if (results == null) {
79 transform(latestValues as Array<T>)
80 } else {
81 (latestValues as Array<T?>).copyInto(results)
82 transform(results as Array<T>)
83 }
84 }
85 }
86 }
87
zipImplnull88 internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
89 unsafeFlow {
90 coroutineScope {
91 val second = produce<Any> {
92 flow2.collect { value ->
93 return@collect channel.send(value ?: NULL)
94 }
95 }
96
97 /*
98 * This approach only works with rendezvous channel and is required to enforce correctness
99 * in the following scenario:
100 * ```
101 * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
102 * val f2 = flowOf(1)
103 * f1.zip(f2) { ... }
104 * ```
105 *
106 * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
107 */
108 val collectJob = Job()
109 (second as SendChannel<*>).invokeOnClose {
110 // Optimization to avoid AFE allocation when the other flow is done
111 if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
112 }
113
114 try {
115 /*
116 * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
117 * hierarchy:
118 * -Outer coroutineScope that owns the whole zip process
119 * - First flow is collected by the child of coroutineScope, collectJob.
120 * So it can be safely cancelled as soon as the second flow is done
121 * - **But** the downstream MUST NOT be cancelled when the second flow is done,
122 * so we emit to downstream from coroutineScope job.
123 * Typically, such hierarchy requires coroutine for collector that communicates
124 * with coroutines scope via a channel, but it's way too expensive, so
125 * we are using this trick instead.
126 */
127 val scopeContext = coroutineContext
128 val cnt = threadContextElements(scopeContext)
129 withContextUndispatched(coroutineContext + collectJob, Unit) {
130 flow.collect { value ->
131 withContextUndispatched(scopeContext, Unit, cnt) {
132 val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow)
133 emit(transform(value, NULL.unbox(otherValue)))
134 }
135 }
136 }
137 } catch (e: AbortFlowException) {
138 e.checkOwnership(owner = this@unsafeFlow)
139 } finally {
140 if (!second.isClosedForReceive) second.cancel()
141 }
142 }
143 }
144