1 /*
<lambda>null2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:RestrictTo(RestrictTo.Scope.LIBRARY)
18 
19 package androidx.paging
20 
21 import androidx.annotation.RestrictTo
22 import kotlin.coroutines.resume
23 import kotlinx.coroutines.CoroutineScope
24 import kotlinx.coroutines.Job
25 import kotlinx.coroutines.channels.Channel
26 import kotlinx.coroutines.channels.SendChannel
27 import kotlinx.coroutines.coroutineScope
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.buffer
30 import kotlinx.coroutines.flow.flow
31 import kotlinx.coroutines.flow.internal.FusibleFlow
32 import kotlinx.coroutines.launch
33 import kotlinx.coroutines.suspendCancellableCoroutine
34 
/**
 * A minimal stand-in for `channelFlow`, kept as a temporary measure until channel flow leaves
 * experimental state.
 *
 * A one-to-one copy of that implementation is not possible because [FusibleFlow] is an internal
 * API. As an approximation, values are handed over through a [Channel.RENDEZVOUS] channel and a
 * [buffer] ([Channel.BUFFERED]) operator is applied to the resulting Flow. The behavior ends up
 * close: buffered by default, with any follow-up buffer operation resulting in +1 value being
 * produced.
 *
 * @param block producer body, run with a [SimpleProducerScope] receiver through which it sends
 *   values.
 * @return a [Flow] that emits the values received from the producer's channel.
 */
internal fun <T> simpleChannelFlow(block: suspend SimpleProducerScope<T>.() -> Unit): Flow<T> {
    val unbuffered: Flow<T> = flow {
        coroutineScope {
            val handoff = Channel<T>(capacity = Channel.RENDEZVOUS)
            val producerJob = launch {
                try {
                    // The nested scope only returns once everything launched inside the block
                    // has finished, so the channel is closed after all of that work is done.
                    coroutineScope {
                        SimpleProducerScopeImpl(scope = this, channel = handoff).block()
                    }
                    handoff.close()
                } catch (failure: Throwable) {
                    // Close the channel with the failure as its cause.
                    handoff.close(failure)
                }
            }
            for (value in handoff) {
                emit(value)
            }
            // The channel may have been closed while the producer is still running; stop it.
            producerJob.cancel()
        }
    }
    return unbuffered.buffer(Channel.BUFFERED)
}
74 
/**
 * Receiver type for the producer block of [simpleChannelFlow]: a [CoroutineScope] that is also a
 * [SendChannel], so values can be sent directly on the scope.
 */
internal interface SimpleProducerScope<T> : CoroutineScope, SendChannel<T> {
    /** The [SendChannel] associated with this scope. */
    val channel: SendChannel<T>

    /**
     * Suspends the caller and invokes [block] when that suspension ends.
     *
     * @param block callback to run once the wait is over.
     */
    suspend fun awaitClose(block: () -> Unit)
}
80 
/**
 * [SimpleProducerScope] implementation that forwards its [CoroutineScope] side to [scope] and its
 * [SendChannel] side to [channel].
 *
 * @param scope the coroutine scope this instance delegates to.
 * @property channel the channel that receives everything sent through this scope.
 */
internal class SimpleProducerScopeImpl<T>(
    scope: CoroutineScope,
    override val channel: SendChannel<T>,
) : SimpleProducerScope<T>, CoroutineScope by scope, SendChannel<T> by channel {
    /**
     * Suspends until the [Job] in this scope's coroutine context completes, then runs [block].
     *
     * [block] is invoked from a `finally` clause, so it also runs when the wait ends with an
     * exception, including the case where the context has no [Job].
     *
     * @throws IllegalStateException if this scope's coroutine context has no [Job].
     */
    override suspend fun awaitClose(block: () -> Unit) {
        try {
            // The lookup stays inside the try so that block() still runs if it fails.
            val scopeJob =
                coroutineContext[Job] ?: error("Internal error, context should have a job.")
            suspendCancellableCoroutine<Unit> { continuation ->
                // Resume the suspended caller once the job has completed.
                scopeJob.invokeOnCompletion { _ -> continuation.resume(Unit) }
            }
        } finally {
            block()
        }
    }
}
97