1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import kotlinx.coroutines.reactive.*
13 import org.reactivestreams.*
14 import java.util.concurrent.atomic.*
15 import kotlin.coroutines.*
16
/**
 * Exposes this job as a hot [Completable] that signals
 * [onComplete][CompletableObserver.onComplete] once the job has completed.
 *
 * All subscribers receive the signal at the same time.
 * Disposing a subscription to the resulting completable **does not** affect the original job in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting completable is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable {
    val job = this
    // The coroutine only waits for the job; it never cancels it.
    return rxCompletable(context) { job.join() }
}
33
/**
 * Exposes this deferred value as a hot [Maybe] that signals one of
 * [onComplete][MaybeEmitter.onComplete], [onSuccess][MaybeEmitter.onSuccess] or [onError][MaybeEmitter.onError].
 *
 * All subscribers receive the same completion value.
 * Disposing a subscription to the resulting maybe **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting maybe is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> {
    val deferred = this
    // The awaited (nullable) result is handed to rxMaybe as the coroutine's result.
    return rxMaybe(context) { deferred.await() }
}
50
/**
 * Exposes this deferred value as a hot [Single] that signals either
 * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError].
 *
 * All subscribers receive the same completion value.
 * Disposing a subscription to the resulting single **does not** affect the original deferred value in any way.
 *
 * **Note: This is an experimental api.** Conversion of coroutines primitives to reactive entities may change
 * in the future to account for the concept of structured concurrency.
 *
 * @param context -- the coroutine context from which the resulting single is going to be signalled
 */
@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> {
    val deferred = this
    // The awaited result is handed to rxSingle as the coroutine's result.
    return rxSingle(context) { deferred.await() }
}
67
/**
 * Adapts the given cold [ObservableSource] to a cold [Flow].
 *
 * The resulting flow is _cold_: [ObservableSource.subscribe] is invoked each time a terminal operator
 * is applied to the resulting flow.
 *
 * A channel with the [default][Channel.BUFFERED] buffer size is used. Apply the [buffer] operator to the
 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
 * than consumed, i.e. to control the back-pressure behavior. See [callbackFlow] for more details.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
    // Holds the upstream disposable so that it can be disposed when the flow is closed.
    val upstream = AtomicReference<Disposable>()

    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) {
            // If the slot is already occupied, this disposable is not stored and is disposed right away.
            val stored = upstream.compareAndSet(null, d)
            if (!stored) {
                d.dispose()
            }
        }

        override fun onNext(t: T) {
            try {
                sendBlocking(t)
            } catch (ignored: Throwable) {
                // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
                // Is handled by the downstream flow
            }
        }

        override fun onError(e: Throwable) {
            close(e)
        }

        override fun onComplete() {
            close()
        }
    })

    awaitClose {
        // Swap in an already-disposed marker and dispose whatever was stored before, if anything.
        val previous = upstream.getAndSet(Disposables.disposed())
        previous?.dispose()
    }
}
97
/**
 * Exposes the given flow as a cold observable.
 * The original flow is cancelled when the observable subscriber is disposed.
 *
 * An optional [context] can be supplied to control the execution context of calls to [Observer] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
    // Unconfined is the base; elements of the user-supplied context take precedence over it.
    val collectorContext = Dispatchers.Unconfined + context
    /*
     * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
     * asObservable is already invoked from unconfined
     */
    val collector = GlobalScope.launch(collectorContext, start = CoroutineStart.ATOMIC) {
        try {
            collect { item -> emitter.onNext(item) }
            emitter.onComplete()
        } catch (e: Throwable) {
            // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
            if (e is CancellationException) {
                // Cancellation is signalled downstream as a regular completion.
                emitter.onComplete()
            } else {
                // Deliver the failure downstream; if the emitter refuses it, report it as undeliverable.
                val delivered = emitter.tryOnError(e)
                if (!delivered) {
                    handleUndeliverableException(e, coroutineContext)
                }
            }
        }
    }
    // Ties the collecting job to the emitter's cancellation.
    emitter.setCancellable(RxCancellable(collector))
}
130
/**
 * Exposes the given flow as a cold flowable.
 * The original flow is cancelled when the flowable subscriber is disposed.
 *
 * An optional [context] can be supplied to control the execution context of calls to [Subscriber] methods.
 * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
 * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
 * is used, so calls are performed from an arbitrary thread.
 */
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    // Convert to a publisher first, then wrap that publisher as a Flowable.
    val publisher = asPublisher(context)
    return Flowable.fromPublisher(publisher)
}
143
@Deprecated(
    message = "Deprecated in the favour of Flow",
    level = DeprecationLevel.ERROR,
    replaceWith = ReplaceWith("this.consumeAsFlow().asObservable(context)", "kotlinx.coroutines.flow.consumeAsFlow")
) // Deprecated since 1.4.0
public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): Observable<T> {
    val source = this
    return rxObservable(context) {
        // Forward every element received from the channel to the observable being built.
        for (element in source) {
            send(element)
        }
    }
}
153
// Hidden declaration kept for binary compatibility under the JVM name `from`; delegates to [asFlowable].
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> {
    return asFlowable(context)
}
160
// Hidden declaration kept for binary compatibility under the JVM name `from`; delegates to [asObservable].
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@Deprecated(level = DeprecationLevel.HIDDEN, message = "") // Since 1.4, was experimental prior to that
public fun <T: Any> Flow<T>._asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> {
    return asObservable(context)
}
166