/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.channels.*
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow

/**
 * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
 * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
 * In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
 * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
 *
 * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
 * elements are discarded.
 *
 * The receiver is first adapted to a Reactive Streams publisher with [FlowAdapters.toPublisher], and the conversion
 * itself is delegated to the Reactive Streams `asFlow` extension.
 */
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
    FlowAdapters.toPublisher(this).asFlow()

/**
 * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
 *
 * An optional [context] can be specified to control the execution context of calls to the
 * [Flow Subscriber][JFlow.Subscriber] methods.
 * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 *
 * The flow is first converted with the Reactive Streams `asPublisher` extension (imported here as
 * `asReactivePublisher`), and the result is adapted with [FlowAdapters.toFlowPublisher].
 *
 * @param context the coroutine context forwarded to the underlying Reactive Streams conversion;
 * defaults to [EmptyCoroutineContext].
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
    FlowAdapters.toFlowPublisher(asReactivePublisher(context))

/**
 * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
 *
 * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
 * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
 *
 * The receiver is adapted with [FlowAdapters.toPublisher], and collection is delegated to the Reactive Streams
 * `collect` extension.
 *
 * @param action the callback invoked for each element emitted by the publisher.
 */
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
    FlowAdapters.toPublisher(this).collect(action)