• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 package kotlinx.coroutines.reactive
5 
6 import kotlinx.atomicfu.*
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.selects.*
10 import kotlinx.coroutines.sync.*
11 import org.reactivestreams.*
12 import kotlin.coroutines.*
13 
14 /**
15  * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
16  *
17  * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
18  * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
19  * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
20  * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
21  * Unsubscribing cancels the running coroutine.
22  *
23  * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
24  * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
25  *
26  * Coroutine context can be specified with [context] argument.
27  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
28  * used.
29  *
30  * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
31  *        to cancellation and error handling may change in the future.
32  *
33  * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
34  */
35 public fun <T> publish(
36     context: CoroutineContext = EmptyCoroutineContext,
37     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
38 ): Publisher<T> {
39     require(context[Job] === null) { "Publisher context cannot contain job in it." +
40             "Its lifecycle should be managed via subscription. Had $context" }
41     return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
42 }
43 
44 /** @suppress For internal use from other reactive integration modules only */
45 @InternalCoroutinesApi
publishInternalnull46 public fun <T> publishInternal(
47     scope: CoroutineScope, // support for legacy publish in scope
48     context: CoroutineContext,
49     exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
50     block: suspend ProducerScope<T>.() -> Unit
51 ): Publisher<T> = Publisher { subscriber ->
52     // specification requires NPE on null subscriber
53     if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
54     val newContext = scope.newCoroutineContext(context)
55     val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
56     subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
57     coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
58 }
59 
60 private const val CLOSED = -1L    // closed, but have not signalled onCompleted/onError yet
61 private const val SIGNALLED = -2L  // already signalled subscriber onCompleted/onError
tnull62 private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
63 
64 /** @suppress */
65 @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
66 @InternalCoroutinesApi
67 public class PublisherCoroutine<in T>(
68     parentContext: CoroutineContext,
69     private val subscriber: Subscriber<T>,
70     private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
71 ) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, Subscription {
72     override val channel: SendChannel<T> get() = this
73 
74     private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
75 
76     @Volatile
77     private var cancelled = false // true after Subscription.cancel() is invoked
78 
79     override val isClosedForSend: Boolean get() = !isActive
closenull80     override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
81     override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
82         throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
83 
84     // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
85     private val mutex: Mutex = Mutex(locked = true)
86 
87     @Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER")
88     override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
89         clauseObject = this,
90         regFunc = PublisherCoroutine<*>::registerSelectForSend as RegistrationFunction,
91         processResFunc = PublisherCoroutine<*>::processResultSelectSend as ProcessResultFunction
92     )
93 
94     @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
95     private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
96         // Try to acquire the mutex and complete in the registration phase.
97         if (mutex.tryLock()) {
98             select.selectInRegistrationPhase(Unit)
99             return
100         }
101         // Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
102         // Please note that at the point of the `trySelect(..)` invocation the corresponding
103         // `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
104         // In this case, the `onSend` clause will be re-registered, which alongside with the mutex
105         // manipulation makes the resulting solution obstruction-free.
106         launch {
107             mutex.lock()
108             if (!select.trySelect(this@PublisherCoroutine, Unit)) {
109                 mutex.unlock()
110             }
111         }
112     }
113 
114     @Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
processResultSelectSendnull115     private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
116         doLockedNext(element as T)?.let { throw it }
117         return this@PublisherCoroutine
118     }
119 
trySendnull120     override fun trySend(element: T): ChannelResult<Unit> =
121         if (!mutex.tryLock()) {
122             ChannelResult.failure()
123         } else {
throwablenull124             when (val throwable = doLockedNext(element)) {
125                 null -> ChannelResult.success(Unit)
126                 else -> ChannelResult.closed(throwable)
127             }
128         }
129 
sendnull130     public override suspend fun send(element: T) {
131         mutex.lock()
132         doLockedNext(element)?.let { throw it }
133     }
134 
135     /*
136      * This code is not trivial because of the following properties:
137      * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
138      *    be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
139      *    coroutines are invoking `send` function.
140      * 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
141      *    However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
142      *    globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
143      *    lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
144      *    `onComplete/onError` is also done under the same mutex.
145      * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
146      *    subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
147      *    elements is not permitted (`_nRequested.value == 0`).
148      */
149 
150     /**
151      * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
152      *
153      * Requires that the caller has locked the mutex before this invocation.
154      *
155      * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
156      *
157      * @throws NullPointerException if the passed element is `null`
158      */
doLockedNextnull159     private fun doLockedNext(elem: T): Throwable? {
160         if (elem == null) {
161             unlockAndCheckCompleted()
162             throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
163         }
164         /** This guards against the case when the caller of this function managed to lock the mutex not because some
165          * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
166          *
167          * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
168          * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
169          * stop signalling the subscriber. */
170         if (!isActive) {
171             unlockAndCheckCompleted()
172             return getCancellationException()
173         }
174         // notify the subscriber
175         try {
176             subscriber.onNext(elem)
177         } catch (cause: Throwable) {
178             /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
179              * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
180              * for uncaught exceptions and consider the subscription cancelled, as mandated by
181              * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
182              *
183              * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
184              * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
185              * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
186              * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
187              * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
188              * would violate the contracts we place on them. */
189             cancelled = true
190             val causeDelivered = close(cause)
191             unlockAndCheckCompleted()
192             return if (causeDelivered) {
193                 // `cause` is the reason this channel is closed
194                 cause
195             } else {
196                 // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
197                 exceptionOnCancelHandler(cause, context)
198                 getCancellationException()
199             }
200         }
201         // now update nRequested
202         while (true) { // lock-free loop on nRequested
203             val current = _nRequested.value
204             if (current < 0) break // closed from inside onNext => unlock
205             if (current == Long.MAX_VALUE) break // no back-pressure => unlock
206             val updated = current - 1
207             if (_nRequested.compareAndSet(current, updated)) {
208                 if (updated == 0L) {
209                     // return to keep locked due to back-pressure
210                     return null
211                 }
212                 break // unlock if updated > 0
213             }
214         }
215         unlockAndCheckCompleted()
216         return null
217     }
218 
unlockAndCheckCompletednull219     private fun unlockAndCheckCompleted() {
220        /*
221         * There is no sense to check completion before doing `unlock`, because completion might
222         * happen after this check and before `unlock` (see `signalCompleted` that does not do anything
223         * if it fails to acquire the lock that we are still holding).
224         * We have to recheck `isCompleted` after `unlock` anyway.
225         */
226         mutex.unlock()
227         // check isCompleted and try to regain lock to signal completion
228         if (isCompleted && mutex.tryLock()) {
229             doLockedSignalCompleted(completionCause, completionCauseHandled)
230         }
231     }
232 
233     // assert: mutex.isLocked() & isCompleted
doLockedSignalCompletednull234     private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
235         try {
236             if (_nRequested.value == SIGNALLED)
237                 return
238             _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
239             // Specification requires that after the cancellation is requested we eventually stop calling onXXX
240             if (cancelled) {
241                 // If the parent failed to handle this exception, then we must not lose the exception
242                 if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
243                 return
244             }
245             if (cause == null) {
246                 try {
247                     subscriber.onComplete()
248                 } catch (e: Throwable) {
249                     handleCoroutineException(context, e)
250                 }
251             } else {
252                 try {
253                     // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
254                     subscriber.onError(cause)
255                 } catch (e: Throwable) {
256                     if (e !== cause) {
257                         cause.addSuppressed(e)
258                     }
259                     handleCoroutineException(context, cause)
260                 }
261             }
262         } finally {
263             mutex.unlock()
264         }
265     }
266 
requestnull267     override fun request(n: Long) {
268         if (n <= 0) {
269             // Specification requires to call onError with IAE for n <= 0
270             cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
271             return
272         }
273         while (true) { // lock-free loop for nRequested
274             val cur = _nRequested.value
275             if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
276             var upd = cur + n
277             if (upd < 0 || n == Long.MAX_VALUE)
278                 upd = Long.MAX_VALUE
279             if (cur == upd) return // nothing to do
280             if (_nRequested.compareAndSet(cur, upd)) {
281                 // unlock the mutex when we don't have back-pressure anymore
282                 if (cur == 0L) {
283                     /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
284                      * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
285                      * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
286                      * impossible for any other operations to assume that they own the lock without actually acquiring
287                      * it. */
288                     unlockAndCheckCompleted()
289                 }
290                 return
291             }
292         }
293     }
294 
295     // assert: isCompleted
signalCompletednull296     private fun signalCompleted(cause: Throwable?, handled: Boolean) {
297         while (true) { // lock-free loop for nRequested
298             val current = _nRequested.value
299             if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
300             check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
301             if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
302             // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
303             if (current == 0L) {
304                 doLockedSignalCompleted(cause, handled)
305             } else {
306                 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
307                 if (mutex.tryLock()) doLockedSignalCompleted(cause, handled)
308                 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
309             }
310             return // done anyway
311         }
312     }
313 
onCompletednull314     override fun onCompleted(value: Unit) {
315         signalCompleted(null, false)
316     }
317 
onCancellednull318     override fun onCancelled(cause: Throwable, handled: Boolean) {
319         signalCompleted(cause, handled)
320     }
321 
cancelnull322     override fun cancel() {
323         // Specification requires that after cancellation publisher stops signalling
324         // This flag distinguishes subscription cancellation request from the job crash
325         cancelled = true
326         super.cancel(null)
327     }
328 }
329 
330 @Deprecated(
331     message = "CoroutineScope.publish is deprecated in favour of top-level publish",
332     level = DeprecationLevel.HIDDEN,
333     replaceWith = ReplaceWith("publish(context, block)")
334 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
publishnull335 public fun <T> CoroutineScope.publish(
336     context: CoroutineContext = EmptyCoroutineContext,
337     @BuilderInference block: suspend ProducerScope<T>.() -> Unit
338 ): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER, block)
339