• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import io.reactivex.rxjava3.core.*
4 import io.reactivex.rxjava3.disposables.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 import kotlinx.coroutines.reactive.*
9 import org.reactivestreams.*
10 import java.util.concurrent.atomic.*
11 import kotlin.coroutines.*
12 
/**
 * Exposes this job as a hot reactive [Completable] that signals
 * [onComplete][CompletableObserver.onComplete] once the job has completed.
 *
 * All subscribers receive the signal at the same time.
 * Disposing a subscription to the returned completable **does not** affect this job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
public fun Job.asCompletable(context: CoroutineContext): Completable {
    val job = this
    // The completable body simply suspends until the wrapped job is done.
    return rxCompletable(context) { job.join() }
}
28 
/**
 * Exposes this deferred value as a hot reactive [Maybe] that signals one of
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * All subscribers observe the same completion value.
 * Disposing a subscription to the returned maybe **does not** affect this deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T & Any> {
    val deferred = this
    // The maybe body awaits the deferred and hands its (possibly null) result to rxMaybe.
    return rxMaybe(context) { deferred.await() }
}
44 
/**
 * Exposes this deferred value as a hot reactive [Single] that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * All subscribers observe the same completion value.
 * Disposing a subscription to the returned single **does not** affect this deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 *    in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> {
    val deferred = this
    // The single body awaits the deferred and returns its result.
    return rxSingle(context) { deferred.await() }
}
60 
/**
 * Wraps the given cold [ObservableSource] into a cold [Flow].
 *
 * The resulting flow is _cold_: [ObservableSource.subscribe] is invoked anew each time a terminal operator
 * is applied to the resulting flow.
 *
 * A channel with the [default][Channel.BUFFERED] buffer size is used. Apply the [buffer] operator to the
 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
 * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
 */
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
    // Holds the upstream subscription handle so that it can be disposed when the flow is closed.
    val upstream = AtomicReference<Disposable>()

    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) {
            // Only the first disposable is stored; if the slot is already occupied, dispose the new one at once.
            val stored = upstream.compareAndSet(null, d)
            if (!stored) d.dispose()
        }

        override fun onNext(t: T) {
            /*
             * The result of the send is deliberately ignored: if the channel was closed
             * by the downstream, the exception (if any) was also handled by that downstream.
             */
            try {
                trySendBlocking(t)
            } catch (e: InterruptedException) {
                // RxJava interrupts the source
            }
        }

        override fun onError(e: Throwable) {
            close(e)
        }

        override fun onComplete() {
            close()
        }
    })

    awaitClose {
        // Swap in an already-disposed marker, then dispose whatever was stored before (if anything).
        val previous = upstream.getAndSet(Disposable.disposed())
        previous?.dispose()
    }
}
93 
/**
 * Converts the given flow to a cold observable.
 * The original flow is cancelled when the observable subscriber is disposed.
 *
 * An optional [context] can be supplied to control the execution context of calls to [Observer] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
    // Unconfined is the base; anything in the user-supplied context overrides it.
    val collectorContext = Dispatchers.Unconfined + context
    /*
     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
     * asObservable is already invoked from unconfined
     */
    val job = GlobalScope.launch(collectorContext, start = CoroutineStart.ATOMIC) {
        try {
            collect { value -> emitter.onNext(value) }
            emitter.onComplete()
        } catch (e: Throwable) {
            // 'create' provides a safe emitter, so on* can be called here even if the exception came from `onComplete`
            if (e is CancellationException) {
                // Cancellation is reported to the emitter as completion.
                emitter.onComplete()
            } else if (!emitter.tryOnError(e)) {
                // The emitter refused the error, so route it to the undeliverable-exception handler.
                handleUndeliverableException(e, coroutineContext)
            }
        }
    }
    // Tie the collecting job to the emitter's cancellation.
    emitter.setCancellable(RxCancellable(job))
}
125 
/**
 * Converts the given flow to a cold flowable.
 * The original flow is cancelled when the flowable subscriber is disposed.
 *
 * An optional [context] can be supplied to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    // Go through the reactive-streams publisher and wrap it as an RxJava flowable.
    val publisher = asPublisher(context)
    return Flowable.fromPublisher(publisher)
}
137 
/**
 * Hidden overload kept for binary compatibility (JVM name `from`); forwards to [asFlowable].
 *
 * @suppress
 */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(message = "", level = DeprecationLevel.HIDDEN) // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    return this.asFlowable(context)
}
145 
/**
 * Hidden overload kept for binary compatibility (JVM name `from`); forwards to [asObservable].
 *
 * @suppress
 */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(message = "", level = DeprecationLevel.HIDDEN) // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> {
    return this.asObservable(context)
}
152