1 /*
<lambda>null2 * Copyright (C) 2024 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.systemui.kairos
18
19 import com.android.systemui.kairos.util.Maybe
20 import com.android.systemui.kairos.util.map
21 import kotlin.coroutines.CoroutineContext
22 import kotlin.coroutines.EmptyCoroutineContext
23 import kotlinx.coroutines.CompletableDeferred
24 import kotlinx.coroutines.Deferred
25 import kotlinx.coroutines.DisposableHandle
26 import kotlinx.coroutines.Job
27 import kotlinx.coroutines.awaitCancellation
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.FlowCollector
30 import kotlinx.coroutines.flow.MutableSharedFlow
31 import kotlinx.coroutines.flow.MutableStateFlow
32 import kotlinx.coroutines.flow.SharedFlow
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.dropWhile
35 import kotlinx.coroutines.flow.scan
36
/**
 * A computation that can modify the Kairos network.
 *
 * Represented as a function with a [BuildScope] receiver that produces a value of type [A].
 */
typealias BuildSpec<A> = BuildScope.() -> A
39
/**
 * Constructs a [BuildSpec] from [block].
 *
 * When the spec is applied, [block] is invoked with a [BuildScope] receiver, which can be used to
 * perform network-building operations (adding new inputs and outputs to the network) in addition
 * to all operations available in [TransactionScope].
 *
 * @param block the network-building logic to wrap.
 * @return [block] itself, typed as a [BuildSpec].
 */
@ExperimentalKairosApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> buildSpec(noinline block: BuildScope.() -> A): BuildSpec<A> {
    return block
}
48
/**
 * Applies the [BuildSpec] within this [BuildScope].
 *
 * @param block the spec to run with this scope as its receiver.
 * @return the value produced by [block].
 */
@ExperimentalKairosApi
inline operator fun <A> BuildScope.invoke(block: BuildScope.() -> A): A = run(block)
52
53 /** Operations that add inputs and outputs to a Kairos network. */
54 @ExperimentalKairosApi
55 interface BuildScope : HasNetwork, StateScope {
56
/**
 * Defers invoking [block] until after the current [BuildScope] code-path completes, returning a
 * [DeferredValue] that can be used to reference the result.
 *
 * Useful for recursive definitions.
 *
 * @param block the build logic whose invocation is deferred.
 * @return a [DeferredValue] referencing the result of [block].
 * @see deferredBuildScopeAction
 * @see DeferredValue
 */
fun <R> deferredBuildScope(block: BuildScope.() -> R): DeferredValue<R>
67
/**
 * Defers invoking [block] until after the current [BuildScope] code-path completes.
 *
 * Useful for recursive definitions. Unlike [deferredBuildScope], no result is returned.
 *
 * @param block the build logic whose invocation is deferred.
 * @see deferredBuildScope
 */
fun deferredBuildScopeAction(block: BuildScope.() -> Unit)
76
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events].
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * Unlike [mapLatestBuild], these modifications are not undone with each subsequent emission of
 * the original [Events].
 *
 * **NOTE:** This API does not [observe] the original [Events], meaning that unless the returned
 * (or a downstream) [Events] is observed separately, [transform] will not be invoked, and no
 * internal side-effects will occur.
 *
 * @param transform function applied to each emitted value, with a [BuildScope] receiver.
 * @return an [Events] of the values returned by [transform].
 */
fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B>
90
/**
 * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
 * performed in reaction to the emission.
 *
 * Specifically, [block] is deferred to the end of the transaction, and is only actually
 * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
 * -Latest combinator, for example.
 *
 * [Disposing][DisposableHandle.dispose] of the returned [DisposableHandle] will stop the
 * observation of new emissions. It will however *not* cancel any running effects from previous
 * emissions. To achieve this behavior, use [launchScope] or [asyncScope] to create a child
 * build scope:
 * ```
 * val job = launchScope {
 *     events.observe { x ->
 *         launchEffect { longRunningEffect(x) }
 *     }
 * }
 * // cancels observer and any running effects:
 * job.cancel()
 * ```
 *
 * @param coroutineContext context supplied for the observer; defaults to
 *   [EmptyCoroutineContext]. NOTE(review): how it is used is decided by the implementation,
 *   which is not visible here.
 * @param block side-effect to run for each emission; defaults to a no-op.
 * @return a [DisposableHandle] that stops the observation of new emissions when disposed.
 */
// TODO: remove disposable handle return? might add more confusion than convenience
fun <A> Events<A>.observe(
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.(A) -> Unit = {},
): DisposableHandle
118
/**
 * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
 * original [Events], and a [DeferredValue] containing the result of applying [initialSpecs]
 * immediately.
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * @param initialSpecs the specs, by key, that are applied immediately.
 * @param numKeys optional key-count value; NOTE(review): presumably a sizing hint for the
 *   expected number of keys, confirm against the implementation.
 * @return a pair of the per-key results of emitted specs and the deferred results of
 *   [initialSpecs].
 */
fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey(
    initialSpecs: DeferredValue<Map<K, BuildSpec<B>>>,
    numKeys: Int? = null,
): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>>
135
/**
 * Creates an instance of an [Events] with elements that are emitted from [builder].
 *
 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
 * provided [EventProducerScope].
 *
 * By default, [builder] is only running while the returned [Events] is being
 * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
 * ```
 * events { ... }.apply { observe() }
 * ```
 *
 * @param builder suspending producer that emits values through its [EventProducerScope]
 *   receiver.
 * @return an [Events] that emits the values produced by [builder].
 */
// TODO: eventually this should be defined on KairosNetwork + an extension on HasNetwork
//  - will require modifying InputNode so that it can be manually killed, as opposed to using
//    takeUntil (which requires a StateScope).
fun <T> events(builder: suspend EventProducerScope<T>.() -> Unit): Events<T>
152
/**
 * Creates an instance of an [Events] with elements that are emitted from [builder].
 *
 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
 * provided [CoalescingEventProducerScope].
 *
 * By default, [builder] is only running while the returned [Events] is being
 * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
 * ```
 * coalescingEvents(...) { ... }.apply { observe() }
 * ```
 *
 * In the event of backpressure, emissions are *coalesced* into batches. When a value is
 * [emitted][CoalescingEventProducerScope.emit] from [builder], it is merged into the batch via
 * [coalesce]. Once the batch is consumed by the kairos network in the next transaction, the
 * batch is reset back to [getInitialValue].
 *
 * @param getInitialValue supplies the value that a batch is reset to.
 * @param coalesce merges a newly emitted value (`new`) into the current batch (`old`).
 * @param builder suspending producer that emits values through its
 *   [CoalescingEventProducerScope] receiver.
 * @return an [Events] that emits the coalesced batches.
 */
// TODO: see TODO for [events]
fun <In, Out> coalescingEvents(
    getInitialValue: () -> Out,
    coalesce: (old: Out, new: In) -> Out,
    builder: suspend CoalescingEventProducerScope<In>.() -> Unit,
): Events<Out>
176
/**
 * Creates a new [BuildScope] that is a child of this one.
 *
 * This new scope can be manually cancelled via the returned [Job], or will be cancelled
 * automatically when its parent is cancelled. Cancellation will unregister all
 * [observers][observe] and cancel all scheduled [effects][effect].
 *
 * The return value from [block] can be accessed via the returned [DeferredValue].
 *
 * @param block the spec to apply within the new child scope.
 * @return a pair of the [DeferredValue] holding the result of [block] and the [Job] that
 *   controls the child scope.
 */
// TODO: return a DisposableHandle instead of Job?
fun <A> asyncScope(block: BuildSpec<A>): Pair<DeferredValue<A>, Job>
188
189 // TODO: once we have context params, these can all become extensions:
190
/**
 * Maps each value of the original [Events] through the suspending [transform], returning an
 * [Events] of the results.
 *
 * Unlike [Events.map], [transform] can perform arbitrary asynchronous code. This code is run
 * outside of the current Kairos transaction; when [transform] returns, the returned value is
 * emitted from the result [Events] in a new transaction.
 *
 * Built from [mapLatestBuild], [asyncEvent] and `flatten`.
 *
 * @param transform suspending function applied to each emitted value.
 * @return an [Events] emitting the results of [transform].
 */
fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> {
    val pendingResults = mapLatestBuild { input -> asyncEvent { transform(input) } }
    return pendingResults.flatten()
}
206
/**
 * Invokes [block] whenever this [Events] emits a value. [block] receives a [BuildScope] that
 * can be used to make further modifications to the Kairos network, and/or perform side-effects
 * via [effect].
 *
 * Implemented by passing [block] to [mapBuild] and observing the result.
 *
 * @param block build logic to run for each emission; defaults to a no-op.
 * @return the [DisposableHandle] of the underlying observation.
 * @see observe
 */
fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle {
    val applied = mapBuild(block)
    return applied.observe()
}
216
/**
 * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current
 * [value of this State][State.sample], and will emit at the same rate as [State.changes].
 *
 * A [MutableStateFlow] of deferred values backs the result: it starts with the deferred sample
 * of this [State] and is overwritten by an observer of [State.changes]. The returned
 * [StateFlow] unwraps those deferred values on access.
 */
