<lambda>null1 package kotlinx.coroutines.javafx
2
3 import javafx.beans.value.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7
/**
 * Exposes this [ObservableValue] as a cold [Flow] of its values.
 *
 * Each collection registers its own listener on the [ObservableValue], emits the value that is current
 * at that moment, and then emits every subsequently reported change. Because of that, the flow always
 * emits at least the initial value.
 *
 * The flow is conflated: when several values arrive in quick succession, only the most recent one
 * is delivered to the collector.
 *
 * The subscription is made through [ObservableValue.addListener], so an [ObservableValue] that supports
 * lazy evaluation is forced to compute eagerly for as long as the flow is being collected.
 * Every call into the JavaFX API happens in [Dispatchers.JavaFx].
 *
 * ### Operator fusion
 *
 * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
 * Applying [conflate] changes nothing because the flow is conflated already; use [buffer] to opt out of
 * conflation instead.
 */
@ExperimentalCoroutinesApi // Since 1.3.x
public fun <T> ObservableValue<T>.asFlow(): Flow<T> {
    val observable = this
    val changes = callbackFlow<T> {
        // trySend (rather than a throwing send) keeps a delivery failure from being propagated
        // back into the ObservableValue; the downstream should already have handled it.
        val onChange = ChangeListener<T> { _, _, latest ->
            trySend(latest)
        }
        // The listener is registered before the current value is sent.
        observable.addListener(onChange)
        send(observable.value)
        // Suspend until the flow is closed or cancelled, then detach the listener.
        awaitClose {
            observable.removeListener(onChange)
        }
    }
    // Run the subscription logic on the JavaFX dispatcher and keep only the latest value.
    return changes.flowOn(Dispatchers.JavaFx).conflate()
}
37