1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.javafx 6 7 import javafx.beans.value.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.channels.* 10 import kotlinx.coroutines.flow.* 11 12 /** 13 * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits 14 * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick 15 * succession, only the last one will be emitted. 16 * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue] 17 * supports lazy evaluation, eager computation will be enforced while the flow is being collected. 18 * All the calls to JavaFX API are performed in [Dispatchers.JavaFx]. 19 * This flow emits at least the initial value. 20 * 21 * ### Operator fusion 22 * 23 * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused. 24 * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead. 25 */ 26 @ExperimentalCoroutinesApi // Since 1.3.x 27 public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> { 28 val listener = ChangeListener<T> { _, _, newValue -> 29 /* 30 * Do not propagate the exception to the ObservableValue, it 31 * already should've been handled by the downstream 32 */ 33 trySend(newValue) 34 } 35 addListener(listener) 36 send(value) 37 awaitClose { 38 removeListener(listener) 39 } 40 }.flowOn(Dispatchers.JavaFx).conflate() 41