• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:Suppress("UNCHECKED_CAST") // KT-32203
5 
6 package kotlinx.coroutines.flow.internal
7 
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import kotlinx.coroutines.internal.*
12 private typealias Update = IndexedValue<Any?>
13 
14 @PublishedApi
15 internal suspend fun <R, T> FlowCollector<R>.combineInternal(
16     flows: Array<out Flow<T>>,
17     arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
18     transform: suspend FlowCollector<R>.(Array<T>) -> Unit
19 ): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
20     val size = flows.size
21     if (size == 0) return@flowScope // bail-out for empty input
22     val latestValues = arrayOfNulls<Any?>(size)
23     latestValues.fill(UNINITIALIZED) // Smaller bytecode & faster than Array(size) { UNINITIALIZED }
24     val resultChannel = Channel<Update>(size)
25     val nonClosed = LocalAtomicInt(size)
26     var remainingAbsentValues = size
27     for (i in 0 until size) {
28         // Coroutine per flow that keeps track of its value and sends result to downstream
29         launch {
30             try {
31                 flows[i].collect { value ->
32                     resultChannel.send(Update(i, value))
33                     yield() // Emulate fairness, giving each flow chance to emit
34                 }
35             } finally {
36                 // Close the channel when there is no more flows
37                 if (nonClosed.decrementAndGet() == 0) {
38                     resultChannel.close()
39                 }
40             }
41         }
42     }
43 
44     /*
45      * Batch-receive optimization: read updates in batches, but bail-out
46      * as soon as we encountered two values from the same source
47      */
48     val lastReceivedEpoch = ByteArray(size)
49     var currentEpoch: Byte = 0
50     while (true) {
51         ++currentEpoch
52         // Start batch
53         // The very first receive in epoch should be suspending
54         var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here
55         while (true) {
56             val index = element.index
57             // Update values
58             val previous = latestValues[index]
59             latestValues[index] = element.value
60             if (previous === UNINITIALIZED) --remainingAbsentValues
61             // Check epoch
62             // Received the second value from the same flow in the same epoch -- bail out
63             if (lastReceivedEpoch[index] == currentEpoch) break
64             lastReceivedEpoch[index] = currentEpoch
65             element = resultChannel.tryReceive().getOrNull() ?: break
66         }
67 
68         // Process batch result if there is enough data
69         if (remainingAbsentValues == 0) {
70             /*
71              * If arrayFactory returns null, then we can avoid array copy because
72              * it's our own safe transformer that immediately deconstructs the array
73              */
74             val results = arrayFactory()
75             if (results == null) {
76                 transform(latestValues as Array<T>)
77             } else {
78                 (latestValues as Array<T?>).copyInto(results)
79                 transform(results as Array<T>)
80             }
81         }
82     }
83 }
84 
zipImplnull85 internal fun <T1, T2, R> zipImpl(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
86     unsafeFlow {
87         coroutineScope {
88             val second = produce<Any> {
89                 flow2.collect { value ->
90                     return@collect channel.send(value ?: NULL)
91                 }
92             }
93 
94             /*
95              * This approach only works with rendezvous channel and is required to enforce correctness
96              * in the following scenario:
97              * ```
98              * val f1 = flow { emit(1); delay(Long.MAX_VALUE) }
99              * val f2 = flowOf(1)
100              * f1.zip(f2) { ... }
101              * ```
102              *
103              * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
104              */
105             val collectJob = Job()
106             (second as SendChannel<*>).invokeOnClose {
107                 // Optimization to avoid AFE allocation when the other flow is done
108                 if (collectJob.isActive) collectJob.cancel(AbortFlowException(this@unsafeFlow))
109             }
110 
111             try {
112                 /*
113                  * Non-trivial undispatched (because we are in the right context and there is no structured concurrency)
114                  * hierarchy:
115                  * -Outer coroutineScope that owns the whole zip process
116                  * - First flow is collected by the child of coroutineScope, collectJob.
117                  *    So it can be safely cancelled as soon as the second flow is done
118                  * - **But** the downstream MUST NOT be cancelled when the second flow is done,
119                  *    so we emit to downstream from coroutineScope job.
120                  * Typically, such hierarchy requires coroutine for collector that communicates
121                  * with coroutines scope via a channel, but it's way too expensive, so
122                  * we are using this trick instead.
123                  */
124                 val scopeContext = coroutineContext
125                 val cnt = threadContextElements(scopeContext)
126                 withContextUndispatched(coroutineContext + collectJob, Unit) {
127                     flow.collect { value ->
128                         withContextUndispatched(scopeContext, Unit, cnt) {
129                             val otherValue = second.receiveCatching().getOrElse {
130                                 throw it ?:AbortFlowException(this@unsafeFlow)
131                             }
132                             emit(transform(value, NULL.unbox(otherValue)))
133                         }
134                     }
135                 }
136             } catch (e: AbortFlowException) {
137                 e.checkOwnership(owner = this@unsafeFlow)
138             } finally {
139                 second.cancel()
140             }
141         }
142     }
143