1 /* <lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.javafx 6 7 import javafx.beans.value.ChangeListener 8 import javafx.beans.value.ObservableValue 9 import kotlinx.coroutines.* 10 import kotlinx.coroutines.channels.awaitClose 11 import kotlinx.coroutines.flow.* 12 13 /** 14 * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits 15 * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick 16 * succession, only the last one will be emitted. 17 * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue] 18 * supports lazy evaluation, eager computation will be enforced while the flow is being collected. 19 * All the calls to JavaFX API are performed in [Dispatchers.JavaFx]. 20 * This flow emits at least the initial value. 21 * 22 * ### Operator fusion 23 * 24 * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused. 25 * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead. 26 */ 27 @ExperimentalCoroutinesApi // Since 1.3.x 28 public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> { 29 val listener = ChangeListener<T> { _, _, newValue -> 30 try { 31 offer(newValue) 32 } catch (e: CancellationException) { 33 // In case the event fires after the channel is closed 34 } 35 } 36 addListener(listener) 37 send(value) 38 awaitClose { 39 removeListener(listener) 40 } 41 }.flowOn(Dispatchers.JavaFx).conflate() 42