1 /*
 * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos
18 
19 import com.android.systemui.kairos.util.Maybe
20 import com.android.systemui.kairos.util.map
21 import kotlin.coroutines.CoroutineContext
22 import kotlin.coroutines.EmptyCoroutineContext
23 import kotlinx.coroutines.CompletableDeferred
24 import kotlinx.coroutines.Deferred
25 import kotlinx.coroutines.DisposableHandle
26 import kotlinx.coroutines.Job
27 import kotlinx.coroutines.awaitCancellation
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.FlowCollector
30 import kotlinx.coroutines.flow.MutableSharedFlow
31 import kotlinx.coroutines.flow.MutableStateFlow
32 import kotlinx.coroutines.flow.SharedFlow
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.dropWhile
35 import kotlinx.coroutines.flow.scan
36 
/**
 * A computation that can modify the Kairos network.
 *
 * A [BuildSpec] is a function with a [BuildScope] receiver that produces a value of type [A].
 */
typealias BuildSpec<A> = BuildScope.() -> A
39 
/**
 * Builds a [BuildSpec] out of [block].
 *
 * When the spec is applied, [block] runs against a [BuildScope], through which it can perform
 * network-building operations (such as adding new inputs and outputs to the network) in addition
 * to everything that is available in [TransactionScope].
 *
 * @param block the body of the spec.
 * @return [block] itself, typed as a [BuildSpec].
 */
@ExperimentalKairosApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> buildSpec(noinline block: BuildScope.() -> A): BuildSpec<A> {
    return block
}
48 
/**
 * Applies the [BuildSpec] within this [BuildScope].
 *
 * @param block the spec to run with this scope as its receiver.
 * @return the value produced by [block].
 */
@ExperimentalKairosApi
inline operator fun <A> BuildScope.invoke(block: BuildScope.() -> A): A = block(this)
52 
53 /** Operations that add inputs and outputs to a Kairos network. */
54 @ExperimentalKairosApi
55 interface BuildScope : HasNetwork, StateScope {
56 
    /**
     * Defers invoking [block] until after the current [BuildScope] code-path completes, returning a
     * [DeferredValue] that can be used to reference the result.
     *
     * Useful for recursive definitions.
     *
     * @param block the deferred computation; it receives a [BuildScope] and produces the result.
     * @return a [DeferredValue] referencing the value produced by [block].
     * @see deferredBuildScopeAction
     * @see DeferredValue
     */
    fun <R> deferredBuildScope(block: BuildScope.() -> R): DeferredValue<R>
67 
    /**
     * Defers invoking [block] until after the current [BuildScope] code-path completes.
     *
     * Useful for recursive definitions. Unlike [deferredBuildScope], [block] produces no result, so
     * nothing is returned to reference it.
     *
     * @param block the deferred action; it receives a [BuildScope].
     * @see deferredBuildScope
     */
    fun deferredBuildScopeAction(block: BuildScope.() -> Unit)
76 
    /**
     * Returns an [Events] containing the results of applying [transform] to each value of the
     * original [Events].
     *
     * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
     * Unlike [mapLatestBuild], these modifications are not undone with each subsequent emission of
     * the original [Events].
     *
     * **NOTE:** This API does not [observe] the original [Events], meaning that unless the returned
     * (or a downstream) [Events] is observed separately, [transform] will not be invoked, and no
     * internal side-effects will occur.
     *
     * @param transform invoked with each emitted value; its result is emitted downstream.
     * @return an [Events] of the values produced by [transform].
     */
    fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B>
90 
    /**
     * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
     * performed in reaction to the emission.
     *
     * Specifically, [block] is deferred to the end of the transaction, and is only actually
     * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
     * -Latest combinator, for example.
     *
     * [Disposing][DisposableHandle.dispose] of the returned [DisposableHandle] will stop the
     * observation of new emissions. It will however *not* cancel any running effects from previous
     * emissions. To achieve this behavior, use [launchScope] or [asyncScope] to create a child
     * build scope:
     * ```
     *   val job = launchScope {
     *       events.observe { x ->
     *           launchEffect { longRunningEffect(x) }
     *       }
     *   }
     *   // cancels observer and any running effects:
     *   job.cancel()
     * ```
     *
     * @param coroutineContext defaults to [EmptyCoroutineContext]. NOTE(review): presumably the
     *   context used when running [block] / effects it launches; confirm against the implementation.
     * @param block invoked with each emitted value; defaults to a no-op, which makes a bare
     *   `observe()` call a way to simply keep this [Events] observed.
     * @return a [DisposableHandle] that stops the observation when disposed.
     */
    // TODO: remove disposable handle return? might add more confusion than convenience
    fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: EffectScope.(A) -> Unit = {},
    ): DisposableHandle
