• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow.internal
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.sync.*
11 import kotlin.coroutines.*
12 
13 internal class ChannelFlowTransformLatest<T, R>(
14     private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
15     flow: Flow<T>,
16     context: CoroutineContext = EmptyCoroutineContext,
17     capacity: Int = Channel.BUFFERED,
18     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
19 ) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
20     override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
21         ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
22 
23     override suspend fun flowCollect(collector: FlowCollector<R>) {
24         assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
25         coroutineScope {
26             var previousFlow: Job? = null
27             flow.collect { value ->
28                 previousFlow?.apply {
29                     cancel(ChildCancelledException())
30                     join()
31                 }
32                 // Do not pay for dispatch here, it's never necessary
33                 previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
34                     collector.transform(value)
35                 }
36             }
37         }
38     }
39 }
40 
41 internal class ChannelFlowMerge<T>(
42     private val flow: Flow<Flow<T>>,
43     private val concurrency: Int,
44     context: CoroutineContext = EmptyCoroutineContext,
45     capacity: Int = Channel.BUFFERED,
46     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
47 ) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
createnull48     override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
49         ChannelFlowMerge(flow, concurrency, context, capacity, onBufferOverflow)
50 
51     override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
52         return scope.produce(context, capacity, block = collectToFun)
53     }
54 
collectTonull55     override suspend fun collectTo(scope: ProducerScope<T>) {
56         val semaphore = Semaphore(concurrency)
57         val collector = SendingCollector(scope)
58         val job: Job? = coroutineContext[Job]
59         flow.collect { inner ->
60             /*
61              * We launch a coroutine on each emitted element and the only potential
62              * suspension point in this collector is `semaphore.acquire` that rarely suspends,
63              * so we manually check for cancellation to propagate it to the upstream in time.
64              */
65             job?.ensureActive()
66             semaphore.acquire()
67             scope.launch {
68                 try {
69                     inner.collect(collector)
70                 } finally {
71                     semaphore.release() // Release concurrency permit
72                 }
73             }
74         }
75     }
76 
additionalToStringPropsnull77     override fun additionalToStringProps(): String = "concurrency=$concurrency"
78 }
79 
80 internal class ChannelLimitedFlowMerge<T>(
81     private val flows: Iterable<Flow<T>>,
82     context: CoroutineContext = EmptyCoroutineContext,
83     capacity: Int = Channel.BUFFERED,
84     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
85 ) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
86     override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
87         ChannelLimitedFlowMerge(flows, context, capacity, onBufferOverflow)
88 
89     override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
90         return scope.produce(context, capacity, block = collectToFun)
91     }
92 
93     override suspend fun collectTo(scope: ProducerScope<T>) {
94         val collector = SendingCollector(scope)
95         flows.forEach { flow ->
96             scope.launch { flow.collect(collector) }
97         }
98     }
99 }
100