<lambda>null1 package kotlinx.coroutines.rx2
2
3 import io.reactivex.*
4 import io.reactivex.disposables.*
5 import io.reactivex.plugins.*
6 import kotlinx.atomicfu.*
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import java.util.concurrent.*
10 import kotlin.coroutines.*
11
/**
 * Adapts this [Scheduler] to the [CoroutineDispatcher] API, with native support of [delay] and [withTimeout].
 *
 * A scheduler that was itself produced by wrapping a dispatcher (see [CoroutineDispatcher.asScheduler]) is
 * unwrapped back to that dispatcher; any other scheduler is wrapped in a new [SchedulerCoroutineDispatcher].
 */
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) {
    // Already a dispatcher in disguise: hand back the dispatcher instead of stacking adapters.
    is DispatcherScheduler -> dispatcher
    else -> SchedulerCoroutineDispatcher(this)
}
22
// Hidden overload that shares its JVM name with the public `asCoroutineDispatcher`. Per the deprecation
// message it exists for binary compatibility with versions before 1.4.2: it always wraps the scheduler and
// keeps the concrete SchedulerCoroutineDispatcher return type.
@Deprecated(message = "Since 1.4.2, binary compatibility with earlier versions", level = DeprecationLevel.HIDDEN)
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher {
    return SchedulerCoroutineDispatcher(this)
}
27
/**
 * Adapts this [CoroutineDispatcher] to the [Scheduler] API.
 *
 * A dispatcher that is a [SchedulerCoroutineDispatcher] is unwrapped to the [Scheduler] it was built on;
 * any other dispatcher is wrapped in a new scheduler that runs its tasks on this dispatcher.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) {
    // Already backed by a scheduler: return it instead of stacking adapters.
    is SchedulerCoroutineDispatcher -> scheduler
    else -> DispatcherScheduler(this)
}
37
/**
 * A [Scheduler] that runs its tasks as coroutines on the wrapped [dispatcher].
 *
 * Directly scheduled tasks are each launched as their own coroutine. Every [Worker] created by this scheduler
 * instead queues its tasks into a channel that a single coroutine drains, so one worker's tasks run one at a time.
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /**
     * Root job of this scheduler. It is the parent of every worker job and is cancelled by [shutdown].
     */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope that everything in this [DispatcherScheduler] runs in.
     *
     * Tasks that are already running are launched under this scope as well, because [shutdown] is expected to
     * cancel them too.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * Sequence number handed to each created worker; used only for pretty-printing the worker.
     */
    private val workerCounter = atomic(1L)

    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val delayMillis = unit.toMillis(delay)
        return scope.scheduleTask(block, delayMillis) { task ->
            // Each direct task becomes its own coroutine in the scheduler scope.
            Runnable { scope.launch { task() } }
        }
    }

    override fun createWorker(): Worker =
        DispatcherWorker(
            counter = workerCounter.getAndIncrement(),
            dispatcher = dispatcher,
            parentJob = schedulerJob
        )

    override fun shutdown() {
        schedulerJob.cancel()
    }

    /**
     * A [Worker] whose tasks are funnelled through [blockChannel] and executed by one consumer coroutine.
     *
     * [counter] and [dispatcher] are used for [toString]; [dispatcher] is also the context the consumer runs in.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        // Child of the scheduler job, so cancelling the scheduler job reaches this worker as well.
        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)

        // Unlimited capacity: queuing a task via trySend is not rejected for lack of buffer space.
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The single consumer: takes queued tasks one by one and runs each to completion before the next.
            workerScope.launch {
                blockChannel.consumeEach { queuedTask ->
                    queuedTask()
                }
            }
        }

        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
            val delayMillis = unit.toMillis(delay)
            return workerScope.scheduleTask(block, delayMillis) { task ->
                // Queue the task for the consumer coroutine instead of launching it directly.
                Runnable { blockChannel.trySend(task) }
            }
        }

        override fun isDisposed(): Boolean {
            return !workerScope.isActive
        }

        override fun dispose() {
            // Stop accepting new tasks first, then cancel whatever the worker is still running.
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String {
            val state = if (isDisposed) "disposed" else "active"
            return "$dispatcher (worker $counter, $state)"
        }
    }

    override fun toString(): String = dispatcher.toString()
}
100
/**
 * A suspending unit of work taking no arguments and producing no result.
 *
 * This is the form in which [scheduleTask] hands the wrapped block to its `adaptForScheduling` callback.
 */
private typealias Task = suspend () -> Unit
102
/**
 * Arranges for [block] to be executed in this scope after [delayMillis] milliseconds.
 *
 * The block is passed through [RxJavaPlugins.onSchedule] and wrapped into a suspending [Task]; that task is given
 * to [adaptForScheduling], and the [Runnable] it returns is what actually gets run: immediately when
 * [delayMillis] is not positive, otherwise via the timeout facility of this scope's context.
 *
 * @return a [Disposable] whose disposal cancels the pending timeout (if one was registered) and makes the task
 * a no-op if it has not started yet; an already disposed one if this scope is no longer active.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val ctx = coroutineContext
    // Remains null when delayMillis <= 0, because no timeout is registered in that case.
    var timeoutHandle: DisposableHandle? = null
    val disposable = Disposables.fromRunnable {
        timeoutHandle?.dispose()
    }
    val hookedBlock = RxJavaPlugins.onSchedule(block)

    val task: Task = guarded@{
        // Disposal may have happened between scheduling and the moment the task actually starts.
        if (disposable.isDisposed) return@guarded
        try {
            runInterruptible {
                hookedBlock.run()
            }
        } catch (e: Throwable) {
            // Failures of the block are reported through the undeliverable-exception path, not rethrown.
            handleUndeliverableException(e, ctx)
        }
    }

    val toSchedule = adaptForScheduling(task)
    if (!isActive) return Disposables.disposed()
    if (delayMillis <= 0) {
        toSchedule.run()
        return disposable
    }
    @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
    ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { timeoutHandle = it }
    return disposable
}
140
/**
 * A [CoroutineDispatcher] that executes coroutines on an arbitrary [Scheduler].
 *
 * It also implements [Delay], so delayed resumptions and timeouts are scheduled on the same [scheduler].
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler that this [CoroutineDispatcher] delegates to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The Disposable returned by the scheduler is intentionally not retained here.
        scheduler.scheduleDirect(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumption = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val scheduled = scheduler.scheduleDirect(resumption, timeMillis, TimeUnit.MILLISECONDS)
        // Tie the scheduled resumption to the continuation: cancelling the continuation disposes it.
        continuation.disposeOnCancellation(scheduled)
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS).let { scheduled ->
            DisposableHandle { scheduled.dispose() }
        }

    /** @suppress */
    override fun toString(): String = scheduler.toString()

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        if (other !is SchedulerCoroutineDispatcher) return false
        // Identity, not equality: two dispatchers are equal only when they wrap the very same scheduler.
        return other.scheduler === scheduler
    }

    /** @suppress */
    override fun hashCode(): Int = System.identityHashCode(scheduler)
}
178