<lambda>null1 package kotlinx.coroutines.rx3
2
3 import io.reactivex.rxjava3.core.*
4 import io.reactivex.rxjava3.disposables.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9 import org.reactivestreams.*
10 import java.util.concurrent.atomic.*
11 import kotlin.coroutines.*
12
13 /**
14 * Converts this job to the hot reactive completable that signals
15 * with [onCompleted][CompletableObserver.onComplete] when the corresponding job completes.
16 *
17 * Every subscriber gets the signal at the same time.
18 * Unsubscribing from the resulting completable **does not** affect the original job in any way.
19 *
20 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
21 * in the future to account for the concept of structured concurrency.
22 *
23 * @param context -- the coroutine context from which the resulting completable is going to be signalled
24 */
25 public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
26 this@asCompletable.join()
27 }
28
29 /**
30 * Converts this deferred value to the hot reactive maybe that signals
31 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
32 *
33 * Every subscriber gets the same completion value.
34 * Unsubscribing from the resulting maybe **does not** affect the original deferred value in any way.
35 *
36 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
37 * in the future to account for the concept of structured concurrency.
38 *
39 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
40 */
asMaybenull41 public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> = rxMaybe(context) {
42 this@asMaybe.await()
43 }
44
45 /**
46 * Converts this deferred value to the hot reactive single that signals either
47 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
48 *
49 * Every subscriber gets the same completion value.
50 * Unsubscribing from the resulting single **does not** affect the original deferred value in any way.
51 *
52 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
53 * in the future to account for the concept of structured concurrency.
54 *
55 * @param context -- the coroutine context from which the resulting single is going to be signalled
56 */
<lambda>null57 public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
58 this@asSingle.await()
59 }
60
61 /**
62 * Transforms given cold [ObservableSource] into cold [Flow].
63 *
64 * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator
65 * is applied to the resulting flow.
66 *
67 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
68 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
69 * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
70 */
<lambda>null71 public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
72 val disposableRef = AtomicReference<Disposable>()
73 val observer = object : Observer<T> {
74 override fun onComplete() { close() }
75 override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
76 override fun onNext(t: T) {
77 /*
78 * Channel was closed by the downstream, so the exception (if any)
79 * also was handled by the same downstream
80 */
81 try {
82 trySendBlocking(t)
83 } catch (e: InterruptedException) {
84 // RxJava interrupts the source
85 }
86 }
87 override fun onError(e: Throwable) { close(e) }
88 }
89
90 subscribe(observer)
91 awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
92 }
93
94 /**
95 * Converts the given flow to a cold observable.
96 * The original flow is cancelled when the observable subscriber is disposed.
97 *
98 * An optional [context] can be specified to control the execution context of calls to [Observer] methods.
99 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
100 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
101 * is used, so calls are performed from an arbitrary thread.
102 */
asObservablenull103 public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
104 /*
105 * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
106 * asObservable is already invoked from unconfined
107 */
108 val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) {
109 try {
110 collect { value -> emitter.onNext(value) }
111 emitter.onComplete()
112 } catch (e: Throwable) {
113 // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
114 if (e !is CancellationException) {
115 if (!emitter.tryOnError(e)) {
116 handleUndeliverableException(e, coroutineContext)
117 }
118 } else {
119 emitter.onComplete()
120 }
121 }
122 }
123 emitter.setCancellable(RxCancellable(job))
124 }
125
126 /**
127 * Converts the given flow to a cold flowable.
128 * The original flow is cancelled when the flowable subscriber is disposed.
129 *
130 * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
131 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
132 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
133 * is used, so calls are performed from an arbitrary thread.
134 */
asFlowablenull135 public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
136 Flowable.fromPublisher(asPublisher(context))
137
138 /** @suppress */
139 @Suppress("UNUSED") // KT-42513
140 @JvmOverloads // binary compatibility
141 @JvmName("from")
142 @Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
143 public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
144 asFlowable(context)
145
146 /** @suppress */
147 @Suppress("UNUSED") // KT-42513
148 @JvmOverloads // binary compatibility
149 @JvmName("from")
150 @Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
151 public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = asObservable(context)
152