• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.javafx
2 
3 import javafx.beans.value.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7 
8 /**
9  * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
10  * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
11  * succession, only the last one will be emitted.
12  * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
13  * supports lazy evaluation, eager computation will be enforced while the flow is being collected.
14  * All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
15  * This flow emits at least the initial value.
16  *
17  * ### Operator fusion
18  *
19  * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
20  * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
21  */
22 @ExperimentalCoroutinesApi // Since 1.3.x
23 public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
24     val listener = ChangeListener<T> { _, _, newValue ->
25         /*
26          * Do not propagate the exception to the ObservableValue, it
27          * already should've been handled by the downstream
28          */
29         trySend(newValue)
30     }
31     addListener(listener)
32     send(value)
33     awaitClose {
34         removeListener(listener)
35     }
36 }.flowOn(Dispatchers.JavaFx).conflate()
37