fun <A> State<A>.toStateFlow(): StateFlow<A> {
    val backing = MutableStateFlow(sampleDeferred())
    changes.observe { newValue -> backing.value = deferredOf(newValue) }
    return object : StateFlow<A> {
        override val value: A
            get() = backing.value.value

        override val replayCache: List<A>
            get() = backing.replayCache.map { deferred -> deferred.value }

        override suspend fun collect(collector: FlowCollector<A>): Nothing {
            backing.collect { deferred -> collector.emit(deferred.value) }
        }
    }
}
236
/**
 * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits the current
 * [value][State.sample] of this [State] followed by all [changes].
 *
 * The flow is created with one slot of extra buffer capacity. Values are offered with
 * `tryEmit`, and the result of that call is ignored.
 *
 * @param replay size of the replay cache of the returned flow.
 */
fun <A> State<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> =
    MutableSharedFlow<A>(replay, extraBufferCapacity = 1).also { shared ->
        deferredBuildScope {
            shared.tryEmit(sample())
            changes.observe { latest -> shared.tryEmit(latest) }
        }
    }
249
/**
 * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits values
 * whenever this [Events] emits.
 *
 * The flow is created with one slot of extra buffer capacity. Values are offered with
 * `tryEmit`, and the result of that call is ignored.
 *
 * @param replay size of the replay cache of the returned flow.
 */
fun <A> Events<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> =
    MutableSharedFlow<A>(replay, extraBufferCapacity = 1).also { shared ->
        observe { emitted -> shared.tryEmit(emitted) }
    }
259
/**
 * Returns a [State] that holds onto the value returned by applying the most recently emitted
 * [BuildSpec] from the original [Events], or the value returned by applying [initialSpec] if
 * nothing has been emitted since it was constructed.
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
 * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
 * are cancelled).
 *
 * @param initialSpec the spec whose result is the initial value of the returned [State].
 */
fun <A> Events<BuildSpec<A>>.holdLatestSpec(initialSpec: BuildSpec<A>): State<A> {
    val applied = applyLatestSpec(initialSpec)
    val updates: Events<A> = applied.first
    val initialResult: DeferredValue<A> = applied.second
    return updates.holdStateDeferred(initialResult)
}
273
/**
 * Returns a [State] containing the value returned by applying the [BuildSpec] held by the
 * original [State].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
 * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
 * are cancelled).
 *
 * The initial value comes from sampling this [State] and applying the sampled spec; later
 * values come from applying the specs emitted by `changes`.
 */
fun <A> State<BuildSpec<A>>.applyLatestSpec(): State<A> {
    val applied = changes.applyLatestSpec(buildSpec { sample().applySpec() })
    val updates: Events<A> = applied.first
    val initialResult: DeferredValue<A> = applied.second
    return updates.holdStateDeferred(initialResult)
}
287
/**
 * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
 * original [Events].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
 * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
 * are cancelled).
 *
 * Delegates to the overload taking an initial spec, passing an empty spec and discarding its
 * result.
 */
fun <A> Events<BuildSpec<A>>.applyLatestSpec(): Events<A> {
    val (updates, _) = applyLatestSpec(buildSpec {})
    return updates
}
297
/**
 * Returns an [Events] that switches to a new [Events] produced by [transform] every time the
 * original [Events] emits a value.
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * When the original [Events] emits a new value, those changes are undone (any registered
 * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
 *
 * @param transform produces the [Events] to switch to for each emitted value.
 */
fun <A, B> Events<A>.flatMapLatestBuild(transform: BuildScope.(A) -> Events<B>): Events<B> {
    val specs = mapCheap { input -> buildSpec { transform(input) } }
    return specs.applyLatestSpec().flatten()
}
308
/**
 * Returns a [State] by applying [transform] to the value held by the original [State].
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * When the value held by the original [State] changes, those changes are undone (any registered
 * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
 *
 * @param transform produces the inner [State] for each value held by the original [State].
 */
fun <A, B> State<A>.flatMapLatestBuild(transform: BuildScope.(A) -> State<B>): State<B> {
    val nested = mapLatestBuild { held -> transform(held) }
    return nested.flatten()
}
318
/**
 * Returns a [State] that transforms the value held inside this [State] by applying it to the
 * [transform].
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * When the value held by the original [State] changes, those changes are undone (any registered
 * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
 *
 * @param transform function applied to each value held by the original [State].
 */
fun <A, B> State<A>.mapLatestBuild(transform: BuildScope.(A) -> B): State<B> {
    val specs = mapCheapUnsafe { held -> buildSpec { transform(held) } }
    return specs.applyLatestSpec()
}
329
/**
 * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
 * original [Events], and a [DeferredValue] containing the result of applying [initialSpec]
 * immediately.
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
 * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
 * are cancelled).
 *
 * Implemented on top of the keyed variant, using [Unit] as the single key.
 *
 * @param initialSpec the spec that is applied immediately.
 * @return a pair of the results of the emitted specs and the deferred result of [initialSpec].
 * @throws IllegalStateException if a keyed result does not contain an entry for the [Unit] key.
 */
