1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx3
6
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13 import org.reactivestreams.*
14 import java.util.concurrent.atomic.*
15 import kotlin.coroutines.*
16
17 /**
18 * Converts this job to the hot reactive completable that signals
19 * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
20 *
21 * Every subscriber gets the signal at the same time.
22 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
23 *
24 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
25 * in the future to account for the concept of structured concurrency.
26 *
27 * @param context -- the coroutine context from which the resulting completable is going to be signalled
28 */
29 public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
30 this@asCompletable.join()
31 }
32
33 /**
34 * Converts this deferred value to the hot reactive maybe that signals
35 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
36 *
37 * Every subscriber gets the same completion value.
38 * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
39 *
40 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
41 * in the future to account for the concept of structured concurrency.
42 *
43 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
44 */
asMaybenull45 public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> = rxMaybe(context) {
46 this@asMaybe.await()
47 }
48
49 /**
50 * Converts this deferred value to the hot reactive single that signals either
51 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
52 *
53 * Every subscriber gets the same completion value.
54 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
55 *
56 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
57 * in the future to account for the concept of structured concurrency.
58 *
59 * @param context -- the coroutine context from which the resulting single is going to be signalled
60 */
<lambda>null61 public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
62 this@asSingle.await()
63 }
64
65 /**
66 * Transforms given cold [ObservableSource] into cold [Flow].
67 *
68 * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
69 * is applied to the resulting flow.
70 *
71 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
72 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
73 * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
74 */
<lambda>null75 public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
76 val disposableRef = AtomicReference<Disposable>()
77 val observer = object : Observer<T> {
78 override fun onComplete() { close() }
79 override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
80 override fun onNext(t: T) {
81 /*
82 * Channel was closed by the downstream, so the exception (if any)
83 * also was handled by the same downstream
84 */
85 try {
86 trySendBlocking(t)
87 } catch (e: InterruptedException) {
88 // RxJava interrupts the source
89 }
90 }
91 override fun onError(e: Throwable) { close(e) }
92 }
93
94 subscribe(observer)
95 awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
96 }
97
98 /**
99 * Converts the given flow to a cold observable.
100 * The original flow is cancelled when the observable subscriber is disposed.
101 *
102 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
103 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
104 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
105 * is used, so calls are performed from an arbitrary thread.
106 */
asObservablenull107 public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
108 /*
109 * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
110 * asObservable is already invoked from unconfined
111 */
112 val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
113 try {
114 collect { value -> emitter.onNext(value) }
115 emitter.onComplete()
116 } catch (e: Throwable) {
117 // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
118 if (e !is CancellationException) {
119 if (!emitter.tryOnError(e)) {
120 handleUndeliverableException(e, coroutineContext)
121 }
122 } else {
123 emitter.onComplete()
124 }
125 }
126 }
127 emitter.setCancellable(RxCancellable(job))
128 }
129
130 /**
131 * Converts the given flow to a cold flowable.
132 * The original flow is cancelled when the flowable subscriber is disposed.
133 *
134 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
135 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
136 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
137 * is used, so calls are performed from an arbitrary thread.
138 */
asFlowablenull139 public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
140 Flowable.fromPublisher(asPublisher(context))
141
142 /** @suppress */
143 @Suppress("UNUSED") // KT-42513
144 @JvmOverloads // binary compatibility
145 @JvmName("from")
146 @Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
147 public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
148 asFlowable(context)
149
150 /** @suppress */
151 @Suppress("UNUSED") // KT-42513
152 @JvmOverloads // binary compatibility
153 @JvmName("from")
154 @Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
155 public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
156