118 
    /**
     * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
     * original [Events], and a [DeferredValue] containing the result of applying [initialSpecs]
     * immediately.
     *
     * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
     * same key are undone (any registered [observers][observe] are unregistered, and any pending
     * [side-effects][effect] are cancelled).
     *
     * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
     * previously-active [BuildSpec] will be undone with no replacement.
     *
     * @param initialSpecs the specs, by key, that are applied immediately.
     * @param numKeys optional; defaults to `null`. NOTE(review): presumably a hint for the expected
     *   number of keys; confirm against the implementation.
     * @return a pair of (results of applying emitted specs, results of applying [initialSpecs]).
     */
    fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey(
        initialSpecs: DeferredValue<Map<K, BuildSpec<B>>>,
        numKeys: Int? = null,
    ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>>
135 
    /**
     * Creates an instance of an [Events] with elements that are emitted from [builder].
     *
     * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
     * provided [EventProducerScope].
     *
     * By default, [builder] is only running while the returned [Events] is being
     * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
     * ```
     *   events { ... }.apply { observe() }
     * ```
     *
     * @param builder suspending producer that emits values through its [EventProducerScope].
     * @return an [Events] that emits whatever [builder] emits.
     */
    // TODO: eventually this should be defined on KairosNetwork + an extension on HasNetwork
    //  - will require modifying InputNode so that it can be manually killed, as opposed to using
    //    takeUntil (which requires a StateScope).
    fun <T> events(builder: suspend EventProducerScope<T>.() -> Unit): Events<T>
152 
    /**
     * Creates an instance of an [Events] with elements that are emitted from [builder].
     *
     * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the
     * provided [CoalescingEventProducerScope].
     *
     * By default, [builder] is only running while the returned [Events] is being
     * [observed][observe]. If you want it to run at all times, simply add a no-op observer:
     * ```
     *   coalescingEvents(getInitialValue, coalesce) { ... }.apply { observe() }
     * ```
     *
     * In the event of backpressure, emissions are *coalesced* into batches. When a value is
     * [emitted][CoalescingEventProducerScope.emit] from [builder], it is merged into the batch via
     * [coalesce]. Once the batch is consumed by the kairos network in the next transaction, the
     * batch is reset back to [getInitialValue].
     *
     * @param getInitialValue supplies the value that a fresh batch starts from.
     * @param coalesce merges a newly emitted value (`new`) into the current batch (`old`).
     * @param builder suspending producer that emits values through its
     *   [CoalescingEventProducerScope].
     * @return an [Events] that emits the coalesced batches.
     */
    // TODO: see TODO for [events]
    fun <In, Out> coalescingEvents(
        getInitialValue: () -> Out,
        coalesce: (old: Out, new: In) -> Out,
        builder: suspend CoalescingEventProducerScope<In>.() -> Unit,
    ): Events<Out>
176 
    /**
     * Creates a new [BuildScope] that is a child of this one.
     *
     * This new scope can be manually cancelled via the returned [Job], or will be cancelled
     * automatically when its parent is cancelled. Cancellation will unregister all
     * [observers][observe] and cancel all scheduled [effects][effect].
     *
     * The return value from [block] can be accessed via the returned [DeferredValue].
     *
     * @param block the [BuildSpec] to run inside the new child scope.
     * @return a pair of (the deferred result of [block], the [Job] controlling the child scope).
     */
    // TODO: return a DisposableHandle instead of Job?
    fun <A> asyncScope(block: BuildSpec<A>): Pair<DeferredValue<A>, Job>
188 
189     // TODO: once we have context params, these can all become extensions:
190 
191     /**
192      * Returns an [Events] containing the results of applying the given [transform] function to each
193      * value of the original [Events].
194      *
195      * Unlike [Events.map], [transform] can perform arbitrary asynchronous code. This code is run
196      * outside of the current Kairos transaction; when [transform] returns, the returned value is
197      * emitted from the result [Events] in a new transaction.
198      *
199      * ```
200      *     fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> =
201      *         mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten()
202      * ```
203      */
204     fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> =
205         mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten()
206 
207     /**
208      * Invokes [block] whenever this [Events] emits a value. [block] receives an [BuildScope] that
209      * can be used to make further modifications to the Kairos network, and/or perform side-effects
210      * via [effect].
211      *
212      * @see observe
213      */
214     fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
215         mapBuild(block).observe()
216 
217     /**
218      * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current
219      * [value of this State][State.sample], and will emit at the same rate as [State.changes].
220      */
221     fun <A> State<A>.toStateFlow(): StateFlow<A> {
222         val innerStateFlow = MutableStateFlow(sampleDeferred())
223         changes.observe { innerStateFlow.value = deferredOf(it) }
224         return object : StateFlow<A> {
225             override val replayCache: List<A>
226                 get() = innerStateFlow.replayCache.map { it.value }
227 
228             override val value: A
229                 get() = innerStateFlow.value.value
230 
231             override suspend fun collect(collector: FlowCollector<A>): Nothing {
232                 innerStateFlow.collect { collector.emit(it.value) }
233             }
234         }
235     }
236 
237     /**
238      * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits the current
239      * [value][State.sample] of this [State] followed by all [changes].
240      */
241     fun <A> State<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
242         val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
243         deferredBuildScope {
244             result.tryEmit(sample())
245             changes.observe { a -> result.tryEmit(a) }
246         }
247         return result
248     }
249 
250     /**
251      * Returns a [SharedFlow] configured with a replay cache of size [replay] that emits values
252      * whenever this [Events] emits.
253      */
254     fun <A> Events<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
255         val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
256         observe { a -> result.tryEmit(a) }
257         return result
258     }
259 
260     /**
261      * Returns a [State] that holds onto the value returned by applying the most recently emitted
262      * [BuildSpec] from the original [Events], or the value returned by applying [initialSpec] if
263      * nothing has been emitted since it was constructed.
264      *
265      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
266      * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
267      * are cancelled).
268      */
269     fun <A> Events<BuildSpec<A>>.holdLatestSpec(initialSpec: BuildSpec<A>): State<A> {
270         val (changes: Events<A>, initApplied: DeferredValue<A>) = applyLatestSpec(initialSpec)
271         return changes.holdStateDeferred(initApplied)
272     }
273 
274     /**
275      * Returns a [State] containing the value returned by applying the [BuildSpec] held by the
276      * original [State].
277      *
278      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
279      * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
280      * are cancelled).
281      */
282     fun <A> State<BuildSpec<A>>.applyLatestSpec(): State<A> {
283         val (appliedChanges: Events<A>, init: DeferredValue<A>) =
284             changes.applyLatestSpec(buildSpec { sample().applySpec() })
285         return appliedChanges.holdStateDeferred(init)
286     }
287 
288     /**
289      * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
290      * original [Events].
291      *
292      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
293      * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
294      * are cancelled).
295      */
296     fun <A> Events<BuildSpec<A>>.applyLatestSpec(): Events<A> = applyLatestSpec(buildSpec {}).first
297 
298     /**
299      * Returns an [Events] that switches to a new [Events] produced by [transform] every time the
300      * original [Events] emits a value.
301      *
302      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
303      * When the original [Events] emits a new value, those changes are undone (any registered
304      * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
305      */
306     fun <A, B> Events<A>.flatMapLatestBuild(transform: BuildScope.(A) -> Events<B>): Events<B> =
307         mapCheap { buildSpec { transform(it) } }.applyLatestSpec().flatten()
308 
309     /**
310      * Returns a [State] by applying [transform] to the value held by the original [State].
311      *
312      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
313      * When the value held by the original [State] changes, those changes are undone (any registered
314      * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
315      */
316     fun <A, B> State<A>.flatMapLatestBuild(transform: BuildScope.(A) -> State<B>): State<B> =
317         mapLatestBuild { transform(it) }.flatten()
318 
319     /**
320      * Returns a [State] that transforms the value held inside this [State] by applying it to the
321      * [transform].
322      *
323      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
324      * When the value held by the original [State] changes, those changes are undone (any registered
325      * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
326      */
327     fun <A, B> State<A>.mapLatestBuild(transform: BuildScope.(A) -> B): State<B> =
328         mapCheapUnsafe { buildSpec { transform(it) } }.applyLatestSpec()
329 
330     /**
331      * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
332      * original [Events], and a [DeferredValue] containing the result of applying [initialSpec]
333      * immediately.
334      *
335      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] are undone
336      * (any registered [observers][observe] are unregistered, and any pending [side-effects][effect]
337      * are cancelled).
338      */
339     fun <A : Any?, B> Events<BuildSpec<B>>.applyLatestSpec(
340         initialSpec: BuildSpec<A>
341     ): Pair<Events<B>, DeferredValue<A>> {
342         val (events, result) =
343             mapCheap { spec -> mapOf(Unit to Maybe.present(spec)) }
344                 .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1)
345         val outEvents: Events<B> =
346             events.mapMaybe {
347                 checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
348             }
349         val outInit: DeferredValue<A> = deferredBuildScope {
350             val initResult: Map<Unit, A> = result.value
351             check(Unit in initResult) {
352                 "applyLatest: expected initial result, but none present in: $initResult"
353             }
354             @Suppress("UNCHECKED_CAST")
355             initResult.getOrDefault(Unit) { null } as A
356         }
357         return Pair(outEvents, outInit)
358     }
359 
360     /**
361      * Returns an [Events] containing the results of applying [transform] to each value of the
362      * original [Events].
363      *
364      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
365      * With each invocation of [transform], changes from the previous invocation are undone (any
366      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
367      * cancelled).
368      */
369     fun <A, B> Events<A>.mapLatestBuild(transform: BuildScope.(A) -> B): Events<B> =
370         mapCheap { buildSpec { transform(it) } }.applyLatestSpec()
371 
372     /**
373      * Returns an [Events] containing the results of applying [transform] to each value of the
374      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
375      * [initialValue] immediately.
376      *
377      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
378      * With each invocation of [transform], changes from the previous invocation are undone (any
379      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
380      * cancelled).
381      */
382     fun <A, B> Events<A>.mapLatestBuild(
383         initialValue: A,
384         transform: BuildScope.(A) -> B,
385     ): Pair<Events<B>, DeferredValue<B>> =
386         mapLatestBuildDeferred(deferredOf(initialValue), transform)
387 
388     /**
389      * Returns an [Events] containing the results of applying [transform] to each value of the
390      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
391      * [initialValue] immediately.
392      *
393      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
394      * With each invocation of [transform], changes from the previous invocation are undone (any
395      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
396      * cancelled).
397      */
398     fun <A, B> Events<A>.mapLatestBuildDeferred(
399         initialValue: DeferredValue<A>,
400         transform: BuildScope.(A) -> B,
401     ): Pair<Events<B>, DeferredValue<B>> =
402         mapCheap { buildSpec { transform(it) } }
403             .applyLatestSpec(initialSpec = buildSpec { transform(initialValue.value) })
404 
405     /**
406      * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
407      * original [Events], and a [DeferredValue] containing the result of applying [initialSpecs]
408      * immediately.
409      *
410      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
411      * same key are undone (any registered [observers][observe] are unregistered, and any pending
412      * [side-effects][effect] are cancelled).
413      *
414      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
415      * previously-active [BuildSpec] will be undone with no replacement.
416      */
417     fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey(
418         initialSpecs: Map<K, BuildSpec<B>>,
419         numKeys: Int? = null,
420     ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> =
421         applyLatestSpecForKey(deferredOf(initialSpecs), numKeys)
422 
423     /**
424      * Returns an [Incremental] containing the results of applying each [BuildSpec] emitted from the
425      * original [Incremental].
426      *
427      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
428      * same key are undone (any registered [observers][observe] are unregistered, and any pending
429      * [side-effects][effect] are cancelled).
430      *
431      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
432      * previously-active [BuildSpec] will be undone with no replacement.
433      */
434     fun <K, V> Incremental<K, BuildSpec<V>>.applyLatestSpecForKey(
435         numKeys: Int? = null
436     ): Incremental<K, V> {
437         val (events, initial) = updates.applyLatestSpecForKey(sampleDeferred(), numKeys)
438         return events.foldStateMapIncrementally(initial)
439     }
440 
441     /**
442      * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
443      * original [Events].
444      *
445      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
446      * same key are undone (any registered [observers][observe] are unregistered, and any pending
447      * [side-effects][effect] are cancelled).
448      *
449      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
450      * previously-active [BuildSpec] will be undone with no replacement.
451      */
452     fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.applyLatestSpecForKey(
453         numKeys: Int? = null
454     ): Events<Map<K, Maybe<V>>> =
455         applyLatestSpecForKey<K, V, Nothing>(deferredOf(emptyMap()), numKeys).first
456 
457     /**
458      * Returns a [State] containing the latest results of applying each [BuildSpec] emitted from the
459      * original [Events].
460      *
461      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
462      * same key are undone (any registered [observers][observe] are unregistered, and any pending
463      * [side-effects][effect] are cancelled).
464      *
465      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
466      * previously-active [BuildSpec] will be undone with no replacement.
467      */
468     fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey(
469         initialSpecs: DeferredValue<Map<K, BuildSpec<V>>>,
470         numKeys: Int? = null,
471     ): Incremental<K, V> {
472         val (changes, initialValues) = applyLatestSpecForKey(initialSpecs, numKeys)
473         return changes.foldStateMapIncrementally(initialValues)
474     }
475 
476     /**
477      * Returns a [State] containing the latest results of applying each [BuildSpec] emitted from the
478      * original [Events].
479      *
480      * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the
481      * same key are undone (any registered [observers][observe] are unregistered, and any pending
482      * [side-effects][effect] are cancelled).
483      *
484      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
485      * previously-active [BuildSpec] will be undone with no replacement.
486      */
487     fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey(
488         initialSpecs: Map<K, BuildSpec<V>> = emptyMap(),
489         numKeys: Int? = null,
490     ): Incremental<K, V> = holdLatestSpecForKey(deferredOf(initialSpecs), numKeys)
491 
492     /**
493      * Returns an [Events] containing the results of applying [transform] to each value of the
494      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
495      * [initialValues] immediately.
496      *
497      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
498      * With each invocation of [transform], changes from the previous invocation are undone (any
499      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
500      * cancelled).
501      *
502      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
503      * previously-active [BuildScope] will be undone with no replacement.
504      */
505     fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
506         initialValues: DeferredValue<Map<K, A>>,
507         numKeys: Int? = null,
508         transform: BuildScope.(K, A) -> B,
509     ): Pair<Events<Map<K, Maybe<B>>>, DeferredValue<Map<K, B>>> =
510         map { patch -> patch.mapValues { (k, v) -> v.map { buildSpec { transform(k, it) } } } }
511             .applyLatestSpecForKey(
512                 deferredBuildScope {
513                     initialValues.value.mapValues { (k, v) -> buildSpec { transform(k, v) } }
514                 },
515                 numKeys = numKeys,
516             )
517 
518     /**
519      * Returns an [Events] containing the results of applying [transform] to each value of the
520      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
521      * [initialValues] immediately.
522      *
523      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
524      * With each invocation of [transform], changes from the previous invocation are undone (any
525      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
526      * cancelled).
527      *
528      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
529      * previously-active [BuildScope] will be undone with no replacement.
530      */
531     fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
532         initialValues: Map<K, A>,
533         numKeys: Int? = null,
534         transform: BuildScope.(K, A) -> B,
535     ): Pair<Events<Map<K, Maybe<B>>>, DeferredValue<Map<K, B>>> =
536         mapLatestBuildForKey(deferredOf(initialValues), numKeys, transform)
537 
538     /**
539      * Returns an [Events] containing the results of applying [transform] to each value of the
540      * original [Events].
541      *
542      * [transform] can perform modifications to the Kairos network via its [BuildScope] receiver.
543      * With each invocation of [transform], changes from the previous invocation are undone (any
544      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
545      * cancelled).
546      *
547      * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the
548      * previously-active [BuildScope] will be undone with no replacement.
549      */
550     fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey(
551         numKeys: Int? = null,
552         transform: BuildScope.(K, A) -> B,
553     ): Events<Map<K, Maybe<B>>> = mapLatestBuildForKey(emptyMap(), numKeys, transform).first
554 
555     /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
556     fun <R> Events<R>.nextDeferred(): Deferred<R> {
557         lateinit var next: CompletableDeferred<R>
558         val job = launchScope { nextOnly().observe { next.complete(it) } }
559         next = CompletableDeferred(parent = job)
560         return next
561     }
562 
563     /** Returns a [State] that reflects the [StateFlow.value] of this [StateFlow]. */
564     fun <A> StateFlow<A>.toState(): State<A> {
565         val initial = value
566         return events { dropWhile { it == initial }.collect { emit(it) } }.holdState(initial)
567     }
568 
569     /** Returns an [Events] that emits whenever this [Flow] emits. */
570     fun <A> Flow<A>.toEvents(): Events<A> = events { collect { emit(it) } }
571 
572     /**
573      * Shorthand for:
574      * ```
575      * flow.toEvents().holdState(initialValue)
576      * ```
577      */
578     fun <A> Flow<A>.toState(initialValue: A): State<A> = toEvents().holdState(initialValue)
579 
580     /**
581      * Shorthand for:
582      * ```
583      * flow.scan(initialValue, operation).toEvents().holdState(initialValue)
584      * ```
585      */
586     fun <A, B> Flow<A>.scanToState(initialValue: B, operation: (B, A) -> B): State<B> =
587         scan(initialValue, operation).toEvents().holdState(initialValue)
588 
589     /**
590      * Shorthand for:
591      * ```
592      * flow.scan(initialValue) { a, f -> f(a) }.toEvents().holdState(initialValue)
593      * ```
594      */
595     fun <A> Flow<(A) -> A>.scanToState(initialValue: A): State<A> =
596         scanToState(initialValue) { a, f -> f(a) }
597 
598     /**
599      * Invokes [block] whenever this [Events] emits a value. [block] receives an [BuildScope] that
600      * can be used to make further modifications to the Kairos network, and/or perform side-effects
601      * via [effect].
602      *
603      * With each invocation of [block], changes from the previous invocation are undone (any
604      * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
605      * cancelled).
606      */
607     fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
608         mapLatestBuild { block(it) }.observe()
609 
610     /**
611      * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
612      * performed in reaction to the emission.
613      *
614      * With each invocation of [block], running effects from the previous invocation are cancelled.
615      */
616     fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
617         var innerJob: Job? = null
618         return observeBuild {
619             innerJob?.cancel()
620             innerJob = effect { block(it) }
621         }
622     }
623 
624     /**
625      * Invokes [block] with the value held by this [State], allowing side-effects to be safely
626      * performed in reaction to the state changing.
627      *
628      * With each invocation of [block], running effects from the previous invocation are cancelled.
629      */
630     fun <A> State<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job = launchScope {
631         var innerJob = effect { block(sample()) }
632         changes.observeBuild {
633             innerJob.cancel()
634             innerJob = effect { block(it) }
635         }
636     }
637 
638     /**
639      * Applies [block] to the value held by this [State]. [block] receives an [BuildScope] that can
640      * be used to make further modifications to the Kairos network, and/or perform side-effects via
641      * [effect].
642      *
643      * [block] can perform modifications to the Kairos network via its [BuildScope] receiver. With
644      * each invocation of [block], changes from the previous invocation are undone (any registered
645      * [observers][observe] are unregistered, and any pending [side-effects][effect] are cancelled).
646      */
647     fun <A> State<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
648         var innerJob: Job = launchScope { block(sample()) }
649         changes.observeBuild {
650             innerJob.cancel()
651             innerJob = launchScope { block(it) }
652         }
653     }
654 
655     /** Applies the [BuildSpec] within this [BuildScope]. */
656     fun <A> BuildSpec<A>.applySpec(): A = this()
657 
658     /**
659      * Applies the [BuildSpec] within this [BuildScope], returning the result as an [DeferredValue].
660      */
661     fun <A> BuildSpec<A>.applySpecDeferred(): DeferredValue<A> = deferredBuildScope { applySpec() }
662 
663     /**
664      * Invokes [block] on the value held in this [State]. [block] receives an [BuildScope] that can
665      * be used to make further modifications to the Kairos network, and/or perform side-effects via
666      * [effect].
667      *
668      * ```
669      *     fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
670      *         block(sample())
671      *         changes.observeBuild(block)
672      *     }
673      * ```
674      */
675     fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
676         block(sample())
677         changes.observeBuild(block)
678     }
679 
680     /**
681      * Invokes [block] with the current value of this [State], re-invoking whenever it changes,
682      * allowing side-effects to be safely performed in reaction value changing.
683      *
684      * Specifically, [block] is deferred to the end of the transaction, and is only actually
685      * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
686      * -Latest combinator, for example.
687      *
688      * If the [State] is changing within the *current* transaction (i.e. [changes] is presently
689      * emitting) then [block] will be invoked for the first time with the new value; otherwise, it
690      * will be invoked with the [current][sample] value.
691      */
692     fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle =
693         now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) }
694 }
695 
/**
 * Returns an [Events] that emits the result of [block] once it completes. [block] is evaluated
 * outside of the current Kairos transaction; when it completes, the returned [Events] emits in a
 * new transaction.
 *
 * Equivalent to:
 * ```
 *   fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
 *       events { emit(block()) }.apply { observe() }
 * ```
 *
 * @param block suspending computation whose result is emitted.
 */
