• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  *  Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.javafx
6 
7 import javafx.beans.value.ChangeListener
8 import javafx.beans.value.ObservableValue
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.awaitClose
11 import kotlinx.coroutines.flow.*
12 
13 /**
14  * Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
15  * its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
16  * succession, only the last one will be emitted.
17  * Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
18  * supports lazy evaluation, eager computation will be enforced while the flow is being collected.
19  * All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
20  * This flow emits at least the initial value.
21  *
22  * ### Operator fusion
23  *
24  * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
25  * [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
26  */
27 @ExperimentalCoroutinesApi // Since 1.3.x
28 public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
29     val listener = ChangeListener<T> { _, _, newValue ->
30         try {
31             offer(newValue)
32         } catch (e: CancellationException) {
33             // In case the event fires after the channel is closed
34         }
35     }
36     addListener(listener)
37     send(value)
38     awaitClose {
39         removeListener(listener)
40     }
41 }.flowOn(Dispatchers.JavaFx).conflate()
42