• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import reactor.core.Disposable
9 import reactor.core.scheduler.Scheduler
10 import java.util.concurrent.TimeUnit
11 import kotlin.coroutines.CoroutineContext
12 
13 /**
14  * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher].
15  */
asCoroutineDispatchernull16 public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
17 
18 /**
19  * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
20  * @param scheduler a scheduler.
21  */
22 public class SchedulerCoroutineDispatcher(
23     /**
24      * Underlying scheduler of current [CoroutineDispatcher].
25      */
26     public val scheduler: Scheduler
27 ) : CoroutineDispatcher(), Delay {
28     /** @suppress */
29     override fun dispatch(context: CoroutineContext, block: Runnable) {
30         scheduler.schedule(block)
31     }
32 
33     /** @suppress */
34     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
35         val disposable = scheduler.schedule({
36             with(continuation) { resumeUndispatched(Unit) }
37         }, timeMillis, TimeUnit.MILLISECONDS)
38         continuation.disposeOnCancellation(disposable.asDisposableHandle())
39     }
40 
41     /** @suppress */
42     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
43         scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle()
44 
45     /** @suppress */
46     override fun toString(): String = scheduler.toString()
47     /** @suppress */
48     override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler
49     /** @suppress */
50     override fun hashCode(): Int = System.identityHashCode(scheduler)
51 }
52 
Disposablenull53 private fun Disposable.asDisposableHandle(): DisposableHandle =
54     object : DisposableHandle {
55         override fun dispose() = this@asDisposableHandle.dispose()
56     }
57