• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import io.reactivex.rxjava3.plugins.*
10 import kotlinx.atomicfu.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.CancellationException
13 import kotlinx.coroutines.channels.*
14 import java.util.concurrent.*
15 import kotlin.coroutines.*
16 
17 /**
18  * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
19  * and provides native support of [delay] and [withTimeout].
20  */
21 public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
22     if (this is DispatcherScheduler) {
23         dispatcher
24     } else {
25         SchedulerCoroutineDispatcher(this)
26     }
27 
28 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
29 @JvmName("asCoroutineDispatcher")
asCoroutineDispatcher0null30 public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
31     SchedulerCoroutineDispatcher(this)
32 
33 /**
34  * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
35  */
36 public fun CoroutineDispatcher.asScheduler(): Scheduler =
37     if (this is SchedulerCoroutineDispatcher) {
38         scheduler
39     } else {
40         DispatcherScheduler(this)
41     }
42 
43 private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {
44 
45     private val schedulerJob = SupervisorJob()
46 
47     /**
48      * The scope for everything happening in this [DispatcherScheduler].
49      *
50      * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
51      */
52     private val scope = CoroutineScope(schedulerJob + dispatcher)
53 
54     /**
55      * The counter of created workers, for their pretty-printing.
56      */
57     private val workerCounter = atomic(1L)
58 
scheduleDirectnull59     override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
60         scope.scheduleTask(block, unit.toMillis(delay)) { task ->
61             Runnable { scope.launch { task() } }
62         }
63 
createWorkernull64     override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)
65 
66     override fun shutdown() {
67         schedulerJob.cancel()
68     }
69 
70     private class DispatcherWorker(
71         private val counter: Long,
72         private val dispatcher: CoroutineDispatcher,
73         parentJob: Job
74     ) : Worker() {
75 
76         private val workerJob = SupervisorJob(parentJob)
77         private val workerScope = CoroutineScope(workerJob + dispatcher)
78         private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
79 
80         init {
<lambda>null81             workerScope.launch {
82                 blockChannel.consumeEach {
83                     it()
84                 }
85             }
86         }
87 
schedulenull88         override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
89             workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
90                 Runnable { blockChannel.trySend(task) }
91             }
92 
isDisposednull93         override fun isDisposed(): Boolean = !workerScope.isActive
94 
95         override fun dispose() {
96             blockChannel.close()
97             workerJob.cancel()
98         }
99 
toStringnull100         override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
101     }
102 
103     override fun toString(): String = dispatcher.toString()
104 }
105 
106 private typealias Task = suspend () -> Unit
107 
108 /**
109  * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
110  * milliseconds.
111  */
112 private fun CoroutineScope.scheduleTask(
113     block: Runnable,
114     delayMillis: Long,
115     adaptForScheduling: (Task) -> Runnable
116 ): Disposable {
117     val ctx = coroutineContext
118     var handle: DisposableHandle? = null
119     val disposable = Disposable.fromRunnable {
120         // null if delay <= 0
121         handle?.dispose()
122     }
123     val decoratedBlock = RxJavaPlugins.onSchedule(block)
124     suspend fun task() {
125         if (disposable.isDisposed) return
126         try {
127             runInterruptible {
128                 decoratedBlock.run()
129             }
130         } catch (e: Throwable) {
131             handleUndeliverableException(e, ctx)
132         }
133     }
134 
135     val toSchedule = adaptForScheduling(::task)
136     if (!isActive) return Disposable.disposed()
137     if (delayMillis <= 0) {
138         toSchedule.run()
139     } else {
140         @Suppress("INVISIBLE_MEMBER")
141         ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
142     }
143     return disposable
144 }
145 
146 /**
147  * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
148  */
149 public class SchedulerCoroutineDispatcher(
150     /**
151      * Underlying scheduler of current [CoroutineDispatcher].
152      */
153     public val scheduler: Scheduler
154 ) : CoroutineDispatcher(), Delay {
155     /** @suppress */
dispatchnull156     override fun dispatch(context: CoroutineContext, block: Runnable) {
157         scheduler.scheduleDirect(block)
158     }
159 
160     /** @suppress */
scheduleResumeAfterDelaynull161     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
162         val disposable = scheduler.scheduleDirect({
163             with(continuation) { resumeUndispatched(Unit) }
164         }, timeMillis, TimeUnit.MILLISECONDS)
165         continuation.disposeOnCancellation(disposable)
166     }
167 
168     /** @suppress */
invokeOnTimeoutnull169     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
170         val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
171         return DisposableHandle { disposable.dispose() }
172     }
173 
174     /** @suppress */
toStringnull175     override fun toString(): String = scheduler.toString()
176 
177     /** @suppress */
178     override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
179 
180     /** @suppress */
181     override fun hashCode(): Int = System.identityHashCode(scheduler)
182 }
183