• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.jdk9
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.reactive.asFlow
6 import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
7 import kotlinx.coroutines.reactive.collect
8 import kotlinx.coroutines.channels.*
9 import org.reactivestreams.*
10 import kotlin.coroutines.*
11 import java.util.concurrent.Flow as JFlow
12 
/**
 * Converts this JDK 9 [Flow Publisher][JFlow.Publisher] into a coroutines [Flow].
 *
 * The publisher is first adapted to a Reactive Streams publisher via [FlowAdapters.toPublisher] and then
 * exposed as a [Flow] by the reactive integration module.
 *
 * Back-pressure is configured by applying the [buffer] operator to the returned flow: the chosen capacity
 * effectively determines the value passed to the subscription's [request][JFlow.Subscription.request].
 * When no buffer is specified, the [default buffer capacity][Channel.BUFFERED] for a suspending channel is used.
 *
 * If any transformation applied to the returned flow fails, the subscription is cancelled immediately and all
 * in-flight elements are discarded.
 */
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> {
    // Bridge java.util.concurrent.Flow.Publisher -> org.reactivestreams.Publisher first.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.asFlow()
}
24 
/**
 * Converts this flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
 *
 * The flow is first turned into a Reactive Streams publisher by the reactive integration module and then
 * adapted to the JDK 9 API via [FlowAdapters.toFlowPublisher].
 *
 * The optional [context] controls the execution context in which the [Flow Subscriber][JFlow.Subscriber]
 * methods are invoked:
 * - a [CoroutineDispatcher] confines the calls to a specific thread;
 * - various [ThreadContextElement]s inject additional context into the caller thread.
 *
 * By default, the [Unconfined][Dispatchers.Unconfined] dispatcher is used, so the calls are performed from an
 * arbitrary thread.
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
    // Build the org.reactivestreams.Publisher first, then bridge it to java.util.concurrent.Flow.Publisher.
    val reactivePublisher = asReactivePublisher(context)
    return FlowAdapters.toFlowPublisher(reactivePublisher)
}
37 
/**
 * Subscribes to this [Flow Publisher][JFlow.Publisher] and invokes [action] for every element it emits.
 *
 * The publisher is adapted to a Reactive Streams publisher via [FlowAdapters.toPublisher], and collection is
 * delegated to the reactive integration module.
 *
 * If [action] throws an exception, the subscription is cancelled and the exception is rethrown from [collect].
 * Likewise, if the publisher signals an error, that error is rethrown from [collect].
 */
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit) {
    // Bridge java.util.concurrent.Flow.Publisher -> org.reactivestreams.Publisher before collecting.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    reactivePublisher.collect(action)
}
46