1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 package kotlinx.coroutines.reactive
5 
6 import kotlinx.atomicfu.*
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.intrinsics.*
10 import kotlinx.coroutines.selects.*
11 import kotlinx.coroutines.sync.*
12 import org.reactivestreams.*
13 import kotlin.coroutines.*
14 
15 /**
16  * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
17  *
18  * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
19  * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
20  * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
21  * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
22  * Unsubscribing cancels the running coroutine.
23  *
24  * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
25  * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
26  *
27  * Coroutine context can be specified with [context] argument.
28  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
29  * used.
30  *
31  * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
32  *        to cancellation and error handling may change in the future.
33  *
34  * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
35  */
36 public fun <T> publish(
37     context: CoroutineContext = EmptyCoroutineContext,
38     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
39 ): Publisher<T> {
40     require(context[Job] === null) { "Publisher context cannot contain job in it." +
41             "Its lifecycle should be managed via subscription. Had $context" }
42     return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
43 }
44 
45 /** @suppress For internal use from other reactive integration modules only */
46 @InternalCoroutinesApi
publishInternalnull47 public fun <T> publishInternal(
48     scope: CoroutineScope, // support for legacy publish in scope
49     context: CoroutineContext,
50     exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
51     block: suspend ProducerScope<T>.() -> Unit
52 ): Publisher<T> = Publisher { subscriber ->
53     // specification requires NPE on null subscriber
54     if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
55     val newContext = scope.newCoroutineContext(context)
56     val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
57     subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
58     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
59 }
60 
61 private const val CLOSED = -1L    // closed, but have not signalled onCompleted/onError yet
62 private const val SIGNALLED = -2L  // already signalled subscriber onCompleted/onError
tnull63 private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
64 
65 /** @suppress */
66 @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
67 @InternalCoroutinesApi
68 public class PublisherCoroutine<in T>(
69     parentContext: CoroutineContext,
70     private val subscriber: Subscriber<T>,
71     private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
72 ) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
73     override val channel: SendChannel<T> get() = this
74 
75     // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
76     private val mutex = Mutex(locked = true)
77     private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
78 
79     @Volatile
80     private var cancelled = false // true after Subscription.cancel() is invoked
81 
82     override val isClosedForSend: Boolean get() = !isActive
closenull83     override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
84     override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
85         throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
86 
87     override fun trySend(element: T): ChannelResult<Unit> =
88         if (!mutex.tryLock()) {
89             ChannelResult.failure()
90         } else {
throwablenull91             when (val throwable = doLockedNext(element)) {
92                 null -> ChannelResult.success(Unit)
93                 else -> ChannelResult.closed(throwable)
94             }
95         }
96 
sendnull97     public override suspend fun send(element: T) {
98         mutex.lock()
99         doLockedNext(element)?.let { throw it }
100     }
101 
102     override val onSend: SelectClause2<T, SendChannel<T>>
103         get() = this
104 
105     // registerSelectSend
106     @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
registerSelectClause2null107     override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
108         val clause =  suspend {
109             doLockedNext(element)?.let { throw it }
110             block(this)
111         }
112 
113         launch(start = CoroutineStart.UNDISPATCHED) {
114             mutex.lock()
115             // Already selected -- bail out
116             if (!select.trySelect()) {
117                 mutex.unlock()
118                 return@launch
119             }
120 
121             clause.startCoroutineCancellable(select.completion)
122         }
123     }
124 
125     /*
126      * This code is not trivial because of the following properties:
127      * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
128      *    be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
129      *    coroutines are invoking `send` function.
130      * 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
131      *    However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
132      *    globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
133      *    lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
134      *    `onComplete/onError` is also done under the same mutex.
135      * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
136      *    subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
137      *    elements is not permitted (`_nRequested.value == 0`).
138      */
139 
140     /**
141      * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
142      *
143      * Requires that the caller has locked the mutex before this invocation.
144      *
145      * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
146      *
147      * @throws NullPointerException if the passed element is `null`
148      */
doLockedNextnull149     private fun doLockedNext(elem: T): Throwable? {
150         if (elem == null) {
151             unlockAndCheckCompleted()
152             throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
153         }
154         /** This guards against the case when the caller of this function managed to lock the mutex not because some
155          * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
156          *
157          * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
158          * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
159          * stop signalling the subscriber. */
160         if (!isActive) {
161             unlockAndCheckCompleted()
162             return getCancellationException()
163         }
164         // notify the subscriber
165         try {
166             subscriber.onNext(elem)
167         } catch (cause: Throwable) {
168             /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
169              * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
170              * for uncaught exceptions and consider the subscription cancelled, as mandated by
171              * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
172              *
173              * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
174              * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
175              * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
176              * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
177              * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
178              * would violate the contracts we place on them. */
179             cancelled = true
180             val causeDelivered = close(cause)
181             unlockAndCheckCompleted()
182             return if (causeDelivered) {
183                 // `cause` is the reason this channel is closed
184                 cause
185             } else {
186                 // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
187                 exceptionOnCancelHandler(cause, context)
188                 getCancellationException()
189             }
190         }
191         // now update nRequested
192         while (true) { // lock-free loop on nRequested
193             val current = _nRequested.value
194             if (current < 0) break // closed from inside onNext => unlock
195             if (current == Long.MAX_VALUE) break // no back-pressure => unlock
196             val updated = current - 1
197             if (_nRequested.compareAndSet(current, updated)) {
198                 if (updated == 0L) {
199                     // return to keep locked due to back-pressure
200                     return null
201                 }
202                 break // unlock if updated > 0
203             }
204         }
205         unlockAndCheckCompleted()
206         return null
207     }
208 
unlockAndCheckCompletednull209     private fun unlockAndCheckCompleted() {
210        /*
211         * There is no sense to check completion before doing `unlock`, because completion might
212         * happen after this check and before `unlock` (see `signalCompleted` that does not do anything
213         * if it fails to acquire the lock that we are still holding).
214         * We have to recheck `isCompleted` after `unlock` anyway.
215         */
216         mutex.unlock()
217         // check isCompleted and and try to regain lock to signal completion
218         if (isCompleted && mutex.tryLock()) {
219             doLockedSignalCompleted(completionCause, completionCauseHandled)
220         }
221     }
222 
223     // assert: mutex.isLocked() & isCompleted
doLockedSignalCompletednull224     private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
225         try {
226             if (_nRequested.value == SIGNALLED)
227                 return
228             _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
229             // Specification requires that after the cancellation is requested we eventually stop calling onXXX
230             if (cancelled) {
231                 // If the parent failed to handle this exception, then we must not lose the exception
232                 if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
233                 return
234             }
235             if (cause == null) {
236                 try {
237                     subscriber.onComplete()
238                 } catch (e: Throwable) {
239                     handleCoroutineException(context, e)
240                 }
241             } else {
242                 try {
243                     // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
244                     subscriber.onError(cause)
245                 } catch (e: Throwable) {
246                     if (e !== cause) {
247                         cause.addSuppressed(e)
248                     }
249                     handleCoroutineException(context, cause)
250                 }
251             }
252         } finally {
253             mutex.unlock()
254         }
255     }
256 
requestnull257     override fun request(n: Long) {
258         if (n <= 0) {
259             // Specification requires to call onError with IAE for n <= 0
260             cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
261             return
262         }
263         while (true) { // lock-free loop for nRequested
264             val cur = _nRequested.value
265             if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
266             var upd = cur + n
267             if (upd < 0 || n == Long.MAX_VALUE)
268                 upd = Long.MAX_VALUE
269             if (cur == upd) return // nothing to do
270             if (_nRequested.compareAndSet(cur, upd)) {
271                 // unlock the mutex when we don't have back-pressure anymore
272                 if (cur == 0L) {
273                     /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
274                      * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
275                      * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
276                      * impossible for any other operations to assume that they own the lock without actually acquiring
277                      * it. */
278                     unlockAndCheckCompleted()
279                 }
280                 return
281             }
282         }
283     }
284 
285     // assert: isCompleted
signalCompletednull286     private fun signalCompleted(cause: Throwable?, handled: Boolean) {
287         while (true) { // lock-free loop for nRequested
288             val current = _nRequested.value
289             if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
290             check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
291             if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
292             // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
293             if (current == 0L) {
294                 doLockedSignalCompleted(cause, handled)
295             } else {
296                 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
297                 if (mutex.tryLock()) doLockedSignalCompleted(cause, handled)
298                 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
299             }
300             return // done anyway
301         }
302     }
303 
onCompletednull304     override fun onCompleted(value: Unit) {
305         signalCompleted(null, false)
306     }
307 
onCancellednull308     override fun onCancelled(cause: Throwable, handled: Boolean) {
309         signalCompleted(cause, handled)
310     }
311 
cancelnull312     override fun cancel() {
313         // Specification requires that after cancellation publisher stops signalling
314         // This flag distinguishes subscription cancellation request from the job crash
315         cancelled = true
316         super.cancel(null)
317     }
318 }
319 
320 @Deprecated(
321     message = "CoroutineScope.publish is deprecated in favour of top-level publish",
322     level = DeprecationLevel.HIDDEN,
323     replaceWith = ReplaceWith("publish(context, block)")
324 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
publishnull325 public fun <T> CoroutineScope.publish(
326     context: CoroutineContext = EmptyCoroutineContext,
327     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
328 ): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER, block)
329