fun <A : Any?, B> Events<BuildSpec<B>>.applyLatestSpec(
    initialSpec: BuildSpec<A>
): Pair<Events<B>, DeferredValue<A>> {
    // Wrap every emitted spec in a single-entry patch so the keyed implementation can be reused.
    val (events, result) =
        mapCheap { spec -> mapOf(Unit to Maybe.present(spec)) }
            .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1)
    val outEvents: Events<B> =
        events.mapMaybe {
            checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
        }
    val outInit: DeferredValue<A> = deferredBuildScope {
        val initResult: Map<Unit, A> = result.value
        check(Unit in initResult) {
            "applyLatest: expected initial result, but none present in: $initResult"
        }
        // The key is known to be present, so a plain lookup suffices. The cast is still needed
        // because A may itself be nullable, in which case a null entry is a legitimate result.
        @Suppress("UNCHECKED_CAST")
        initResult[Unit] as A
    }
    return Pair(outEvents, outInit)
}
359
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events].
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * @param transform function applied to each emitted value.
 */
fun <A, B> Events<A>.mapLatestBuild(transform: BuildScope.(A) -> B): Events<B> {
    val specs = mapCheap { input -> buildSpec { transform(input) } }
    return specs.applyLatestSpec()
}
371
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events], and a [DeferredValue] containing the result of applying [transform] to
 * [initialValue] immediately.
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * Wraps [initialValue] with `deferredOf` and delegates to [mapLatestBuildDeferred].
 *
 * @param initialValue the value that [transform] is applied to immediately.
 * @param transform function applied to [initialValue] and to each emitted value.
 */
fun <A, B> Events<A>.mapLatestBuild(
    initialValue: A,
    transform: BuildScope.(A) -> B,
): Pair<Events<B>, DeferredValue<B>> {
    val deferredInitial = deferredOf(initialValue)
    return mapLatestBuildDeferred(deferredInitial, transform)
}
387
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events], and a [DeferredValue] containing the result of applying [transform] to
 * [initialValue] immediately.
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * @param initialValue deferred value that [transform] is applied to immediately; it is only
 *   read from inside the initial spec.
 * @param transform function applied to the initial value and to each emitted value.
 */
fun <A, B> Events<A>.mapLatestBuildDeferred(
    initialValue: DeferredValue<A>,
    transform: BuildScope.(A) -> B,
): Pair<Events<B>, DeferredValue<B>> {
    val updateSpecs = mapCheap { input -> buildSpec { transform(input) } }
    val initialSpec = buildSpec { transform(initialValue.value) }
    return updateSpecs.applyLatestSpec(initialSpec = initialSpec)
}
404
/**
 * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
 * original [Events], and a [DeferredValue] containing the result of applying [initialSpecs]
 * immediately.
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * Convenience overload that wraps [initialSpecs] with `deferredOf`.
 *
 * @param initialSpecs the specs, by key, that are applied immediately.
 * @param numKeys forwarded unchanged to the [DeferredValue]-based overload.
 */
fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey(
    initialSpecs: Map<K, BuildSpec<B>>,
    numKeys: Int? = null,
): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> {
    val deferredSpecs = deferredOf(initialSpecs)
    return applyLatestSpecForKey(deferredSpecs, numKeys)
}
422
/**
 * Returns an [Incremental] containing the results of applying each [BuildSpec] emitted from the
 * original [Incremental].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * The deferred sample of this [Incremental] supplies the initial specs, and its `updates`
 * supply the patches.
 *
 * @param numKeys forwarded unchanged to the [Events]-based overload.
 */
fun <K, V> Incremental<K, BuildSpec<V>>.applyLatestSpecForKey(
    numKeys: Int? = null
): Incremental<K, V> {
    val applied = updates.applyLatestSpecForKey(sampleDeferred(), numKeys)
    val patches = applied.first
    val initialResults = applied.second
    return patches.foldStateMapIncrementally(initialResults)
}
440
/**
 * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
 * original [Events].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * Delegates to the overload taking initial specs, passing an empty map and discarding the
 * initial results.
 *
 * @param numKeys forwarded unchanged to the delegated overload.
 */
fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.applyLatestSpecForKey(
    numKeys: Int? = null
): Events<Map<K, Maybe<V>>> {
    val (patches, _) = applyLatestSpecForKey<K, V, Nothing>(deferredOf(emptyMap()), numKeys)
    return patches
}
456
/**
 * Returns an [Incremental] containing the latest results of applying each [BuildSpec] emitted
 * from the original [Events].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * @param initialSpecs the specs, by key, whose results form the initial contents.
 * @param numKeys forwarded unchanged to [applyLatestSpecForKey].
 */
fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey(
    initialSpecs: DeferredValue<Map<K, BuildSpec<V>>>,
    numKeys: Int? = null,
): Incremental<K, V> {
    val applied = applyLatestSpecForKey(initialSpecs, numKeys)
    val patches = applied.first
    val initialResults = applied.second
    return patches.foldStateMapIncrementally(initialResults)
}
475
/**
 * Returns an [Incremental] containing the latest results of applying each [BuildSpec] emitted
 * from the original [Events].
 *
 * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
 * same key are undone (any registered [observers][observe] are unregistered, and any pending
 * [side-effects][effect] are cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildSpec] will be undone with no replacement.
 *
 * Convenience overload that wraps [initialSpecs] with `deferredOf`.
 *
 * @param initialSpecs the specs, by key, whose results form the initial contents; empty by
 *   default.
 * @param numKeys forwarded unchanged to the [DeferredValue]-based overload.
 */
fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey(
    initialSpecs: Map<K, BuildSpec<V>> = emptyMap(),
    numKeys: Int? = null,
): Incremental<K, V> {
    val deferredSpecs = deferredOf(initialSpecs)
    return holdLatestSpecForKey(deferredSpecs, numKeys)
}
491
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events], and a [DeferredValue] containing the result of applying [transform] to
 * [initialValues] immediately.
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildScope] will be undone with no replacement.
 *
 * @param initialValues deferred map of the values that [transform] is applied to immediately.
 * @param numKeys forwarded unchanged to [applyLatestSpecForKey].
 * @param transform function applied to each key and its value.
 */
fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
    initialValues: DeferredValue<Map<K, A>>,
    numKeys: Int? = null,
    transform: BuildScope.(K, A) -> B,
): Pair<Events<Map<K, Maybe<B>>>, DeferredValue<Map<K, B>>> {
    // Turn every present entry of an incoming patch into a spec that runs the transform.
    val specPatches = map { patch ->
        patch.mapValues { (key, maybeValue) ->
            maybeValue.map { present -> buildSpec { transform(key, present) } }
        }
    }
    // The initial values are only read once the deferred block is invoked.
    val initialSpecs = deferredBuildScope {
        initialValues.value.mapValues { (key, initial) -> buildSpec { transform(key, initial) } }
    }
    return specPatches.applyLatestSpecForKey(initialSpecs, numKeys = numKeys)
}
517
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events], and a [DeferredValue] containing the result of applying [transform] to
 * [initialValues] immediately.
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildScope] will be undone with no replacement.
 *
 * Convenience overload that wraps [initialValues] with `deferredOf`.
 *
 * @param initialValues the values, by key, that [transform] is applied to immediately.
 * @param numKeys forwarded unchanged to the [DeferredValue]-based overload.
 * @param transform function applied to each key and its value.
 */
fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
    initialValues: Map<K, A>,
    numKeys: Int? = null,
    transform: BuildScope.(K, A) -> B,
): Pair<Events<Map<K, Maybe<B>>>, DeferredValue<Map<K, B>>> {
    val deferredInitial = deferredOf(initialValues)
    return mapLatestBuildForKey(deferredInitial, numKeys, transform)
}
537
/**
 * Returns an [Events] containing the results of applying [transform] to each value of the
 * original [Events].
 *
 * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
 * With each invocation of [transform], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
 * previously-active [BuildScope] will be undone with no replacement.
 *
 * Delegates to the overload taking initial values, passing an empty map and discarding the
 * initial results.
 *
 * @param numKeys forwarded unchanged to the delegated overload.
 * @param transform function applied to each key and its value.
 */
fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
    numKeys: Int? = null,
    transform: BuildScope.(K, A) -> B,
): Events<Map<K, Maybe<B>>> {
    val (patches, _) = mapLatestBuildForKey(emptyMap(), numKeys, transform)
    return patches
}
554
/**
 * Returns a [Deferred] containing the next value to be emitted from this [Events].
 *
 * A child scope observes only the next emission and completes the result with it. The result is
 * created with that scope's [Job] as its parent.
 */
fun <R> Events<R>.nextDeferred(): Deferred<R> {
    // Declared up front because the observer refers to it before it can be constructed: the
    // deferred needs the scope's job as its parent.
    lateinit var pending: CompletableDeferred<R>
    val observerJob = launchScope { nextOnly().observe { emitted -> pending.complete(emitted) } }
    pending = CompletableDeferred(parent = observerJob)
    return pending
}
562
/**
 * Returns a [State] that reflects the [StateFlow.value] of this [StateFlow].
 *
 * The current value is read once and used as the initial state. Collected values are skipped
 * for as long as they equal that initial value; every value after that is emitted as an update.
 */
