• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13 import org.reactivestreams.*
14 import java.util.concurrent.atomic.*
15 import kotlin.coroutines.*
16 
/**
 * Exposes this job as a hot reactive [Completable] that signals
 * [onComplete][CompletableObserver.onComplete] once the job has completed.
 *
 * The completable is built with [rxCompletable] and does nothing but [join][Job.join] this job, so
 * every subscriber observes the completion of the same job.
 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable {
    // Capture the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val job = this
    return rxCompletable(context) { job.join() }
}
33 
/**
 * Exposes this deferred value as a hot reactive [Maybe] that signals one of
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * The maybe is built with [rxMaybe] and only [awaits][Deferred.await] this deferred, so every subscriber
 * gets the same completion value.
 * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> {
    // Capture the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val deferred = this
    return rxMaybe(context) { deferred.await() }
}
50 
/**
 * Exposes this deferred value as a hot reactive [Single] that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * The single is built with [rxSingle] and only [awaits][Deferred.await] this deferred, so every subscriber
 * gets the same completion value.
 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> {
    // Capture the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val deferred = this
    return rxSingle(context) { deferred.await() }
}
67 
/**
 * Transforms given cold [ObservableSource] into cold [Flow].
 *
 * The resulting flow is _cold_: [ObservableSource.subscribe] is invoked anew each time a terminal operator
 * is applied to the resulting flow.
 *
 * A channel with the [default][Channel.BUFFERED] buffer size is used. Apply the [buffer] operator to the
 * resulting flow to pick a different size and to decide what happens when items are produced faster
 * than they are consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
    // Holds the upstream subscription; replaced with an already-disposed instance when the flow is closed.
    val upstream = AtomicReference<Disposable>()

    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) {
            // If something is already stored (an earlier disposable or the disposed marker set on close),
            // the incoming disposable is disposed immediately.
            val stored = upstream.compareAndSet(null, d)
            if (!stored) d.dispose()
        }

        override fun onNext(t: T) {
            try {
                sendBlocking(t)
            } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
                // Is handled by the downstream flow
            }
        }

        override fun onError(e: Throwable) {
            close(e)
        }

        override fun onComplete() {
            close()
        }
    })

    awaitClose {
        // Swap in the disposed marker and dispose whatever subscription was stored, if any.
        val previous = upstream.getAndSet(Disposables.disposed())
        previous?.dispose()
    }
}
97 
/**
 * Converts the given flow to a cold observable.
 * Disposing the observable subscriber cancels the collection of the original flow.
 *
 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
    // Unconfined is the base; elements of the user-supplied context are added on top of it.
    val collectorContext = Dispatchers.Unconfined + context
    /*
     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
     * asObservable is already invoked from unconfined
     */
    val job = GlobalScope.launch(collectorContext, start = CoroutineStart.ATOMIC) {
        try {
            collect { value -> emitter.onNext(value) }
            emitter.onComplete()
        } catch (e: Throwable) {
            // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
            if (e is CancellationException) {
                // Cancellation is reported downstream as a regular completion.
                emitter.onComplete()
            } else if (!emitter.tryOnError(e)) {
                // The emitter refused the error, so hand it to the undeliverable-exception handler.
                handleUndeliverableException(e, coroutineContext)
            }
        }
    }
    // Tie the collecting job to the emitter's cancellation.
    emitter.setCancellable(RxCancellable(job))
}
130 
/**
 * Converts the given flow to a cold flowable.
 * Disposing the flowable subscriber cancels the collection of the original flow.
 *
 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    // Go through the reactive-streams publisher adapter, then wrap it as an RxJava Flowable.
    val publisher = asPublisher(context)
    return Flowable.fromPublisher(publisher)
}
143 
/**
 * Converts this channel to an [Observable] built with [rxObservable] that re-sends every element
 * received from the channel, in order.
 *
 * Deprecated at error level; the suggested replacement is `consumeAsFlow().asObservable(context)`.
 *
 * @param context -- the coroutine context passed to [rxObservable]
 */
@Deprecated(
    message = "Deprecated in the favour of Flow",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
) // Deprecated since 1.4.0
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> {
    // Capture the receiver explicitly: inside the builder lambda `this` refers to the builder's scope.
    val channel = this
    return rxObservable(context) {
        for (element in channel) {
            send(element)
        }
    }
}
153 
// Hidden overload kept for binary compatibility (exposed to the JVM as `from`); simply forwards to asFlowable.
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    return asFlowable(context)
}
160 
// Hidden overload kept for binary compatibility (exposed to the JVM as `from`); simply forwards to asObservable.
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> {
    return asObservable(context)
}
166