@ExperimentalKairosApi
fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> {
    val resultEvents = events {
        // TODO: if block completes synchronously, it would be nice to emit within this
        //  transaction
        emit(block())
    }
    // No-op observer; presumably keeps the producer running even when nothing else observes the
    // returned Events.
    resultEvents.observe()
    return resultEvents
}
714 
/**
 * Performs a side-effect in a safe manner w/r/t the current Kairos transaction.
 *
 * Specifically, [block] is deferred to the end of the current transaction, and is only actually
 * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
 * -Latest combinator, for example.
 *
 * Equivalent to:
 * ```
 *   fun BuildScope.effect(
 *       context: CoroutineContext = EmptyCoroutineContext,
 *       block: EffectScope.() -> Unit,
 *   ): Job =
 *       launchScope { now.observe(context) { block() } }
 * ```
 *
 * @param context [CoroutineContext] forwarded to the underlying observer.
 * @param block the side-effect to perform.
 * @return the [Job] of the scope hosting the effect.
 */
@ExperimentalKairosApi
fun BuildScope.effect(
    context: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.() -> Unit,
): Job {
    return launchScope {
        now.observe(context) { block() }
    }
}
735 
/**
 * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
 *
 * This coroutine is not actually started until the *end* of the current Kairos transaction. This is
 * done because the current [BuildScope] might be deactivated within this transaction, perhaps due
 * to a -Latest combinator. If this happens, then the coroutine will never actually be started.
 *
 * Implemented by delegating to [asyncEffect]; conceptually:
 * ```
 *   fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job =
 *       effect { launch { block() } }
 * ```
 *
 * @param block suspending work to run in the launched coroutine.
 */
