/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*

/**
 * Channel-backed operator that applies [transform] to each upstream value, cancelling the
 * transformation of the previous value as soon as a new one arrives.
 *
 * @param transform suspending block invoked on the downstream collector for every upstream value.
 * @param flow upstream flow whose values are transformed.
 * @param context, capacity, onBufferOverflow forwarded unchanged to [ChannelFlowOperator].
 */
internal class ChannelFlowTransformLatest<T, R>(
    private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {

    /** Creates a copy of this operator with the same [transform] and upstream but new channel parameters. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
        ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)

    /**
     * Collects the upstream flow; for each value the previously launched transformation (if any)
     * is cancelled and joined before a new one is started for the fresh value.
     */
    override suspend fun flowCollect(collector: FlowCollector<R>) {
        assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
        coroutineScope {
            // Job of the transformation started for the most recent upstream value, if any.
            var previousFlow: Job? = null
            flow.collect { value ->
                // Cancel the in-flight transformation and wait for it to finish before starting the next one.
                previousFlow?.apply {
                    cancel(ChildCancelledException())
                    join()
                }
                // Do not pay for dispatch here, it's never necessary
                previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                    collector.transform(value)
                }
            }
        }
    }
}

/**
 * Channel-backed merge of a flow of flows: every inner flow is collected in its own coroutine,
 * and the number of inner flows collected at the same time is bounded by a [Semaphore]
 * created with [concurrency] permits.
 *
 * @param flow upstream flow emitting the inner flows to merge.
 * @param concurrency number of permits of the semaphore guarding inner collection.
 * @param context, capacity, onBufferOverflow forwarded unchanged to [ChannelFlow].
 */
internal class ChannelFlowMerge<T>(
    private val flow: Flow<Flow<T>>,
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

    /** Creates a copy of this operator with the same upstream and concurrency but new channel parameters. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelFlowMerge(flow, concurrency, context, capacity, onBufferOverflow)

    /** Starts a producer in [scope] that runs `collectToFun` with this operator's context and capacity. */
    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, capacity, block = collectToFun)
    }

    /**
     * Collects the upstream flow and launches, in [scope], one coroutine per inner flow that
     * forwards its values through a [SendingCollector] wrapping [scope].
     */
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val semaphore = Semaphore(concurrency)
        val collector = SendingCollector(scope)
        val job: Job? = coroutineContext[Job]
        flow.collect { inner ->
            /*
             * We launch a coroutine on each emitted element and the only potential
             * suspension point in this collector is `semaphore.acquire` that rarely suspends,
             * so we manually check for cancellation to propagate it to the upstream in time.
             */
            job?.ensureActive()
            semaphore.acquire()
            scope.launch {
                try {
                    inner.collect(collector)
                } finally {
                    semaphore.release() // Release concurrency permit
                }
            }
        }
    }

    /** Extra text contributed to this operator's string representation. */
    override fun additionalToStringProps(): String = "concurrency=$concurrency"
}

/**
 * Channel-backed merge of a fixed collection of flows: each flow in [flows] is collected in its
 * own coroutine and all of them send into the same channel.
 *
 * @param flows the flows to merge.
 * @param context, capacity, onBufferOverflow forwarded unchanged to [ChannelFlow].
 */
internal class ChannelLimitedFlowMerge<T>(
    private val flows: Iterable<Flow<T>>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {

    /** Creates a copy of this operator over the same [flows] but with new channel parameters. */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        ChannelLimitedFlowMerge(flows, context, capacity, onBufferOverflow)

    /** Starts a producer in [scope] that runs `collectToFun` with this operator's context and capacity. */
    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return scope.produce(context, capacity, block = collectToFun)
    }

    /** Launches one coroutine in [scope] per flow, each collecting into a shared [SendingCollector]. */
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val collector = SendingCollector(scope)
        flows.forEach { flow ->
            scope.launch { flow.collect(collector) }
        }
    }
}