fun <A> StateFlow<A>.toState(): State<A> {
    val initial = value
    val updates = events {
        dropWhile { candidate -> candidate == initial }.collect { changed -> emit(changed) }
    }
    return updates.holdState(initial)
}
568
/**
 * Returns an [Events] that emits whenever this [Flow] emits.
 *
 * The flow is collected inside an [events] producer, and each collected item is re-emitted.
 */
fun <A> Flow<A>.toEvents(): Events<A> {
    return events { collect { item -> emit(item) } }
}
571
/**
 * Returns a [State] that starts at [initialValue] and is updated by each emission of this
 * [Flow].
 *
 * Shorthand for:
 * ```
 * flow.toEvents().holdState(initialValue)
 * ```
 */
fun <A> Flow<A>.toState(initialValue: A): State<A> {
    val updates = toEvents()
    return updates.holdState(initialValue)
}
579
/**
 * Returns a [State] that starts at [initialValue] and holds the running result of folding this
 * [Flow]'s emissions with [operation].
 *
 * Shorthand for:
 * ```
 * flow.scan(initialValue, operation).toEvents().holdState(initialValue)
 * ```
 */
fun <A, B> Flow<A>.scanToState(initialValue: B, operation: (B, A) -> B): State<B> {
    val accumulated = scan(initialValue, operation)
    return accumulated.toEvents().holdState(initialValue)
}
588
/**
 * Returns a [State] that starts at [initialValue] and is updated by applying each function
 * emitted from this [Flow] to the previously accumulated value.
 *
 * Shorthand for:
 * ```
 * flow.scan(initialValue) { a, f -> f(a) }.toEvents().holdState(initialValue)
 * ```
 */
fun <A> Flow<(A) -> A>.scanToState(initialValue: A): State<A> {
    return scanToState(initialValue) { accumulated, update -> update(accumulated) }
}
597
/**
 * Invokes [block] whenever this [Events] emits a value. [block] receives a [BuildScope] that
 * can be used to make further modifications to the Kairos network, and/or perform side-effects
 * via [effect].
 *
 * With each invocation of [block], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * @param block build logic to run for each emission; defaults to a no-op.
 * @return the [DisposableHandle] of the underlying observation.
 */
fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle {
    val applied = mapLatestBuild { emitted -> block(emitted) }
    return applied.observe()
}
609
/**
 * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
 * performed in reaction to the emission.
 *
 * With each invocation of [block], running effects from the previous invocation are cancelled.
 *
 * @param block side-effect to run for each emission; defaults to a no-op.
 * @return the [DisposableHandle] of the underlying observation.
 */
fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
    // Job of the most recently scheduled effect, if any.
    var activeEffect: Job? = null
    return observeBuild { emitted ->
        activeEffect?.cancel()
        activeEffect = effect { block(emitted) }
    }
}
623
/**
 * Invokes [block] with the value held by this [State], allowing side-effects to be safely
 * performed in reaction to the state changing.
 *
 * With each invocation of [block], running effects from the previous invocation are cancelled.
 *
 * @param block side-effect to run for the sampled value and for each change; defaults to a
 *   no-op.
 * @return the [Job] of the child scope that hosts the observation.
 */
fun <A> State<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job {
    return launchScope {
        // Start with an effect for the sampled value, then replace it on every change.
        var activeEffect = effect { block(sample()) }
        changes.observeBuild { changed ->
            activeEffect.cancel()
            activeEffect = effect { block(changed) }
        }
    }
}
637
/**
 * Applies [block] to the value held by this [State]. [block] receives a [BuildScope] that can
 * be used to make further modifications to the Kairos network, and/or perform side-effects via
 * [effect].
 *
 * With each invocation of [block], changes from the previous invocation are undone (any
 * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
 * cancelled).
 *
 * @param block build logic to run for the sampled value and for each change; defaults to a
 *   no-op.
 * @return the [Job] of the child scope that hosts the observation.
 */
fun <A> State<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job {
    return launchScope {
        // Each invocation of block gets its own child scope, which is cancelled on the next change.
        var activeScope: Job = launchScope { block(sample()) }
        changes.observeBuild { changed ->
            activeScope.cancel()
            activeScope = launchScope { block(changed) }
        }
    }
}
654
/**
 * Applies the [BuildSpec] within this [BuildScope].
 *
 * @return the value produced by running the spec with this scope as its receiver.
 */
