1 /* 2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactor 6 7 import kotlin.coroutines.* 8 import kotlinx.coroutines.reactive.* 9 import reactor.util.context.* 10 11 /** 12 * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between 13 * Reactor and kotlinx.coroutines. 14 * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext], 15 * which can be used to propagate the information about Reactor's [Context] through coroutines. 16 * 17 * This context element is implicitly propagated through subscribers' context by all Reactive integrations, 18 * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. 19 * Functions that subscribe to a reactive stream 20 * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext] 21 * to the subscriber's [Context]. 22 ** 23 * ### Examples of Reactive context integration. 24 * 25 * #### Propagating ReactorContext to Reactor's Context 26 * ``` 27 * val flux = myDatabaseService.getUsers() 28 * .contextWrite { ctx -> println(ctx); ctx } 29 * flux.awaitFirst() // Will print "null" 30 * 31 * // Now add ReactorContext 32 * withContext(Context.of("answer", "42").asCoroutineContext()) { 33 * flux.awaitFirst() // Will print "Context{'key'='value'}" 34 * } 35 * ``` 36 * 37 * #### Propagating subscriber's Context to ReactorContext: 38 * ``` 39 * val flow = flow { 40 * println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext]) 41 * } 42 * // No context 43 * flow.asFlux() 44 * .subscribe() // Will print 'Reactor context in Flow: null' 45 * // Add subscriber's context 46 * flow.asFlux() 47 * .contextWrite { ctx -> ctx.put("answer", 42) } 48 * .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}" 49 * ``` 50 */ 51 public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { 52 53 // `Context.of` is zero-cost if the argument is a `Context` 54 public constructor(contextView: ContextView): this(Context.of(contextView)) 55 56 public companion object Key : CoroutineContext.Key<ReactorContext> 57 toStringnull58 override fun toString(): String = context.toString() 59 } 60 61 /** 62 * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context 63 * and later used via `coroutineContext[ReactorContext]`. 64 */ 65 public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this) 66 67 /** @suppress */ 68 @Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN) 69 public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost. 70 71 /** 72 * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values. 73 */ 74 internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext = 75 (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext() 76