@ExperimentalKairosApi
fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job {
    return asyncEffect(block)
}
751 
/**
 * Launches [block] in a new coroutine, returning the result as a [Deferred].
 *
 * This coroutine is not actually started until the *end* of the current Kairos transaction. This is
 * done because the current [BuildScope] might be deactivated within this transaction, perhaps due
 * to a -Latest combinator. If this happens, then the coroutine will never actually be started.
 *
 * The coroutine is launched from within an [effect], and the returned [Deferred] is tied to that
 * effect's [Job] in both directions:
 * - when the effect's [Job] completes, the [Deferred] is cancelled (which has no effect if it has
 *   already been completed with a result);
 * - when the [Deferred] completes for any reason, including cancellation by the caller, the
 *   effect's [Job] is cancelled.
 *
 * @param block suspending computation whose result completes the returned [Deferred].
 */
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend KairosCoroutineScope.() -> R): Deferred<R> {
    val deferred = CompletableDeferred<R>()
    val effectJob = effect { launch { deferred.complete(block()) } }
    // If the effect ends without ever delivering a value, don't leave awaiters hanging.
    val cancelOnEffectEnd = effectJob.invokeOnCompletion { deferred.cancel() }
    deferred.invokeOnCompletion {
        // The deferred is settled: drop the handler registered above and stop the effect.
        cancelOnEffectEnd.dispose()
        effectJob.cancel()
    }
    return deferred
}
779 
/**
 * Like [BuildScope.asyncScope], but ignores the result of [block].
 *
 * @return the [Job] half of the pair returned by [BuildScope.asyncScope].
 */