fun <A> BuildSpec<A>.applySpec(): A {
    val spec = this
    return spec(this@BuildScope)
}
657
/**
 * Applies the [BuildSpec] within this [BuildScope], returning the result as a [DeferredValue].
 *
 * The application itself is deferred via [deferredBuildScope].
 */
fun <A> BuildSpec<A>.applySpecDeferred(): DeferredValue<A> {
    return deferredBuildScope { this@applySpecDeferred.applySpec() }
}
662
/**
 * Invokes [block] on the value held in this [State]. [block] receives a [BuildScope] that can
 * be used to make further modifications to the Kairos network, and/or perform side-effects via
 * [effect].
 *
 * Inside a child scope, [block] is first run with the sampled value and is then registered as
 * a build-observer of `changes`.
 *
 * @param block build logic to run for the sampled value and for each change; defaults to a
 *   no-op.
 * @return the [Job] of the child scope that hosts the observation.
 */
fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job {
    return launchScope {
        val current = sample()
        block(current)
        changes.observeBuild(block)
    }
}
679
/**
 * Invokes [block] with the current value of this [State], re-invoking whenever it changes,
 * allowing side-effects to be safely performed in reaction to the value changing.
 *
 * Specifically, [block] is deferred to the end of the transaction, and is only actually
 * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
 * -Latest combinator, for example.
 *
 * If the [State] is changing within the *current* transaction (i.e. [changes] is presently
 * emitting) then [block] will be invoked for the first time with the new value; otherwise, it
 * will be invoked with the [current][sample] value.
 *
 * @param block side-effect to run with each observed value; defaults to a no-op.
 * @return the [DisposableHandle] of the underlying observation.
 */
fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
    val sampledNow = now.map { sample() }
    // When both sides emit together, the changed value wins over the sampled one.
    val sampledOrChanged = sampledNow.mergeWith(changes) { _, changed -> changed }
    return sampledOrChanged.observe { observed -> block(observed) }
}
694 }
695
/**
 * Returns an [Events] that emits the result of [block] once it completes. [block] is evaluated
 * outside of the current Kairos transaction; when it completes, the returned [Events] emits in a
 * new transaction.
 *
 * Equivalent to:
 * ```
 * events { emit(block()) }.apply { observe() }
 * ```
 *
 * @param block suspending computation whose result is emitted.
 */
@ExperimentalKairosApi
fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> {
    val completion =
        events {
            // TODO: if block completes synchronously, it would be nice to emit within this
            //  transaction
            emit(block())
        }
    // A no-op observer is attached immediately, so the result is observed from the start.
    completion.observe()
    return completion
}
714
/**
 * Performs a side-effect in a safe manner w/r/t the current Kairos transaction.
 *
 * Specifically, [block] is deferred to the end of the current transaction, and is only actually
 * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
 * -Latest combinator, for example.
 *
 * Implemented as an observer of `now`, registered inside a child scope.
 *
 * @param context passed through to the underlying observation; defaults to
 *   [EmptyCoroutineContext].
 * @param block the side-effect to perform.
 * @return the [Job] of the child scope hosting the effect.
 */
@ExperimentalKairosApi
fun BuildScope.effect(
    context: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.() -> Unit,
): Job {
    return launchScope { now.observe(coroutineContext = context) { _ -> block() } }
}
735
/**
 * Launches [block] in a new coroutine and returns the [Job] associated with it.
 *
 * The coroutine does not start until the *end* of the current Kairos transaction. The current
 * [BuildScope] may be deactivated during this transaction (for instance by a -Latest
 * combinator); when that happens, the coroutine is never started at all.
 *
 * Delegates to [asyncEffect]. Conceptually:
 * ```
 * fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job =
 *     effect { launch { block() } }
 * ```
 */
@ExperimentalKairosApi
fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job {
    return asyncEffect(block)
}
751
/**
 * Launches [block] in a new coroutine and exposes its result as a [Deferred].
 *
 * The coroutine does not start until the *end* of the current Kairos transaction. The current
 * [BuildScope] may be deactivated during this transaction (for instance by a -Latest
 * combinator); when that happens, the coroutine is never started at all.
 *
 * The returned [Deferred] and the underlying [effect] job are linked in both directions: if the
 * job completes, the [Deferred] is cancelled, and once the [Deferred] completes (including by
 * cancellation) the job is cancelled.
 *
 * Roughly shorthand for:
 * ```
 * fun <R> BuildScope.asyncEffect(block: suspend KairosCoroutineScope.() -> R): Deferred<R> =
 *     CompletableDeferred<R>().apply {
 *         effect { launch { complete(block()) } }
 *     }
 * ```
 */
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend KairosCoroutineScope.() -> R): Deferred<R> =
    CompletableDeferred<R>().also { deferred ->
        val effectJob = effect { launch { deferred.complete(block()) } }
        // If the effect's job ends, cancel the deferred.
        val linkHandle = effectJob.invokeOnCompletion { deferred.cancel() }
        // Once the deferred is done, unlink the handler above and cancel the effect job.
        deferred.invokeOnCompletion {
            linkHandle.dispose()
            effectJob.cancel()
        }
    }
