• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.plugins.*
10 import kotlinx.atomicfu.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import java.util.concurrent.*
14 import kotlin.coroutines.*
15 
/**
 * Exposes this [Scheduler] as a [CoroutineDispatcher] that natively supports [delay] and [withTimeout].
 *
 * When the receiver is itself a scheduler produced by [asScheduler] from a dispatcher, that original
 * dispatcher is returned instead of adding another wrapper around it.
 */
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
    when (this) {
        // Already an adapter around a dispatcher: unwrap it.
        is DispatcherScheduler -> dispatcher
        else -> SchedulerCoroutineDispatcher(this)
    }
26 
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher {
    // Hidden overload kept under the JVM name "asCoroutineDispatcher"; unlike the visible
    // function, it always wraps the receiver and never unwraps a DispatcherScheduler.
    return SchedulerCoroutineDispatcher(this)
}
31 
/**
 * Exposes this [CoroutineDispatcher] as an RxJava [Scheduler].
 *
 * When the receiver is a [SchedulerCoroutineDispatcher], the scheduler it wraps is returned
 * instead of adding another wrapper around it.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler =
    when (this) {
        // Already an adapter around a scheduler: unwrap it.
        is SchedulerCoroutineDispatcher -> scheduler
        else -> DispatcherScheduler(this)
    }
41 
/**
 * A [Scheduler] whose tasks are executed through the wrapped coroutine [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /**
     * Root job of this scheduler. It is the parent of every worker job and is cancelled by [shutdown].
     */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope for everything happening in this [DispatcherScheduler].
     *
     * Running tasks are launched in it as well, so that [shutdown] cancels them together with the pending ones.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * Source of the sequence numbers that workers print in their `toString`; the first worker gets 1.
     */
    private val workerCounter = atomic(1L)

    /**
     * Schedules [block] to run in this scheduler's scope after [delay] expressed in [unit].
     */
    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
        val delayMillis = unit.toMillis(delay)
        return scope.scheduleTask(block, delayMillis) { pending ->
            // Each directly scheduled task gets a coroutine of its own.
            Runnable { scope.launch { pending() } }
        }
    }

    /**
     * Creates a new worker whose job is a child of this scheduler's job.
     */
    override fun createWorker(): Worker {
        val workerNumber = workerCounter.getAndIncrement()
        return DispatcherWorker(workerNumber, dispatcher, schedulerJob)
    }

    /**
     * Cancels the scheduler job.
     */
    override fun shutdown() {
        schedulerJob.cancel()
    }

    override fun toString(): String {
        return dispatcher.toString()
    }

    /**
     * A [Worker] that funnels its tasks through a channel read by a single coroutine,
     * so the tasks are invoked one at a time in the order the channel delivers them.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)

        // Unlimited capacity: the trySend in schedule() is never rejected for lack of buffer space.
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The single consumer coroutine of this worker.
            workerScope.launch {
                blockChannel.consumeEach { queued ->
                    queued()
                }
            }
        }

        /**
         * Schedules [block] to be handed to this worker's channel after [delay] expressed in [unit].
         */
        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
            val delayMillis = unit.toMillis(delay)
            return workerScope.scheduleTask(block, delayMillis) { pending ->
                // The task is only enqueued here; the consumer coroutine started in init runs it.
                Runnable { blockChannel.trySend(pending) }
            }
        }

        override fun isDisposed(): Boolean {
            return !workerScope.isActive
        }

        /**
         * Closes the task channel and cancels the worker job.
         */
        override fun dispose() {
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String {
            val state = if (isDisposed) "disposed" else "active"
            return "$dispatcher (worker $counter, $state)"
        }
    }
}
104 
/** A suspending unit of work, as produced by [scheduleTask] and consumed by its adapters. */
private typealias Task = suspend () -> Unit

/**
 * Schedules [block] so that an adapted version of it, produced by [adaptForScheduling], executes after
 * [delayMillis] milliseconds. A non-positive delay runs the adapted version immediately on the calling thread.
 *
 * Returns an already disposed [Disposable] without scheduling anything if the receiver scope is not active.
 * Otherwise the returned [Disposable] cancels the pending timer, if there is one, and makes the task
 * skip [block] if it has not started yet.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val scopeContext = coroutineContext
    // Stays null when delayMillis <= 0: no timer is registered in that case.
    var timeoutHandle: DisposableHandle? = null
    val disposable = Disposables.fromRunnable { timeoutHandle?.dispose() }
    val hookedBlock = RxJavaPlugins.onSchedule(block)

    suspend fun task() {
        if (!disposable.isDisposed) {
            try {
                runInterruptible { hookedBlock.run() }
            } catch (e: Throwable) {
                // Anything thrown while running the block is forwarded to handleUndeliverableException.
                handleUndeliverableException(e, scopeContext)
            }
        }
    }

    val toSchedule = adaptForScheduling(::task)
    if (!isActive) return Disposables.disposed()
    if (delayMillis > 0) {
        @Suppress("INVISIBLE_MEMBER")
        val registered = scopeContext.delay.invokeOnTimeout(delayMillis, toSchedule, scopeContext)
        timeoutHandle = registered
    } else {
        toSchedule.run()
    }
    return disposable
}
144 
/**
 * A [CoroutineDispatcher] that runs its work on an arbitrary RxJava [Scheduler].
 *
 * It also implements [Delay]: delayed resumptions and timeouts are scheduled on the same [scheduler].
 * Two instances are equal when they wrap the very same [scheduler] instance.
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler that this [CoroutineDispatcher] hands its work to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The Disposable returned by scheduleDirect is not retained.
        scheduler.scheduleDirect(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeTask = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val scheduled = scheduler.scheduleDirect(resumeTask, timeMillis, TimeUnit.MILLISECONDS)
        // Cancelling the continuation disposes the scheduled resumption.
        continuation.disposeOnCancellation(scheduled)
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS).let { scheduled ->
            DisposableHandle { scheduled.dispose() }
        }

    /** @suppress */
    override fun toString(): String {
        return scheduler.toString()
    }

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        if (other !is SchedulerCoroutineDispatcher) return false
        // Identity comparison of the wrapped schedulers, matching hashCode below.
        return other.scheduler === scheduler
    }

    /** @suppress */
    override fun hashCode(): Int {
        return System.identityHashCode(scheduler)
    }
}