/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import reactor.core.Disposable
import reactor.core.scheduler.Scheduler
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

/**
 * Wraps this [Scheduler] in a [SchedulerCoroutineDispatcher] so that it can be used as a
 * [CoroutineDispatcher].
 *
 * @return a new dispatcher whose [SchedulerCoroutineDispatcher.scheduler] is this scheduler.
 */
fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher {
    return SchedulerCoroutineDispatcher(this)
}

/**
 * A [CoroutineDispatcher] (also implementing [Delay]) that hands all of its work to a Reactor [Scheduler].
 *
 * Two dispatchers are equal when they wrap the very same scheduler instance.
 *
 * @param scheduler the scheduler that runs dispatched blocks and timed tasks.
 */
public class SchedulerCoroutineDispatcher(
    /**
     * The scheduler this [CoroutineDispatcher] delegates to.
     */
    public val scheduler: Scheduler
) : CoroutineDispatcher(), Delay {
    /**
     * Submits [block] to the underlying [scheduler]; [context] is not consulted.
     * @suppress
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        scheduler.schedule(block)
    }

    /**
     * Schedules a task on [scheduler] that resumes [continuation] with [Unit] after [timeMillis]
     * milliseconds, and disposes that task if the continuation is cancelled.
     * @suppress
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val pendingResume = scheduler.schedule({
            // resumeUndispatched needs the continuation in scope and this dispatcher as its receiver.
            continuation.run { this@SchedulerCoroutineDispatcher.resumeUndispatched(Unit) }
        }, timeMillis, TimeUnit.MILLISECONDS)
        val cancellationHandle = pendingResume.asDisposableHandle()
        continuation.disposeOnCancellation(cancellationHandle)
    }

    /**
     * Schedules [block] on [scheduler] to run after [timeMillis] milliseconds.
     *
     * @return a handle whose disposal disposes the scheduled task.
     * @suppress
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val scheduledTimeout = scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
        return scheduledTimeout.asDisposableHandle()
    }

    /**
     * Returns the string form of the underlying [scheduler].
     * @suppress
     */
    override fun toString(): String {
        return scheduler.toString()
    }

    /**
     * Compares by identity of the wrapped [scheduler].
     * @suppress
     */
    override fun equals(other: Any?): Boolean {
        if (other !is SchedulerCoroutineDispatcher) return false
        return other.scheduler === scheduler
    }

    /**
     * Identity hash code of the wrapped [scheduler], consistent with [equals].
     * @suppress
     */
    override fun hashCode(): Int {
        return System.identityHashCode(scheduler)
    }
}

/**
 * Adapts this Reactor [Disposable] to a coroutines [DisposableHandle] that forwards `dispose()` to it.
 */
private fun Disposable.asDisposableHandle(): DisposableHandle {
    val reactorDisposable = this
    return object : DisposableHandle {
        override fun dispose() {
            reactorDisposable.dispose()
        }
    }
}