1 /* <lambda>null2 * Copyright 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 @file:RestrictTo(RestrictTo.Scope.LIBRARY) 18 19 package androidx.paging 20 21 import androidx.annotation.RestrictTo 22 import kotlin.coroutines.resume 23 import kotlinx.coroutines.CoroutineScope 24 import kotlinx.coroutines.Job 25 import kotlinx.coroutines.channels.Channel 26 import kotlinx.coroutines.channels.SendChannel 27 import kotlinx.coroutines.coroutineScope 28 import kotlinx.coroutines.flow.Flow 29 import kotlinx.coroutines.flow.buffer 30 import kotlinx.coroutines.flow.flow 31 import kotlinx.coroutines.flow.internal.FusibleFlow 32 import kotlinx.coroutines.launch 33 import kotlinx.coroutines.suspendCancellableCoroutine 34 35 /** 36 * This is a simplified channelFlow implementation as a temporary measure until channel flow leaves 37 * experimental state. 38 * 39 * The exact same implementation is not possible due to [FusibleFlow] being an internal API. To get 40 * close to that implementation, internally we use a [Channel.RENDEZVOUS] channel and use a [buffer] 41 * ([Channel.BUFFERED]) operator on the resulting Flow. This gives us a close behavior where the 42 * default is buffered and any followup buffer operation will result in +1 value being produced. 43 */ 44 internal fun <T> simpleChannelFlow(block: suspend SimpleProducerScope<T>.() -> Unit): Flow<T> { 45 return flow { 46 coroutineScope { 47 val channel = Channel<T>(capacity = Channel.RENDEZVOUS) 48 val producer = launch { 49 try { 50 // run producer in a separate inner scope to ensure we wait for its children 51 // to finish, in case it does more launches inside. 52 coroutineScope { 53 val producerScopeImpl = 54 SimpleProducerScopeImpl( 55 scope = this, 56 channel = channel, 57 ) 58 producerScopeImpl.block() 59 } 60 channel.close() 61 } catch (t: Throwable) { 62 channel.close(t) 63 } 64 } 65 for (item in channel) { 66 emit(item) 67 } 68 // in case channel closed before producer completes, cancel the producer. 69 producer.cancel() 70 } 71 } 72 .buffer(Channel.BUFFERED) 73 } 74 75 internal interface SimpleProducerScope<T> : CoroutineScope, SendChannel<T> { 76 val channel: SendChannel<T> 77 awaitClosenull78 suspend fun awaitClose(block: () -> Unit) 79 } 80 81 internal class SimpleProducerScopeImpl<T>( 82 scope: CoroutineScope, 83 override val channel: SendChannel<T>, 84 ) : SimpleProducerScope<T>, CoroutineScope by scope, SendChannel<T> by channel { 85 override suspend fun awaitClose(block: () -> Unit) { 86 try { 87 val job = 88 checkNotNull(coroutineContext[Job]) { "Internal error, context should have a job." } 89 suspendCancellableCoroutine<Unit> { cont -> 90 job.invokeOnCompletion { cont.resume(Unit) } 91 } 92 } finally { 93 block() 94 } 95 } 96 } 97