• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.*
4 import reactor.core.Disposable
5 import reactor.core.scheduler.Scheduler
6 import java.util.concurrent.TimeUnit
7 import kotlin.coroutines.CoroutineContext
8 
/**
 * Wraps this [Scheduler] in a [SchedulerCoroutineDispatcher], making it usable as a [CoroutineDispatcher].
 *
 * @return a new dispatcher whose [SchedulerCoroutineDispatcher.scheduler] is this scheduler.
 */
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher {
    return SchedulerCoroutineDispatcher(this)
}
13 
/**
 * A [CoroutineDispatcher] that hands all of its work to a [Scheduler].
 *
 * Dispatched blocks, delayed resumptions and timeout callbacks are all submitted to [scheduler].
 * Two dispatchers are equal when they wrap the very same scheduler instance.
 *
 * @param scheduler the scheduler that runs dispatched and delayed work.
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The [Scheduler] this dispatcher delegates to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /** @suppress */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // The context is not consulted here; the block is submitted to the scheduler as-is.
        scheduler.schedule(block)
    }

    /** @suppress */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        // Schedule the resumption on the underlying scheduler after the requested delay.
        val scheduled = scheduler.schedule(
            { continuation.run { this@SchedulerCoroutineDispatcher.resumeUndispatched(Unit) } },
            timeMillis,
            TimeUnit.MILLISECONDS
        )
        // Dispose the scheduled task if the continuation is cancelled first.
        continuation.disposeOnCancellation(scheduled.asDisposableHandle())
    }

    /** @suppress */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val scheduled = scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        return scheduled.asDisposableHandle()
    }

    /** @suppress */
    override fun toString(): String {
        return scheduler.toString()
    }

    /** @suppress */
    override fun equals(other: Any?): Boolean {
        if (other !is SchedulerCoroutineDispatcher) return false
        // Identity comparison: the same scheduler instance, not merely an equal one.
        return scheduler === other.scheduler
    }

    /** @suppress */
    override fun hashCode(): Int {
        // Identity-based, to stay consistent with the identity comparison in equals.
        return System.identityHashCode(scheduler)
    }
}
48 
/**
 * Adapts this [Disposable] to a [DisposableHandle] whose [DisposableHandle.dispose] disposes this object.
 */
private fun Disposable.asDisposableHandle(): DisposableHandle {
    val target = this
    return object : DisposableHandle {
        override fun dispose() {
            target.dispose()
        }
    }
}
51