• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.javafx
6 
7 import javafx.beans.value.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 
/**
 * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
 * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
 * succession, only the last one will be emitted.
 * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
 * supports lazy evaluation, eager computation will be enforced while the flow is being collected.
 * All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
 * This flow emits at least the initial value.
 *
 * The change listener is registered when collection starts and is always removed once the flow is
 * closed or its collector is cancelled, including the case where emitting the initial value fails.
 *
 * ### Operator fusion
 *
 * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
 * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
 *
 * @return a conflated cold [Flow] of the values held by this [ObservableValue], starting with its current value.
 */
@ExperimentalCoroutinesApi // Since 1.3.x
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
    val listener = ChangeListener<T> { _, _, newValue ->
        /*
         * Use the non-throwing trySend: do not propagate the exception to the
         * ObservableValue, it already should've been handled by the downstream.
         */
        trySend(newValue)
    }
    // Subscribe before reading the current value so the initial emission and later changes stay ordered.
    addListener(listener)
    try {
        // Emit the current value first so that a collector always receives at least one element.
        send(value)
        // Suspend until the channel is closed or the collector is cancelled.
        awaitClose()
    } finally {
        // Unsubscribe on every exit path; previously a failure in `send(value)` skipped
        // `awaitClose` entirely and left the listener attached to the ObservableValue.
        removeListener(listener)
    }
}.flowOn(Dispatchers.JavaFx).conflate()
41