@ExperimentalKairosApi
fun BuildScope.launchScope(block: BuildSpec<*>): Job {
    val scopeResult = asyncScope(block)
    return scopeResult.second
}
783 
/**
 * Creates an instance of an [Events] with elements that are emitted from [builder].
 *
 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the provided
 * [CoalescingEventProducerScope].
 *
 * By default, [builder] is only running while the returned [Events] is being
 * [observed][BuildScope.observe]. If you want it to run at all times, simply add a no-op observer:
 * ```
 * events { ... }.apply { observe() }
 * ```
 *
 * In the event of backpressure, emissions are *coalesced* into batches. When a value is
 * [emitted][CoalescingEventProducerScope.emit] from [builder], it is merged into the batch via
 * [coalesce]. Once the batch is consumed by the Kairos network in the next transaction, the batch
 * is reset back to [initialValue].
 *
 * This overload wraps the fixed [initialValue] in a supplier and delegates to the overload that
 * accepts `getInitialValue`.
 *
 * @param initialValue the value every new batch starts from.
 * @param coalesce merges a newly emitted value into the current batch.
 * @param builder producer that emits values through its [CoalescingEventProducerScope] receiver.
 */
@ExperimentalKairosApi
fun <In, Out> BuildScope.coalescingEvents(
    initialValue: Out,
    coalesce: (old: Out, new: In) -> Out,
    builder: suspend CoalescingEventProducerScope<In>.() -> Unit,
): Events<Out> {
    return coalescingEvents(getInitialValue = { initialValue }, coalesce, builder)
}
807 
/**
 * Creates an instance of an [Events] with elements that are emitted from [builder].
 *
 * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the provided
 * [CoalescingEventProducerScope].
 *
 * By default, [builder] is only running while the returned [Events] is being
 * [observed][BuildScope.observe]. If you want it to run at all times, simply add a no-op observer:
 * ```
 * events { ... }.apply { observe() }
 * ```
 *
 * In the event of backpressure, emissions are *conflated*; any older emissions are dropped and only
 * the most recent emission will be used when the Kairos network is ready.
 *
 * @param builder producer that emits values through its [CoalescingEventProducerScope] receiver.
 */
