1 package kotlinx.coroutines.reactor 2 3 import kotlinx.coroutines.* 4 import reactor.core.Disposable 5 import reactor.core.scheduler.Scheduler 6 import java.util.concurrent.TimeUnit 7 import kotlin.coroutines.CoroutineContext 8 9 /** 10 * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]. 11 */ asCoroutineDispatchernull12public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) 13 14 /** 15 * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. 16 * @param scheduler a scheduler. 17 */ 18 public class SchedulerCoroutineDispatcher( 19 /** 20 * Underlying scheduler of current [CoroutineDispatcher]. 21 */ 22 public val scheduler: Scheduler 23 ) : CoroutineDispatcher(), Delay { 24 /** @suppress */ 25 override fun dispatch(context: CoroutineContext, block: Runnable) { 26 scheduler.schedule(block) 27 } 28 29 /** @suppress */ 30 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { 31 val disposable = scheduler.schedule({ 32 with(continuation) { resumeUndispatched(Unit) } 33 }, timeMillis, TimeUnit.MILLISECONDS) 34 continuation.disposeOnCancellation(disposable.asDisposableHandle()) 35 } 36 37 /** @suppress */ 38 override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = 39 scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() 40 41 /** @suppress */ 42 override fun toString(): String = scheduler.toString() 43 /** @suppress */ 44 override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler 45 /** @suppress */ 46 override fun hashCode(): Int = System.identityHashCode(scheduler) 47 } 48 Disposablenull49private fun Disposable.asDisposableHandle(): DisposableHandle = 50 DisposableHandle { this@asDisposableHandle.dispose() } 51