/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13 import org.reactivestreams.*
14 import java.util.concurrent.atomic.*
15 import kotlin.coroutines.*
16 
/**
 * Exposes this job as a hot reactive completable that signals
 * [onComplete][CompletableObserver.onComplete] once the job has completed.
 *
 * The completable is produced by [rxCompletable] with a block that only [joins][Job.join] this job,
 * so every subscriber is signalled at the same time.
 * Disposing a subscription to the resulting completable **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
public fun Job.asCompletable(context: CoroutineContext): Completable {
    // Capture the receiver: inside the builder block `this` refers to the builder's own scope.
    val job: Job = this
    return rxCompletable(context) { job.join() }
}
32 
/**
 * Exposes this deferred value as a hot reactive maybe that signals one of
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * The maybe is produced by [rxMaybe] with a block that only [awaits][Deferred.await] this deferred,
 * so every subscriber gets the same completion value. The deferred may hold a nullable value, while the
 * resulting maybe is typed with the non-null counterpart of `T`.
 * Disposing a subscription to the resulting maybe **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> {
    // Capture the receiver: inside the builder block `this` refers to the builder's own scope.
    val deferred: Deferred<T?> = this
    return rxMaybe(context) { deferred.await() }
}
48 
/**
 * Exposes this deferred value as a hot reactive single that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * The single is produced by [rxSingle] with a block that only [awaits][Deferred.await] this deferred,
 * so every subscriber gets the same completion value.
 * Disposing a subscription to the resulting single **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> {
    // Capture the receiver: inside the builder block `this` refers to the builder's own scope.
    val deferred: Deferred<T> = this
    return rxSingle(context) { deferred.await() }
}
64 
/**
 * Wraps the given cold [ObservableSource] into a cold [Flow].
 *
 * Being _cold_, the resulting flow calls [ObservableSource.subscribe] anew each time a terminal operator
 * is applied to it.
 *
 * Items are handed over through a channel with the [default][Channel.BUFFERED] buffer size. Apply the [buffer]
 * operator to the resulting flow to pick a different capacity and to decide what happens when items are
 * produced faster than they are consumed, i.e. to control the back-pressure behavior.
 * Check [callbackFlow] for more details.
 */
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
    // Holds the upstream subscription handle; swapped for an already-disposed marker when the flow is closed.
    val upstream = AtomicReference<Disposable>()

    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) {
            // If the slot is already occupied, the handle we were just given is disposed right away.
            val installed = upstream.compareAndSet(null, d)
            if (!installed) d.dispose()
        }

        override fun onNext(t: T) {
            try {
                /*
                 * The send result is ignored on purpose: a failed send means the channel was closed
                 * by the downstream, so the exception (if any) was handled by that same downstream.
                 */
                trySendBlocking(t)
            } catch (e: InterruptedException) {
                // RxJava interrupts the source
            }
        }

        override fun onError(e: Throwable) {
            close(e)
        }

        override fun onComplete() {
            close()
        }
    })

    awaitClose {
        upstream.getAndSet(Disposable.disposed())?.dispose()
    }
}
97 
/**
 * Turns the given flow into a cold observable.
 * Disposing the observable subscriber cancels collection of the original flow.
 *
 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> =
    Observable.create { emitter ->
        // Unconfined is only the base: elements of the caller-supplied context override it.
        val launchContext = Dispatchers.Unconfined + context
        /*
         * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
         * asObservable is already invoked from unconfined
         */
        val collectorJob = GlobalScope.launch(launchContext, start = CoroutineStart.ATOMIC) {
            try {
                collect { item -> emitter.onNext(item) }
                emitter.onComplete()
            } catch (e: Throwable) {
                // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
                if (e is CancellationException) {
                    // Cancellation is reported to the emitter as a normal completion.
                    emitter.onComplete()
                } else {
                    val delivered = emitter.tryOnError(e)
                    if (!delivered) {
                        handleUndeliverableException(e, coroutineContext)
                    }
                }
            }
        }
        // Cancelling the emitter triggers the cancellable that wraps the collecting job.
        emitter.setCancellable(RxCancellable(collectorJob))
    }
129 
/**
 * Turns the given flow into a cold flowable.
 * Disposing the flowable subscriber cancels collection of the original flow.
 *
 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    // Go through the reactive-streams publisher first, then wrap it as a Flowable.
    val publisher = asPublisher(context)
    return Flowable.fromPublisher(publisher)
}
141 
/**
 * Hidden bridge kept for binary compatibility; it simply forwards to [asFlowable].
 *
 * @suppress
 */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    return asFlowable(context)
}
149 
/**
 * Hidden bridge kept for binary compatibility; it simply forwards to [asObservable].
 *
 * @suppress
 */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> {
    return asObservable(context)
}
156