@ExperimentalKairosApi
fun <T> BuildScope.conflatedEvents(
    builder: suspend CoalescingEventProducerScope<T>.() -> Unit
): Events<T> {
    // Conflation is coalescing where each new emission simply replaces the batch. The initial
    // `Any()` is only a placeholder for an empty batch.
    val latestOnly =
        coalescingEvents<T, Any?>(
            initialValue = Any(),
            coalesce = { _, latest -> latest },
            builder = builder,
        )
    // NOTE(review): the cast assumes the placeholder is never emitted downstream, i.e. a batch is
    // only delivered after at least one emit has replaced it; confirm against coalescingEvents.
    return latestOnly.mapCheap { value ->
        @Suppress("UNCHECKED_CAST")
        value as T
    }
}
832 
/**
 * Receiver scope handed to the producer block of [BuildScope.coalescingEvents], through which the
 * block emits values.
 */
fun interface CoalescingEventProducerScope<in T> {
    /**
     * Merges [value] into the current batch, and enqueues the batch for emission from this
     * [Events] if it is not already pending.
     *
     * Backpressure occurs when [emit] is called while the Kairos network is currently in a
     * transaction; if called multiple times, then emissions will be coalesced into a single batch
     * that is then processed when the network is ready.
     *
     * @param value the value to add to the current batch.
     */
    fun emit(value: T)
}
845 
/**
 * Receiver scope handed to the producer block of [BuildScope.events], through which the block
 * emits values.
 */
