• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.reactive.*
10 import org.reactivestreams.*
11 import reactor.core.*
12 import reactor.core.publisher.*
13 import reactor.util.context.*
14 import kotlin.coroutines.*
15 
16 /**
17  * Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
18  * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
19  * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete])
20  * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
21  * emits the error ([Subscriber.onError]) and closes the channel with the cause.
22  * Unsubscribing cancels the running coroutine.
23  *
24  * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
25  * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
26  *
27  * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
28  *        to cancellation and error handling may change in the future.
29  *
30  * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
31  */
32 public fun <T> flux(
33     context: CoroutineContext = EmptyCoroutineContext,
34     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
35 ): Flux<T> {
36     require(context[Job] === null) { "Flux context cannot contain job in it." +
37         "Its lifecycle should be managed via Disposable handle. Had $context" }
38     return Flux.from(reactorPublish(GlobalScope, context, block))
39 }
40 
reactorPublishnull41 private fun <T> reactorPublish(
42     scope: CoroutineScope,
43     context: CoroutineContext = EmptyCoroutineContext,
44     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
45 ): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
46     if (subscriber !is CoreSubscriber) {
47         subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
48         return@onSubscribe
49     }
50     val currentContext = subscriber.currentContext()
51     val reactorContext = context.extendReactorContext(currentContext)
52     val newContext = scope.newCoroutineContext(context + reactorContext)
53     val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
54     subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
55     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
56 }
57 
ctxnull58 private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx ->
59     if (cause !is CancellationException) {
60         try {
61             Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty())
62         } catch (e: Throwable) {
63             cause.addSuppressed(e)
64             handleCoroutineException(ctx, cause)
65         }
66     }
67 }
68 
69 /** The proper way to reject the subscriber, according to
70  * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
71  */
rejectnull72 private fun <T> Subscriber<T>?.reject(t: Throwable) {
73     if (this == null)
74         throw NullPointerException("The subscriber can not be null")
75     onSubscribe(object: Subscription {
76         override fun request(n: Long) {
77             // intentionally left blank
78         }
79         override fun cancel() {
80             // intentionally left blank
81         }
82     })
83     onError(t)
84 }
85 
86 /**
87  * @suppress
88  */
89 @Deprecated(
90     message = "CoroutineScope.flux is deprecated in favour of top-level flux",
91     level = DeprecationLevel.HIDDEN,
92     replaceWith = ReplaceWith("flux(context, block)")
93 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
fluxnull94 public fun <T> CoroutineScope.flux(
95     context: CoroutineContext = EmptyCoroutineContext,
96     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
97 ): Flux<T> =
98     Flux.from(reactorPublish(this, context, block))
99