• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.jdk9
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import kotlinx.coroutines.reactive.asFlow
10 import kotlinx.coroutines.reactive.asPublisher
11 import kotlinx.coroutines.reactive.collect
12 import org.reactivestreams.*
13 import kotlin.coroutines.*
14 import java.util.concurrent.Flow as JFlow
15 
/**
 * Converts this JDK 9 [JFlow.Publisher] into a [Flow].
 *
 * The receiver is first adapted to a Reactive Streams [Publisher] through [FlowAdapters.toPublisher], and that
 * publisher is then turned into a flow.
 *
 * Apply the [buffer] operator to the returned flow to choose the size of the backpressure; more precisely,
 * it determines the value passed to the subscription's [request][Subscription.request].
 * When no [buffer] is applied, its default capacity is used.
 *
 * If any transformation of the resulting flow fails, the subscription is cancelled immediately and all
 * in-flight elements are discarded.
 */
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> {
    // Bridge java.util.concurrent.Flow.Publisher -> org.reactivestreams.Publisher before converting.
    val reactivePublisher = FlowAdapters.toPublisher(this)
    return reactivePublisher.asFlow()
}
27 
/**
 * Converts this flow into a reactive specification compliant JDK 9 [JFlow.Publisher].
 *
 * The flow is first converted to a Reactive Streams [Publisher], which is then adapted with
 * [FlowAdapters.toFlowPublisher].
 *
 * The optional [context] controls the execution context in which [Subscriber] methods are called.
 * Supply a [CoroutineDispatcher] to confine those calls to a specific thread and/or [ThreadContextElement]s to
 * inject additional context into the calling thread. By default the [Unconfined][Dispatchers.Unconfined]
 * dispatcher is used, so the calls are performed from an arbitrary thread.
 */
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
    // The inner call is the Reactive Streams `asPublisher` extension imported at the top of the file.
    FlowAdapters.toFlowPublisher(asPublisher<T>(context))
41 
/**
 * Subscribes to this [JFlow.Publisher] and invokes [action] for each element it receives.
 * The subscription is cancelled if any exception happens while collecting.
 */
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit {
    // Adapt to a Reactive Streams publisher and delegate to its `collect` extension.
    FlowAdapters.toPublisher(this).collect(action)
}
48