/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow

/**
 * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
 * Use [buffer] operator on the resulting flow to specify the size of the backpressure.
 * More precisely, it specifies the value of the subscription's [request][JFlow.Subscription.request].
 * [buffer] default capacity is used by default.
 *
 * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
 * are discarded.
 *
 * The receiver is first bridged to a Reactive Streams publisher via [FlowAdapters.toPublisher] and then converted
 * with the Reactive Streams `asFlow` extension.
 */
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
    FlowAdapters.toPublisher(this).asFlow()

/**
 * Transforms the given flow to a reactive specification compliant [Flow Publisher][JFlow.Publisher].
 *
 * An optional [context] can be specified to control the execution context of calls to
 * [Subscriber][JFlow.Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 *
 * @param context the coroutine context forwarded to the underlying Reactive Streams `asPublisher` conversion;
 * defaults to [EmptyCoroutineContext].
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
    // The explicit type documents (and makes the compiler verify) that this call yields the Reactive Streams
    // publisher produced by the `asPublisher` imported from kotlinx.coroutines.reactive, not a JDK 9 one.
    val reactivePublisher: org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
    return FlowAdapters.toFlowPublisher(reactivePublisher)
}

/**
 * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
 * Cancels subscription if any exception happens during collect.
 *
 * @param action the callback invoked with every element received from the publisher.
 */
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
    FlowAdapters.toPublisher(this).collect(action)