• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactor
2 
3 import kotlin.coroutines.*
4 import kotlinx.coroutines.reactive.*
5 import reactor.util.context.*
6 
/**
 * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
 * Reactor and kotlinx.coroutines.
 * [ContextView.asCoroutineContext] wraps a Reactor [ContextView] into a [ReactorContext] element,
 * which can be used to propagate the information about Reactor's [Context] through coroutines.
 *
 * This context element is implicitly propagated through subscribers' context by all Reactive integrations,
 * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
 * Functions that subscribe to a reactive stream
 * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
 * to the subscriber's [Context].
 *
 * ### Examples of Reactive context integration.
 *
 * #### Propagating ReactorContext to Reactor's Context
 * ```
 * val flux = myDatabaseService.getUsers()
 *     .contextWrite { ctx -> println(ctx); ctx }
 * flux.awaitFirst() // Will print "null"
 *
 * // Now add ReactorContext
 * withContext(Context.of("answer", "42").asCoroutineContext()) {
 *     flux.awaitFirst() // Will print "Context{'answer'='42'}"
 * }
 * ```
 *
 * #### Propagating subscriber's Context to ReactorContext:
 * ```
 * val flow = flow {
 *     println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext])
 * }
 * // No context
 * flow.asFlux()
 *     .subscribe() // Will print 'Reactor context in Flow: null'
 * // Add subscriber's context
 * flow.asFlux()
 *     .contextWrite { ctx -> ctx.put("answer", 42) }
 *     .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
 * ```
 *
 * @property context the wrapped Reactor [Context]; its string form is what [toString] returns.
 */
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {

    /**
     * Creates the element from a [ContextView] by converting it to a [Context] with [Context.of].
     */
    // `Context.of` is zero-cost if the argument is a `Context`
    public constructor(contextView: ContextView) : this(Context.of(contextView))

    /** Delegates to the wrapped [context], so the element prints as the Reactor context itself. */
    override fun toString(): String = context.toString()

    /** The key used to look up a [ReactorContext] in a [CoroutineContext], e.g. `coroutineContext[ReactorContext]`. */
    public companion object Key : CoroutineContext.Key<ReactorContext>
}
56 
/**
 * Creates a [ReactorContext] around this [ContextView].
 *
 * The returned element can be added to a coroutine's context and retrieved later
 * via `coroutineContext[ReactorContext]`.
 */
public fun ContextView.asCoroutineContext(): ReactorContext {
    return ReactorContext(this)
}
62 
/** @suppress */
@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
public fun Context.asCoroutineContext(): ReactorContext {
    // `readOnly()` is zero-cost; the explicit `ContextView` type routes the call below
    // to the general `ContextView.asCoroutineContext` overload.
    val view: ContextView = readOnly()
    return view.asCoroutineContext()
}
66 
/**
 * Builds an updated [ReactorContext] for this [CoroutineContext].
 *
 * If this context already holds a [ReactorContext], [extensions] are put on top of its Reactor context
 * (adding, or possibly replacing, values); otherwise [extensions] alone are wrapped.
 * Only the resulting [ReactorContext] element is returned, not the whole receiver context.
 */
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext {
    val existing = this[ReactorContext]?.context
    // Fall back to the bare extensions when there is nothing to merge into.
    val merged: ContextView = existing?.putAll(extensions) ?: extensions
    return merged.asCoroutineContext()
}
72