1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.reactor
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.reactive.*
10 import org.reactivestreams.*
11 import reactor.core.*
12 import reactor.core.publisher.*
13 import reactor.util.context.*
14 import kotlin.coroutines.*
15
16 /**
17 * Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
18 * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
19 * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete])
20 * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
21 * emits the error ([Subscriber.onError]) and closes the channel with the cause.
22 * Unsubscribing cancels the running coroutine.
23 *
24 * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
25 * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
26 *
27 * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
28 * to cancellation and error handling may change in the future.
29 *
30 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
31 */
32 public fun <T> flux(
33 context: CoroutineContext = EmptyCoroutineContext,
34 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
35 ): Flux<T> {
36 require(context[Job] === null) { "Flux context cannot contain job in it." +
37 "Its lifecycle should be managed via Disposable handle. Had $context" }
38 return Flux.from(reactorPublish(GlobalScope, context, block))
39 }
40
reactorPublishnull41 private fun <T> reactorPublish(
42 scope: CoroutineScope,
43 context: CoroutineContext = EmptyCoroutineContext,
44 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
45 ): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
46 if (subscriber !is CoreSubscriber) {
47 subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
48 return@onSubscribe
49 }
50 val currentContext = subscriber.currentContext()
51 val reactorContext = context.extendReactorContext(currentContext)
52 val newContext = scope.newCoroutineContext(context + reactorContext)
53 val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
54 subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
55 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
56 }
57
ctxnull58 private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx ->
59 if (cause !is CancellationException) {
60 try {
61 Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty())
62 } catch (e: Throwable) {
63 cause.addSuppressed(e)
64 handleCoroutineException(ctx, cause)
65 }
66 }
67 }
68
69 /** The proper way to reject the subscriber, according to
70 * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
71 */
rejectnull72 private fun <T> Subscriber<T>?.reject(t: Throwable) {
73 if (this == null)
74 throw NullPointerException("The subscriber can not be null")
75 onSubscribe(object: Subscription {
76 override fun request(n: Long) {
77 // intentionally left blank
78 }
79 override fun cancel() {
80 // intentionally left blank
81 }
82 })
83 onError(t)
84 }
85
86 /**
87 * @suppress
88 */
89 @Deprecated(
90 message = "CoroutineScope.flux is deprecated in favour of top-level flux",
91 level = DeprecationLevel.HIDDEN,
92 replaceWith = ReplaceWith("flux(context, block)")
93 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
fluxnull94 public fun <T> CoroutineScope.flux(
95 context: CoroutineContext = EmptyCoroutineContext,
96 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
97 ): Flux<T> =
98 Flux.from(reactorPublish(this, context, block))
99