<lambda>null1 package kotlinx.coroutines.reactive
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.selects.*
7 import kotlinx.coroutines.sync.*
8 import org.reactivestreams.*
9 import kotlin.coroutines.*
10
11 /**
12 * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
13 *
14 * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
15 * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
16 * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
17 * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
18 * Unsubscribing cancels the running coroutine.
19 *
20 * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
21 * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
22 *
23 * Coroutine context can be specified with [context] argument.
24 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
25 * used.
26 *
27 * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
28 * to cancellation and error handling may change in the future.
29 *
30 * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
31 */
32 public fun <T> publish(
33 context: CoroutineContext = EmptyCoroutineContext,
34 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
35 ): Publisher<T> {
36 require(context[Job] === null) { "Publisher context cannot contain job in it." +
37 "Its lifecycle should be managed via subscription. Had $context" }
38 return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
39 }
40
41 /** @suppress For internal use from other reactive integration modules only */
42 @InternalCoroutinesApi
publishInternalnull43 public fun <T> publishInternal(
44 scope: CoroutineScope, // support for legacy publish in scope
45 context: CoroutineContext,
46 exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
47 block: suspend ProducerScope<T>.() -> Unit
48 ): Publisher<T> = Publisher { subscriber ->
49 // specification requires NPE on null subscriber
50 if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
51 val newContext = scope.newCoroutineContext(context)
52 val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
53 subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
54 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
55 }
56
57 private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
58 private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
tnull59 private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
60
61 /** @suppress */
62 @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
63 @InternalCoroutinesApi
64 public class PublisherCoroutine<in T>(
65 parentContext: CoroutineContext,
66 private val subscriber: Subscriber<T>,
67 private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
68 ) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, Subscription {
69 override val channel: SendChannel<T> get() = this
70
71 private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
72
73 @Volatile
74 private var cancelled = false // true after Subscription.cancel() is invoked
75
76 override val isClosedForSend: Boolean get() = !isActive
closenull77 override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
78 override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
79 throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
80
81 // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
82 private val mutex: Mutex = Mutex(locked = true)
83
84 @Suppress("UNCHECKED_CAST", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
85 override val onSend: SelectClause2<T, SendChannel<T>> get() = SelectClause2Impl(
86 clauseObject = this,
87 regFunc = PublisherCoroutine<*>::registerSelectForSend as RegistrationFunction,
88 processResFunc = PublisherCoroutine<*>::processResultSelectSend as ProcessResultFunction
89 )
90
91 @Suppress("UNCHECKED_CAST", "UNUSED_PARAMETER")
92 private fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
93 // Try to acquire the mutex and complete in the registration phase.
94 if (mutex.tryLock()) {
95 select.selectInRegistrationPhase(Unit)
96 return
97 }
98 // Start a new coroutine that waits for the mutex, invoking `trySelect(..)` after that.
99 // Please note that at the point of the `trySelect(..)` invocation the corresponding
100 // `select` can still be in the registration phase, making this `trySelect(..)` bound to fail.
101 // In this case, the `onSend` clause will be re-registered, which alongside with the mutex
102 // manipulation makes the resulting solution obstruction-free.
103 launch {
104 mutex.lock()
105 if (!select.trySelect(this@PublisherCoroutine, Unit)) {
106 mutex.unlock()
107 }
108 }
109 }
110
111 @Suppress("RedundantNullableReturnType", "UNUSED_PARAMETER", "UNCHECKED_CAST")
processResultSelectSendnull112 private fun processResultSelectSend(element: Any?, selectResult: Any?): Any? {
113 doLockedNext(element as T)?.let { throw it }
114 return this@PublisherCoroutine
115 }
116
trySendnull117 override fun trySend(element: T): ChannelResult<Unit> =
118 if (!mutex.tryLock()) {
119 ChannelResult.failure()
120 } else {
throwablenull121 when (val throwable = doLockedNext(element)) {
122 null -> ChannelResult.success(Unit)
123 else -> ChannelResult.closed(throwable)
124 }
125 }
126
sendnull127 public override suspend fun send(element: T) {
128 mutex.lock()
129 doLockedNext(element)?.let { throw it }
130 }
131
132 /*
133 * This code is not trivial because of the following properties:
134 * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
135 * be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
136 * coroutines are invoking `send` function.
137 * 2. Normally, `onComplete/onError` notification is sent only when coroutine and all its children are complete.
138 * However, nothing prevents `publish` coroutine from leaking reference to it send channel to some
139 * globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
140 * lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
141 * `onComplete/onError` is also done under the same mutex.
142 * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
143 * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
144 * elements is not permitted (`_nRequested.value == 0`).
145 */
146
147 /**
148 * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
149 *
150 * Requires that the caller has locked the mutex before this invocation.
151 *
152 * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
153 *
154 * @throws NullPointerException if the passed element is `null`
155 */
doLockedNextnull156 private fun doLockedNext(elem: T): Throwable? {
157 if (elem == null) {
158 unlockAndCheckCompleted()
159 throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
160 }
161 /** This guards against the case when the caller of this function managed to lock the mutex not because some
162 * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
163 *
164 * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
165 * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
166 * stop signalling the subscriber. */
167 if (!isActive) {
168 unlockAndCheckCompleted()
169 return getCancellationException()
170 }
171 // notify the subscriber
172 try {
173 subscriber.onNext(elem)
174 } catch (cause: Throwable) {
175 /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
176 * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
177 * for uncaught exceptions and consider the subscription cancelled, as mandated by
178 * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
179 *
180 * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
181 * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
182 * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
183 * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
184 * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
185 * would violate the contracts we place on them. */
186 cancelled = true
187 val causeDelivered = close(cause)
188 unlockAndCheckCompleted()
189 return if (causeDelivered) {
190 // `cause` is the reason this channel is closed
191 cause
192 } else {
193 // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
194 exceptionOnCancelHandler(cause, context)
195 getCancellationException()
196 }
197 }
198 // now update nRequested
199 while (true) { // lock-free loop on nRequested
200 val current = _nRequested.value
201 if (current < 0) break // closed from inside onNext => unlock
202 if (current == Long.MAX_VALUE) break // no back-pressure => unlock
203 val updated = current - 1
204 if (_nRequested.compareAndSet(current, updated)) {
205 if (updated == 0L) {
206 // return to keep locked due to back-pressure
207 return null
208 }
209 break // unlock if updated > 0
210 }
211 }
212 unlockAndCheckCompleted()
213 return null
214 }
215
unlockAndCheckCompletednull216 private fun unlockAndCheckCompleted() {
217 /*
218 * There is no sense to check completion before doing `unlock`, because completion might
219 * happen after this check and before `unlock` (see `signalCompleted` that does not do anything
220 * if it fails to acquire the lock that we are still holding).
221 * We have to recheck `isCompleted` after `unlock` anyway.
222 */
223 mutex.unlock()
224 // check isCompleted and try to regain lock to signal completion
225 if (isCompleted && mutex.tryLock()) {
226 doLockedSignalCompleted(completionCause, completionCauseHandled)
227 }
228 }
229
230 // assert: mutex.isLocked() & isCompleted
doLockedSignalCompletednull231 private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
232 try {
233 if (_nRequested.value == SIGNALLED)
234 return
235 _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
236 // Specification requires that after the cancellation is requested we eventually stop calling onXXX
237 if (cancelled) {
238 // If the parent failed to handle this exception, then we must not lose the exception
239 if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
240 return
241 }
242 if (cause == null) {
243 try {
244 subscriber.onComplete()
245 } catch (e: Throwable) {
246 handleCoroutineException(context, e)
247 }
248 } else {
249 try {
250 // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
251 subscriber.onError(cause)
252 } catch (e: Throwable) {
253 if (e !== cause) {
254 cause.addSuppressed(e)
255 }
256 handleCoroutineException(context, cause)
257 }
258 }
259 } finally {
260 mutex.unlock()
261 }
262 }
263
requestnull264 override fun request(n: Long) {
265 if (n <= 0) {
266 // Specification requires to call onError with IAE for n <= 0
267 cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
268 return
269 }
270 while (true) { // lock-free loop for nRequested
271 val cur = _nRequested.value
272 if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
273 var upd = cur + n
274 if (upd < 0 || n == Long.MAX_VALUE)
275 upd = Long.MAX_VALUE
276 if (cur == upd) return // nothing to do
277 if (_nRequested.compareAndSet(cur, upd)) {
278 // unlock the mutex when we don't have back-pressure anymore
279 if (cur == 0L) {
280 /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
281 * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
282 * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
283 * impossible for any other operations to assume that they own the lock without actually acquiring
284 * it. */
285 unlockAndCheckCompleted()
286 }
287 return
288 }
289 }
290 }
291
292 // assert: isCompleted
signalCompletednull293 private fun signalCompleted(cause: Throwable?, handled: Boolean) {
294 while (true) { // lock-free loop for nRequested
295 val current = _nRequested.value
296 if (current == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion
297 check(current >= 0) // no other thread could have marked it as CLOSED, because onCompleted[Exceptionally] is invoked once
298 if (!_nRequested.compareAndSet(current, CLOSED)) continue // retry on failed CAS
299 // Ok -- marked as CLOSED, now can unlock the mutex if it was locked due to backpressure
300 if (current == 0L) {
301 doLockedSignalCompleted(cause, handled)
302 } else {
303 // otherwise mutex was either not locked or locked in concurrent onNext... try lock it to signal completion
304 if (mutex.tryLock()) doLockedSignalCompleted(cause, handled)
305 // Note: if failed `tryLock`, then `doLockedNext` will signal after performing `unlock`
306 }
307 return // done anyway
308 }
309 }
310
onCompletednull311 override fun onCompleted(value: Unit) {
312 signalCompleted(null, false)
313 }
314
onCancellednull315 override fun onCancelled(cause: Throwable, handled: Boolean) {
316 signalCompleted(cause, handled)
317 }
318
cancelnull319 override fun cancel() {
320 // Specification requires that after cancellation publisher stops signalling
321 // This flag distinguishes subscription cancellation request from the job crash
322 cancelled = true
323 super.cancel(null)
324 }
325 }
326
327 @Deprecated(
328 message = "CoroutineScope.publish is deprecated in favour of top-level publish",
329 level = DeprecationLevel.HIDDEN,
330 replaceWith = ReplaceWith("publish(context, block)")
331 ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
publishnull332 public fun <T> CoroutineScope.publish(
333 context: CoroutineContext = EmptyCoroutineContext,
334 @BuilderInference block: suspend ProducerScope<T>.() -> Unit
335 ): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER, block)
336