779
/**
 * Variant of [BuildScope.asyncScope] that discards the result of [block], returning only the
 * [Job].
 */
@ExperimentalKairosApi
fun BuildScope.launchScope(block: BuildSpec<*>): Job {
    val scoped = asyncScope(block)
    return scoped.second
}
783
/**
 * Creates an [Events] whose elements are emitted from [builder].
 *
 * [builder] runs in its own coroutine, so it can perform ongoing work and emit values through the
 * provided [CoalescingEventProducerScope].
 *
 * By default, [builder] only runs while the returned [Events] is being
 * [observed][BuildScope.observe]. To keep it running at all times, attach a no-op observer:
 * ```
 * events { ... }.apply { observe() }
 * ```
 *
 * Under backpressure, emissions are *coalesced* into batches. Each value
 * [emitted][CoalescingEventProducerScope.emit] from [builder] is folded into the current batch
 * using [coalesce]. When the Kairos network consumes the batch in the next transaction, the batch
 * is reset to [initialValue].
 */
@ExperimentalKairosApi
fun <In, Out> BuildScope.coalescingEvents(
    initialValue: Out,
    coalesce: (old: Out, new: In) -> Out,
    builder: suspend CoalescingEventProducerScope<In>.() -> Unit,
): Events<Out> {
    // Delegate to the overload that takes an initial-value supplier.
    return coalescingEvents(getInitialValue = { initialValue }, coalesce, builder)
}
807
/**
 * Creates an [Events] whose elements are emitted from [builder].
 *
 * [builder] runs in its own coroutine, so it can perform ongoing work and emit values through the
 * provided [CoalescingEventProducerScope].
 *
 * By default, [builder] only runs while the returned [Events] is being
 * [observed][BuildScope.observe]. To keep it running at all times, attach a no-op observer:
 * ```
 * events { ... }.apply { observe() }
 * ```
 *
 * Under backpressure, emissions are *conflated*: older pending emissions are discarded, and only
 * the most recent one is used once the Kairos network is ready.
 */
@ExperimentalKairosApi
fun <T> BuildScope.conflatedEvents(
    builder: suspend CoalescingEventProducerScope<T>.() -> Unit
): Events<T> {
    // A coalescing stream that always keeps the newest emission. The initial batch value is a
    // throwaway placeholder object.
    val latest: Events<Any?> =
        coalescingEvents<T, Any?>(
            initialValue = Any(),
            coalesce = { _, newest -> newest },
            builder = builder,
        )
    return latest.mapCheap { value ->
        @Suppress("UNCHECKED_CAST")
        value as T
    }
}
832
/** Receiver scope used to emit values into a [BuildScope.coalescingEvents]. */
fun interface CoalescingEventProducerScope<in T> {
    /**
     * Adds [value] to the current batch, and enqueues the batch for emission from this [Events]
     * if it is not already pending.
     *
     * Backpressure arises when [emit] is invoked while the Kairos network is in the middle of a
     * transaction. Repeated calls during that time are coalesced into a single batch, which is
     * processed once the network is ready.
     */
    fun emit(value: T)
}
845
/** Receiver scope used to emit values into a [BuildScope.events]. */
fun interface EventProducerScope<in T> {
    /**
     * Emits [value] to this [Events]. The caller is suspended until the Kairos transaction that
     * contains the emission has completed.
     */
    suspend fun emit(value: T)
}
854
/**
 * Suspends indefinitely, invoking [block] when the suspension ends due to cancellation. Handy for
 * unregistering callbacks inside of [BuildScope.events] and [BuildScope.coalescingEvents].
 */
suspend fun awaitClose(block: () -> Unit): Nothing {
    try {
        awaitCancellation()
    } finally {
        // Runs the caller-supplied cleanup on the way out.
        block()
    }
}
865
/**
 * Runs [spec] in this [BuildScope] immediately, and runs it again each time [rebuildSignal]
 * emits. The returned [State] holds the result of the currently-active [BuildSpec].
 */
@ExperimentalKairosApi
fun <A> BuildScope.rebuildOn(rebuildSignal: Events<*>, spec: BuildSpec<A>): State<A> {
    // Every signal emission (whatever its value) requests the same spec again.
    val respecs: Events<BuildSpec<A>> = rebuildSignal.map { _ -> spec }
    return respecs.holdLatestSpec(spec)
}
873