fun interface EventProducerScope<in T> {
    /**
     * Emits [value] to this [Events]. The caller is suspended until the Kairos transaction
     * containing the emission has completed.
     *
     * @param value the value to emit.
     */
    suspend fun emit(value: T)
}
854 
/**
 * Suspends until the calling coroutine is cancelled, then runs [block] on the way out. This
 * function never returns normally.
 *
 * Useful for unregistering callbacks inside of [BuildScope.events] and
 * [BuildScope.coalescingEvents].
 *
 * @param block cleanup action, invoked from a `finally` clause once the suspension ends.
 */
suspend fun awaitClose(block: () -> Unit): Nothing {
    try {
        awaitCancellation()
    } finally {
        block()
    }
}
865 
/**
 * Runs [spec] in this [BuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns a
 * [State] that holds the result of the currently-active [BuildSpec].
 *
 * @param rebuildSignal trigger for re-running [spec]; the emitted values themselves are ignored.
 * @param spec the [BuildSpec] to run initially and on every rebuild.
 */
@ExperimentalKairosApi
fun <A> BuildScope.rebuildOn(rebuildSignal: Events<*>, spec: BuildSpec<A>): State<A> {
    // Each signal emission is replaced by the same spec, so "latest spec" means "run spec again".
    val respecOnSignal = rebuildSignal.map { spec }
    return respecOnSignal.holdLatestSpec(spec)
}
873