• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlin.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import reactor.util.context.*
10 
11 /**
12  * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
13  * Reactor and kotlinx.coroutines.
14  * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext],
15  * which can be used to propagate the information about Reactor's [Context] through coroutines.
16  *
17  * This context element is implicitly propagated through subscribers' context by all Reactive integrations,
18  * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
19  * Functions that subscribe to a reactive stream
20  * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
21  * to the subscriber's [Context].
22  **
23  * ### Examples of Reactive context integration.
24  *
25  * #### Propagating ReactorContext to Reactor's Context
26  * ```
27  * val flux = myDatabaseService.getUsers()
28  *     .contextWrite { ctx -> println(ctx); ctx }
29  * flux.awaitFirst() // Will print "null"
30  *
31  * // Now add ReactorContext
32  * withContext(Context.of("answer", "42").asCoroutineContext()) {
33  *    flux.awaitFirst() // Will print "Context{'key'='value'}"
34  * }
35  * ```
36  *
37  * #### Propagating subscriber's Context to ReactorContext:
38  * ```
39  * val flow = flow {
40  *     println("Reactor context in Flow: " + currentCoroutineContext()[ReactorContext])
41  * }
42  * // No context
43  * flow.asFlux()
44  *     .subscribe() // Will print 'Reactor context in Flow: null'
45  * // Add subscriber's context
46  * flow.asFlux()
47  *     .contextWrite { ctx -> ctx.put("answer", 42) }
48  *     .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}"
49  * ```
50  */
51 public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
52 
53     // `Context.of` is zero-cost if the argument is a `Context`
54     public constructor(contextView: ContextView): this(Context.of(contextView))
55 
56     public companion object Key : CoroutineContext.Key<ReactorContext>
57 
toStringnull58     override fun toString(): String = context.toString()
59 }
60 
61 /**
62  * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
63  * and later used via `coroutineContext[ReactorContext]`.
64  */
65 public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)
66 
67 /** @suppress */
68 @Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
69 public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost.
70 
71 /**
72  * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
73  */
74 internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
75     (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()
76