• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx2
2 
3 import io.reactivex.*
4 import io.reactivex.disposables.*
5 import io.reactivex.plugins.*
6 import kotlinx.atomicfu.*
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import java.util.concurrent.*
10 import kotlin.coroutines.*
11 
/**
 * Exposes this [Scheduler] as a [CoroutineDispatcher] that provides native support
 * of [delay] and [withTimeout].
 *
 * A scheduler that is itself a wrapper created by [asScheduler] is unwrapped: its original
 * dispatcher is returned instead of stacking another adapter on top of it.
 */
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) {
    is DispatcherScheduler -> dispatcher
    else -> SchedulerCoroutineDispatcher(this)
}
22 
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher {
    // Hidden overload kept only for binary compatibility: it always creates a new wrapper
    // and declares the concrete wrapper type as its return type.
    return SchedulerCoroutineDispatcher(this)
}
27 
/**
 * Exposes this [CoroutineDispatcher] as a [Scheduler].
 *
 * A [SchedulerCoroutineDispatcher] is unwrapped to the scheduler it was built from;
 * any other dispatcher gets wrapped into a new scheduler.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) {
    is SchedulerCoroutineDispatcher -> scheduler
    else -> DispatcherScheduler(this)
}
37 
/**
 * A [Scheduler] whose tasks are executed on the wrapped coroutine [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /**
     * Root job of this scheduler: parent of the directly scheduled tasks and of every worker job.
     * Cancelled by [shutdown].
     */
    private val schedulerJob = SupervisorJob()

    /**
     * Scope that owns everything this [DispatcherScheduler] does.
     *
     * Tasks that are already running are launched in it as well, so that [shutdown] cancels them too.
     */
    private val schedulerScope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * Number handed to the next created worker; only used when printing workers.
     */
    private val workerCounter = atomic(1L)

    /**
     * Schedules [block] to run in the scheduler scope after [delay] (expressed in [unit]).
     */
    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val delayMillis = unit.toMillis(delay)
        return schedulerScope.scheduleTask(block, delayMillis) { task ->
            Runnable { schedulerScope.launch { task() } }
        }
    }

    /**
     * Creates a worker whose job is a child of this scheduler's job.
     */
    override fun createWorker(): Worker {
        val workerNumber = workerCounter.getAndIncrement()
        return DispatcherWorker(workerNumber, dispatcher, schedulerJob)
    }

    /**
     * Cancels the scheduler job.
     */
    override fun shutdown() {
        schedulerJob.cancel()
    }

    /**
     * A [Worker] that passes its tasks through an unlimited channel drained by a single coroutine,
     * so the tasks are invoked one after another.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The only consumer of the channel: invokes each received task in turn.
            workerScope.launch {
                blockChannel.consumeEach { queuedTask ->
                    queuedTask()
                }
            }
        }

        /**
         * Schedules [block] so that, once [delay] (in [unit]) has passed, it is sent to the worker's channel.
         */
        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
            val delayMillis = unit.toMillis(delay)
            return workerScope.scheduleTask(block, delayMillis) { task ->
                Runnable { blockChannel.trySend(task) }
            }
        }

        /**
         * The worker counts as disposed as soon as its scope is no longer active.
         */
        override fun isDisposed(): Boolean {
            return !workerScope.isActive
        }

        /**
         * Closes the task channel first and then cancels the worker job.
         */
        override fun dispose() {
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
    }

    override fun toString(): String {
        return dispatcher.toString()
    }
}
100 
/** A suspending unit of work, in the form expected by the scheduling adapters. */
private typealias Task = suspend () -> Unit

/**
 * Arranges for [block] to be executed after [delayMillis] milliseconds.
 *
 * The block is first turned into a suspending task, which [adaptForScheduling] then converts into
 * the [Runnable] that is actually scheduled. With a non-positive delay that runnable is run right away;
 * otherwise it is registered as a timeout on the delay implementation of this scope's context.
 *
 * Returns an already disposed [Disposable] when this scope is not active.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val scopeContext = coroutineContext
    var timeoutHandle: DisposableHandle? = null
    val disposable = Disposables.fromRunnable {
        // Never assigned (stays null) when delayMillis <= 0.
        timeoutHandle?.dispose()
    }
    val hookedBlock = RxJavaPlugins.onSchedule(block)

    suspend fun task() {
        // A task disposed before it gets to run is skipped.
        if (!disposable.isDisposed) {
            try {
                runInterruptible { hookedBlock.run() }
            } catch (e: Throwable) {
                handleUndeliverableException(e, scopeContext)
            }
        }
    }

    val toSchedule = adaptForScheduling(::task)
    if (!isActive) return Disposables.disposed()
    if (delayMillis > 0) {
        @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2
        val registered = scopeContext.delay.invokeOnTimeout(delayMillis, toSchedule, scopeContext)
        timeoutHandle = registered
    } else {
        toSchedule.run()
    }
    return disposable
}
140 
/**
 * A [CoroutineDispatcher] backed by an arbitrary [Scheduler].
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler that this [CoroutineDispatcher] delegates to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        scheduler.scheduleDirect(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeAction = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val scheduled = scheduler.scheduleDirect(resumeAction, timeMillis, TimeUnit.MILLISECONDS)
        // Cancelling the continuation disposes the scheduled resumption.
        continuation.disposeOnCancellation(scheduled)
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val scheduled = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
        return DisposableHandle { scheduled.dispose() }
    }

    /** @suppress */
    override fun toString(): String {
        return scheduler.toString()
    }

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        // Two dispatchers are equal only when they wrap the very same scheduler instance.
        if (other !is SchedulerCoroutineDispatcher) return false
        return other.scheduler === scheduler
    }

    /** @suppress */
    override fun hashCode(): Int {
        return System.identityHashCode(scheduler)
    }
}
178