1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx2
6
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.plugins.*
10 import kotlinx.atomicfu.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import java.util.concurrent.*
14 import kotlin.coroutines.*
15
16 /**
17 * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
18 * and provides native support of [delay] and [withTimeout].
19 */
20 public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
21 if (this is DispatcherScheduler) {
22 dispatcher
23 } else {
24 SchedulerCoroutineDispatcher(this)
25 }
26
27 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
28 @JvmName("asCoroutineDispatcher")
asCoroutineDispatcher0null29 public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
30 SchedulerCoroutineDispatcher(this)
31
32 /**
33 * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
34 */
35 public fun CoroutineDispatcher.asScheduler(): Scheduler =
36 if (this is SchedulerCoroutineDispatcher) {
37 scheduler
38 } else {
39 DispatcherScheduler(this)
40 }
41
42 private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {
43
44 private val schedulerJob = SupervisorJob()
45
46 /**
47 * The scope for everything happening in this [DispatcherScheduler].
48 *
49 * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
50 */
51 private val scope = CoroutineScope(schedulerJob + dispatcher)
52
53 /**
54 * The counter of created workers, for their pretty-printing.
55 */
56 private val workerCounter = atomic(1L)
57
scheduleDirectnull58 override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
59 scope.scheduleTask(block, unit.toMillis(delay)) { task ->
60 Runnable { scope.launch { task() } }
61 }
62
createWorkernull63 override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)
64
65 override fun shutdown() {
66 schedulerJob.cancel()
67 }
68
69 private class DispatcherWorker(
70 private val counter: Long,
71 private val dispatcher: CoroutineDispatcher,
72 parentJob: Job
73 ) : Worker() {
74
75 private val workerJob = SupervisorJob(parentJob)
76 private val workerScope = CoroutineScope(workerJob + dispatcher)
77 private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
78
79 init {
<lambda>null80 workerScope.launch {
81 blockChannel.consumeEach {
82 it()
83 }
84 }
85 }
86
schedulenull87 override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
88 workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
89 Runnable { blockChannel.trySend(task) }
90 }
91
isDisposednull92 override fun isDisposed(): Boolean = !workerScope.isActive
93
94 override fun dispose() {
95 blockChannel.close()
96 workerJob.cancel()
97 }
98
toStringnull99 override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
100 }
101
102 override fun toString(): String = dispatcher.toString()
103 }
104
105 private typealias Task = suspend () -> Unit
106
107 /**
108 * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
109 * milliseconds.
110 */
111 private fun CoroutineScope.scheduleTask(
112 block: Runnable,
113 delayMillis: Long,
114 adaptForScheduling: (Task) -> Runnable
115 ): Disposable {
116 val ctx = coroutineContext
117 var handle: DisposableHandle? = null
118 val disposable = Disposables.fromRunnable {
119 // null if delay <= 0
120 handle?.dispose()
121 }
122 val decoratedBlock = RxJavaPlugins.onSchedule(block)
123 suspend fun task() {
124 if (disposable.isDisposed) return
125 try {
126 runInterruptible {
127 decoratedBlock.run()
128 }
129 } catch (e: Throwable) {
130 handleUndeliverableException(e, ctx)
131 }
132 }
133
134 val toSchedule = adaptForScheduling(::task)
135 if (!isActive) return Disposables.disposed()
136 if (delayMillis <= 0) {
137 toSchedule.run()
138 } else {
139 @Suppress("INVISIBLE_MEMBER")
140 ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
141 }
142 return disposable
143 }
144
145 /**
146 * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
147 */
148 public class SchedulerCoroutineDispatcher(
149 /**
150 * Underlying scheduler of current [CoroutineDispatcher].
151 */
152 public val scheduler: Scheduler
153 ) : CoroutineDispatcher(), Delay {
154 /** @suppress */
dispatchnull155 override fun dispatch(context: CoroutineContext, block: Runnable) {
156 scheduler.scheduleDirect(block)
157 }
158
159 /** @suppress */
scheduleResumeAfterDelaynull160 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
161 val disposable = scheduler.scheduleDirect({
162 with(continuation) { resumeUndispatched(Unit) }
163 }, timeMillis, TimeUnit.MILLISECONDS)
164 continuation.disposeOnCancellation(disposable)
165 }
166
167 /** @suppress */
invokeOnTimeoutnull168 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
169 val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
170 return DisposableHandle { disposable.dispose() }
171 }
172
173 /** @suppress */
toStringnull174 override fun toString(): String = scheduler.toString()
175
176 /** @suppress */
177 override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
178
179 /** @suppress */
180 override fun hashCode(): Int = System.identityHashCode(scheduler)
181 }