1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.rx3
6
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import io.reactivex.rxjava3.plugins.*
10 import kotlinx.atomicfu.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.CancellationException
13 import kotlinx.coroutines.channels.*
14 import java.util.concurrent.*
15 import kotlin.coroutines.*
16
/**
 * Exposes this [Scheduler] as a [CoroutineDispatcher] that natively supports [delay] and [withTimeout].
 *
 * A scheduler that was itself produced from a dispatcher by [asScheduler] is unwrapped, so the original
 * dispatcher is returned; any other scheduler gets wrapped into a new [SchedulerCoroutineDispatcher].
 */
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) {
    // Round trip: hand back the dispatcher this scheduler was created from instead of double-wrapping.
    is DispatcherScheduler -> dispatcher
    else -> SchedulerCoroutineDispatcher(this)
}
27
// Hidden overload kept for binary compatibility: it is exposed to the JVM under the name
// `asCoroutineDispatcher` and always wraps the receiver into a new SchedulerCoroutineDispatcher.
@Deprecated(message = "Since 1.4.2, binary compatibility with earlier versions", level = DeprecationLevel.HIDDEN)
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher {
    return SchedulerCoroutineDispatcher(this)
}
32
/**
 * Exposes this [CoroutineDispatcher] as a [Scheduler].
 *
 * A [SchedulerCoroutineDispatcher] is unwrapped, so the scheduler it was built on is returned;
 * any other dispatcher gets wrapped into a new scheduler backed by it.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) {
    // Round trip: hand back the underlying scheduler instead of double-wrapping.
    is SchedulerCoroutineDispatcher -> scheduler
    else -> DispatcherScheduler(this)
}
42
/**
 * A [Scheduler] whose tasks are executed by the wrapped coroutine [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /**
     * Root job of this scheduler. It is the job of [scope] and the parent of every worker's job,
     * so cancelling it in [shutdown] reaches all of them.
     */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope for everything happening in this [DispatcherScheduler].
     *
     * Tasks that are already running are launched in this scope as well, so that [shutdown] cancels them too.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * Source of sequential worker numbers, used only when pretty-printing workers.
     */
    private val workerCounter = atomic(1L)

    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val delayMillis = unit.toMillis(delay)
        // Each direct task gets its own coroutine launched in the scheduler's scope.
        return scope.scheduleTask(block, delayMillis) { task ->
            Runnable { scope.launch { task() } }
        }
    }

    override fun createWorker(): Worker {
        val workerNumber = workerCounter.getAndIncrement()
        return DispatcherWorker(workerNumber, dispatcher, schedulerJob)
    }

    override fun shutdown() {
        schedulerJob.cancel()
    }

    override fun toString(): String = dispatcher.toString()

    /**
     * A [Worker] that funnels its tasks through a channel read by a single coroutine,
     * so the tasks are invoked one after another in the order they were sent.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The single consumer of the channel: invokes every queued task in turn.
            workerScope.launch {
                blockChannel.consumeEach { queuedTask ->
                    queuedTask()
                }
            }
        }

        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
            val delayMillis = unit.toMillis(delay)
            // Instead of running the task directly, enqueue it for the consumer coroutine.
            // The result of trySend is deliberately ignored.
            return workerScope.scheduleTask(block, delayMillis) { task ->
                Runnable { blockChannel.trySend(task) }
            }
        }

        override fun isDisposed(): Boolean {
            return !workerScope.isActive
        }

        override fun dispose() {
            // Close the queue first, then cancel the worker's job.
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String {
            val state = if (isDisposed) "disposed" else "active"
            return "$dispatcher (worker $counter, $state)"
        }
    }
}
105
/** A suspending unit of work produced by [scheduleTask] and handed to its `adaptForScheduling` callback. */
private typealias Task = suspend () -> Unit

/**
 * Schedules [block] to execute after [delayMillis] milliseconds, or immediately when [delayMillis] is not positive.
 *
 * [block] is first passed through [RxJavaPlugins.onSchedule] and wrapped into a [Task]. That task does nothing if
 * the returned [Disposable] has been disposed by the time it runs; otherwise it runs the block via
 * [runInterruptible] and forwards anything the block throws to `handleUndeliverableException`.
 * [adaptForScheduling] turns the task into the [Runnable] that is actually run or timed.
 *
 * If this scope is no longer active, nothing is scheduled and an already disposed [Disposable] is returned.
 * Note that [RxJavaPlugins.onSchedule] and [adaptForScheduling] are invoked before that check.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val scopeContext = coroutineContext
    // Stays null when the task is run without a delay.
    var timeoutHandle: DisposableHandle? = null
    val disposable = Disposable.fromRunnable {
        timeoutHandle?.dispose()
    }
    val hookedBlock = RxJavaPlugins.onSchedule(block)

    suspend fun runUnlessDisposed() {
        if (!disposable.isDisposed) {
            try {
                runInterruptible {
                    hookedBlock.run()
                }
            } catch (e: Throwable) {
                handleUndeliverableException(e, scopeContext)
            }
        }
    }

    val scheduled = adaptForScheduling(::runUnlessDisposed)
    if (!isActive) return Disposable.disposed()
    if (delayMillis > 0) {
        // Register the runnable with the delay implementation obtained from the scope's context.
        @Suppress("INVISIBLE_MEMBER")
        val registration = scopeContext.delay.invokeOnTimeout(delayMillis, scheduled, scopeContext)
        timeoutHandle = registration
    } else {
        scheduled.run()
    }
    return disposable
}
145
/**
 * A [CoroutineDispatcher] backed by an arbitrary RxJava [Scheduler].
 *
 * Besides dispatching, it implements [Delay] by scheduling delayed work directly on the [scheduler].
 * Two instances are equal when they wrap the very same scheduler instance.
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler that this [CoroutineDispatcher] delegates all of its work to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The Disposable returned by the scheduler is intentionally not kept.
        scheduler.scheduleDirect(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeTask = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val pendingResume = scheduler.scheduleDirect(resumeTask, timeMillis, TimeUnit.MILLISECONDS)
        // Drop the scheduled resumption if the continuation gets cancelled first.
        continuation.disposeOnCancellation(pendingResume)
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS).let { scheduled ->
            // Adapt the RxJava Disposable to the coroutines DisposableHandle.
            DisposableHandle { scheduled.dispose() }
        }

    /** @suppress */
    override fun toString(): String {
        return scheduler.toString()
    }

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        if (other !is SchedulerCoroutineDispatcher) return false
        // Identity comparison: equal only when both wrap the same scheduler instance.
        return other.scheduler === scheduler
    }

    /** @suppress */
    override fun hashCode(): Int {
        return System.identityHashCode(scheduler)
    }
}
183