1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
6
7 package kotlinx.coroutines.reactive
8
9 import kotlinx.atomicfu.*
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.selects.*
13 import kotlinx.coroutines.sync.*
14 import org.reactivestreams.*
15 import kotlin.coroutines.*
16 import kotlin.internal.LowPriorityInOverloadResolution
17
18 /**
19 * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
20 * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
21 * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
22 * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
23 * if coroutine throws an exception or closes channel with a cause.
24 * Unsubscribing cancels running coroutine.
25 *
26 * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
27 * `onNext` is not invoked concurrently.
28 *
29 * Coroutine context can be specified with [context] argument.
30 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
31 * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
32 *
33 * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
34 * to cancellation and error handling may change in the future.
35 */
36 @ExperimentalCoroutinesApi
37 public fun <T> publish(
38 context: CoroutineContext = EmptyCoroutineContext,
39 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
40 ): Publisher<T> {
41 require(context[Job] === null) { "Publisher context cannot contain job in it." +
42 "Its lifecycle should be managed via subscription. Had $context" }
43 return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
44 }
45
46 @Deprecated(
47 message = "CoroutineScope.publish is deprecated in favour of top-level publish",
48 level = DeprecationLevel.ERROR,
49 replaceWith = ReplaceWith("publish(context, block)")
50 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
51 @LowPriorityInOverloadResolution
publishnull52 public fun <T> CoroutineScope.publish(
53 context: CoroutineContext = EmptyCoroutineContext,
54 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
55 ): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)
56
57 /** @suppress For internal use from other reactive integration modules only */
58 @InternalCoroutinesApi
59 public fun <T> publishInternal(
60 scope: CoroutineScope, // support for legacy publish in scope
61 context: CoroutineContext,
62 exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
63 block: suspend ProducerScope<T>.() -> Unit
64 ): Publisher<T> = Publisher { subscriber ->
65 // specification requires NPE on null subscriber
66 if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
67 val newContext = scope.newCoroutineContext(context)
68 val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
69 subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
70 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
71 }
72
73 private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
74 private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
tnull75 private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
76
77 @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
78 @InternalCoroutinesApi
79 public class PublisherCoroutine<in T>(
80 parentContext: CoroutineContext,
81 private val subscriber: Subscriber<T>,
82 private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
83 ) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
84 override val channel: SendChannel<T> get() = this
85
86 // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
87 private val mutex = Mutex(locked = true)
88 private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
89
90 @Volatile
91 private var cancelled = false // true when Subscription.cancel() is invoked
92
93 override val isClosedForSend: Boolean get() = isCompleted
94 override val isFull: Boolean = mutex.isLocked
closenull95 override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
96 override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
97 throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
98
99 override fun offer(element: T): Boolean {
100 if (!mutex.tryLock()) return false
101 doLockedNext(element)
102 return true
103 }
104
sendnull105 public override suspend fun send(element: T) {
106 // fast-path -- try send without suspension
107 if (offer(element)) return
108 // slow-path does suspend
109 return sendSuspend(element)
110 }
111
sendSuspendnull112 private suspend fun sendSuspend(element: T) {
113 mutex.lock()
114 doLockedNext(element)
115 }
116
117 override val onSend: SelectClause2<T, SendChannel<T>>
118 get() = this
119
120 // registerSelectSend
121 @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
registerSelectClause2null122 override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
123 mutex.onLock.registerSelectClause2(select, null) {
124 doLockedNext(element)
125 block(this)
126 }
127 }
128
129 /*
130 * This code is not trivial because of the two properties:
131 * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
132 * be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
133 * coroutines are invoking `send` function.
134 * 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
135 * However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
136 * globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
137 * lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
138 * `onComplete/onError` is also done under the same mutex.
139 */
140
141 // assert: mutex.isLocked()
doLockedNextnull142 private fun doLockedNext(elem: T) {
143 // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
144 // because the job is cancelled, so this check also ensure conformance to the reactive specification's
145 // requirement that after cancellation requested we don't call onXXX
146 if (!isActive) {
147 unlockAndCheckCompleted()
148 throw getCancellationException()
149 }
150 // notify subscriber
151 try {
152 subscriber.onNext(elem)
153 } catch (e: Throwable) {
154 // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
155 // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
156 // this failure is essentially equivalent to a failure of a child coroutine.
157 cancelCoroutine(e)
158 unlockAndCheckCompleted()
159 throw e
160 }
161 // now update nRequested
162 while (true) { // lock-free loop on nRequested
163 val current = _nRequested.value
164 if (current < 0) break // closed from inside onNext => unlock
165 if (current == Long.MAX_VALUE) break // no back-pressure => unlock
166 val updated = current - 1
167 if (_nRequested.compareAndSet(current, updated)) {
168 if (updated == 0L) {
169 // return to keep locked due to back-pressure
170 return
171 }
172 break // unlock if updated > 0
173 }
174 }
175 unlockAndCheckCompleted()
176 }
177
unlockAndCheckCompletednull178 private fun unlockAndCheckCompleted() {
179 /*
180 * There is no sense to check completion before doing `unlock`, because completion might
181 * happen after this check and before `unlock` (see `signalCompleted` that does not do anything
182 * if it fails to acquire the lock that we are still holding).
183 * We have to recheck `isCompleted` after `unlock` anyway.
184 */
185 mutex.unlock()
186 // check isCompleted and and try to regain lock to signal completion
187 if (isCompleted && mutex.tryLock()) {
188 doLockedSignalCompleted(completionCause, completionCauseHandled)
189 }
190 }
191
192 // assert: mutex.isLocked() & isCompleted
doLockedSignalCompletednull193 private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
194 try {
195 if (_nRequested.value >= CLOSED) {
196 _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
197 // Specification requires that after cancellation requested we don't call onXXX
198 if (cancelled) {
199 // If the parent had failed to handle our exception, then we must not lose this exception
200 if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
201 return
202 }
203
204 try {
205 if (cause != null && cause !is CancellationException) {
206 /*
207 * Reactive frameworks have two types of exceptions: regular and fatal.
208 * Regular are passed to onError.
209 * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
210 * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
211 * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
212 * thrown by subscriber or upstream).
213 * To make behaviour consistent and least surprising, we always handle fatal exceptions
214 * by coroutines machinery, anyway, they should not be present in regular program flow,
215 * thus our goal here is just to expose it as soon as possible.
216 */
217 subscriber.onError(cause)
218 if (!handled && cause.isFatal()) {
219 exceptionOnCancelHandler(cause, context)
220 }
221 } else {
222 subscriber.onComplete()
223 }
224 } catch (e: Throwable) {
225 handleCoroutineException(context, e)
226 }
227 }
228 } finally {
229 mutex.unlock()
230 }
231 }
232
requestnull233 override fun request(n: Long) {
234 if (n <= 0) {
235 // Specification requires IAE for n <= 0
236 cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
237 return
238 }
239 while (true) { // lock-free loop for nRequested
240 val cur = _nRequested.value
241 if (cur < 0) return // already closed for send, ignore requests
242 var upd = cur + n
243 if (upd < 0 || n == Long.MAX_VALUE)
244 upd = Long.MAX_VALUE
245 if (cur == upd) return // nothing to do
246 if (_nRequested.compareAndSet(cur, upd)) {
247 // unlock the mutex when we don't have back-pressure anymore
248 if (cur == 0L) {
249 unlockAndCheckCompleted()
250 }
251 return
252 }
253 }
254 }
255
256 // assert: isCompleted
signalCompletednull257 private fun signalCompleted(cause: Throwable?, handled: Boolean) {
258 while (true) { // lock-free loop for nRequested
259 val current = _nRequested.value
260 if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
261 check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
262 if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
263 // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
264 if (current == 0L) {
265 doLockedSignalCompleted(cause, handled)
266 } else {
267 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
268 if (mutex.tryLock()) doLockedSignalCompleted(cause, handled)
269 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
270 }
271 return // done anyway
272 }
273 }
274
onCompletednull275 override fun onCompleted(value: Unit) {
276 signalCompleted(null, false)
277 }
278
onCancellednull279 override fun onCancelled(cause: Throwable, handled: Boolean) {
280 signalCompleted(cause, handled)
281 }
282
cancelnull283 override fun cancel() {
284 // Specification requires that after cancellation publisher stops signalling
285 // This flag distinguishes subscription cancellation request from the job crash
286 cancelled = true
287 super.cancel(null)
288 }
289
Throwablenull290 private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
291 }
292