• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import kotlinx.coroutines.reactive.asFlow
10 import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
11 import kotlinx.coroutines.reactive.collect
12 import kotlinx.coroutines.channels.*
13 import org.reactivestreams.*
14 import kotlin.coroutines.*
15 import java.util.concurrent.Flow as JFlow
16 
17 /**
18  * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
19  * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
20  * In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
21  * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
22  *
23  * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
24  * elements are discarded.
25  */
asFlownull26 public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
27     FlowAdapters.toPublisher(this).asFlow()
28 
29 /**
30  * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
31  *
32  * An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
33  * methods.
34  * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
35  * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
36  * is used, so calls are performed from an arbitrary thread.
37  */
38 @JvmOverloads // binary compatibility
39 public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
40     FlowAdapters.toFlowPublisher(asReactivePublisher(context))
41 
42 /**
43  * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
44  *
45  * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
46  * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
47  */
48 public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
49     FlowAdapters.toPublisher(this).collect(action)
50