• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.kairos
18 
19 import com.android.systemui.kairos.util.MapPatch
20 import com.android.systemui.kairos.util.Maybe
21 import com.android.systemui.kairos.util.WithPrev
22 import com.android.systemui.kairos.util.map
23 import com.android.systemui.kairos.util.mapMaybeValues
24 import com.android.systemui.kairos.util.zipWith
25 
// TODO: caching story? should each Scope have a cache of applied Stateful instances?
/**
 * A computation that can accumulate [Events] into [State].
 *
 * It is a function with a [StateScope] receiver that produces a result of type [R].
 */
typealias Stateful<R> = StateScope.() -> R
29 
/**
 * Wraps [block] as a [Stateful].
 *
 * When the returned [Stateful] is [applied][StateScope.applyStateful], [block] is invoked with the
 * applier's [StateScope] as its receiver.
 */
@ExperimentalKairosApi
@Suppress("NOTHING_TO_INLINE")
inline fun <A> statefully(noinline block: StateScope.() -> A): Stateful<A> {
    return block
}
37 
38 /**
39  * Operations that accumulate state within the Kairos network.
40  *
41  * State accumulation is an ongoing process that has a lifetime. Use `-Latest` combinators, such as
42  * [mapLatestStateful], to create smaller, nested lifecycles so that accumulation isn't running
43  * longer than needed.
44  */
45 @ExperimentalKairosApi
46 interface StateScope : TransactionScope {
47 
    /**
     * Defers invoking [block] until after the current [StateScope] code-path completes, returning a
     * [DeferredValue] that can be used to reference the result.
     *
     * Useful for recursive definitions.
     *
     * @param block the computation to invoke later, with a [StateScope] receiver.
     * @return a [DeferredValue] referencing the result of [block].
     * @see DeferredValue
     */
    fun <A> deferredStateScope(block: StateScope.() -> A): DeferredValue<A>
57 
    /**
     * Returns a [State] that holds onto the most recently emitted value from this [Events], or
     * [initialValue] if nothing has been emitted since it was constructed.
     *
     * Note that the value contained within the [State] is not updated until *after* all [Events]
     * have been processed; this keeps the value of the [State] consistent during the entire Kairos
     * transaction.
     *
     * @param initialValue the value held until this [Events] first emits, supplied as a
     *   [DeferredValue].
     * @see holdState
     */
    fun <A> Events<A>.holdStateDeferred(initialValue: DeferredValue<A>): State<A>
69 
    /**
     * Returns an [Incremental] holding a [Map] that is updated incrementally whenever this emits a
     * value.
     *
     * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
     * map, an associated value of [present][Maybe.present] will insert or replace the value in the
     * tracked [Map], and an associated value of [absent][Maybe.absent] will remove the key from the
     * tracked [Map].
     *
     * @param initialValues the initial contents of the tracked [Map], supplied as a
     *   [DeferredValue].
     * @sample com.android.systemui.kairos.KairosSamples.incrementals
     * @see MapPatch
     */
    fun <K, V> Events<MapPatch<K, V>>.foldStateMapIncrementally(
        initialValues: DeferredValue<Map<K, V>>
    ): Incremental<K, V>
84 
    /**
     * Returns an [Events] that emits the result of applying [Statefuls][Stateful] emitted from the
     * original [Events].
     *
     * Unlike [applyLatestStateful], state accumulation is not stopped with each subsequent emission
     * of the original [Events].
     */
    fun <A> Events<Stateful<A>>.applyStatefuls(): Events<A>
93 
    /**
     * Returns an [Events] containing the results of applying each [Stateful] emitted from the
     * original [Events], and a [DeferredValue] containing the result of applying [init]
     * immediately.
     *
     * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
     * with the same key is stopped.
     *
     * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
     * then the previously-active [Stateful] will be stopped with no replacement.
     *
     * @param init the [Statefuls][Stateful] to apply immediately, keyed by [K] and supplied as a
     *   [DeferredValue].
     * @param numKeys optional optimization hint used to initialize the internal storage.
     * @return a [Pair] of the per-key results of later emissions and the deferred results of
     *   applying [init].
     */
    fun <K, A, B> Events<MapPatch<K, Stateful<A>>>.applyLatestStatefulForKey(
        init: DeferredValue<Map<K, Stateful<B>>>,
        numKeys: Int? = null,
    ): Pair<Events<MapPatch<K, A>>, DeferredValue<Map<K, B>>>
111 
112     // TODO: everything below this comment can be made into extensions once we have context params
113 
114     /**
115      * Returns an [Events] that emits from a merged, incrementally-accumulated collection of
116      * [Events] emitted from this, following the patch rules outlined in
117      * [Map.applyPatch][com.android.systemui.kairos.util.applyPatch].
118      *
119      * ```
120      *   fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementally(
121      *     initialEvents: DeferredValue<Map<K, Events<V>>>,
122      *   ): Events<Map<K, V>> =
123      *     foldMapIncrementally(initialEvents).mergeEventsIncrementally(initialEvents)
124      * ```
125      *
126      * @see Incremental.mergeEventsIncrementally
127      * @see merge
128      */
129     fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementally(
130         initialEvents: DeferredValue<Map<K, Events<V>>>
131     ): Events<Map<K, V>> = foldStateMapIncrementally(initialEvents).mergeEventsIncrementally()
132 
133     /**
134      * Returns an [Events] that emits from a merged, incrementally-accumulated collection of
135      * [Events] emitted from this, following the patch rules outlined in
136      * [Map.applyPatch][com.android.systemui.kairos.util.applyPatch].
137      *
138      * ```
139      *   fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementallyPromptly(
140      *     initialEvents: DeferredValue<Map<K, Events<V>>>,
141      *   ): Events<Map<K, V>> =
142      *     foldMapIncrementally(initialEvents).mergeEventsIncrementallyPromptly(initialEvents)
143      * ```
144      *
145      * @see Incremental.mergeEventsIncrementallyPromptly
146      * @see merge
147      */
148     fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementallyPromptly(
149         initialEvents: DeferredValue<Map<K, Events<V>>>
150     ): Events<Map<K, V>> =
151         foldStateMapIncrementally(initialEvents).mergeEventsIncrementallyPromptly()
152 
153     /**
154      * Returns an [Events] that emits from a merged, incrementally-accumulated collection of
155      * [Events] emitted from this, following the patch rules outlined in
156      * [Map.applyPatch][com.android.systemui.kairos.util.applyPatch].
157      *
158      * ```
159      *   fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementally(
160      *     initialEvents: Map<K, Events<V>>,
161      *   ): Events<Map<K, V>> =
162      *     foldMapIncrementally(initialEvents).mergeEventsIncrementally(initialEvents)
163      * ```
164      *
165      * @see Incremental.mergeEventsIncrementally
166      * @see merge
167      */
168     fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementally(
169         initialEvents: Map<K, Events<V>> = emptyMap()
170     ): Events<Map<K, V>> = mergeEventsIncrementally(deferredOf(initialEvents))
171 
172     /**
173      * Returns an [Events] that emits from a merged, incrementally-accumulated collection of
174      * [Events] emitted from this, following the patch rules outlined in
175      * [Map.applyPatch][com.android.systemui.kairos.util.applyPatch].
176      *
177      * ```
178      *   fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementallyPromptly(
179      *     initialEvents: Map<K, Events<V>>,
180      *   ): Events<Map<K, V>> =
181      *     foldMapIncrementally(initialEvents).mergeEventsIncrementallyPromptly(initialEvents)
182      * ```
183      *
184      * @see Incremental.mergeEventsIncrementallyPromptly
185      * @see merge
186      */
187     fun <K, V> Events<MapPatch<K, Events<V>>>.mergeEventsIncrementallyPromptly(
188         initialEvents: Map<K, Events<V>> = emptyMap()
189     ): Events<Map<K, V>> = mergeEventsIncrementallyPromptly(deferredOf(initialEvents))
190 
191     /** Applies the [Stateful] within this [StateScope]. */
192     fun <A> Stateful<A>.applyStateful(): A = this()
193 
194     /**
195      * Applies the [Stateful] within this [StateScope], returning the result as a [DeferredValue].
196      */
197     fun <A> Stateful<A>.applyStatefulDeferred(): DeferredValue<A> = deferredStateScope {
198         applyStateful()
199     }
200 
201     /**
202      * Returns a [State] that holds onto the most recently emitted value from this [Events], or
203      * [initialValue] if nothing has been emitted since it was constructed.
204      *
205      * Note that the value contained within the [State] is not updated until *after* all [Events]
206      * have been processed; this keeps the value of the [State] consistent during the entire Kairos
207      * transaction.
208      *
209      * @sample com.android.systemui.kairos.KairosSamples.holdState
210      * @see holdStateDeferred
211      */
212     fun <A> Events<A>.holdState(initialValue: A): State<A> =
213         holdStateDeferred(deferredOf(initialValue))
214 
215     /**
216      * Returns an [Events] containing the results of applying [transform] to each value of the
217      * original [Events].
218      *
219      * [transform] can perform state accumulation via its [StateScope] receiver. Unlike
220      * [mapLatestStateful], accumulation is not stopped with each subsequent emission of the
221      * original [Events].
222      *
223      * ```
224      *   fun <A, B> Events<A>.mapStateful(transform: StateScope.(A) -> B): Events<B> =
225      *       map { statefully { transform(it) } }.applyStatefuls()
226      * ```
227      */
228     fun <A, B> Events<A>.mapStateful(transform: StateScope.(A) -> B): Events<B> =
229         mapCheap { statefully { transform(it) } }.applyStatefuls()
230 
231     /**
232      * Returns a [State] the holds the result of applying the [Stateful] held by the original
233      * [State].
234      *
235      * Unlike [applyLatestStateful], state accumulation is not stopped with each state change.
236      *
237      * ```
238      *   fun <A> State<Stateful<A>>.applyStatefuls(): State<A> =
239      *       changes
240      *           .applyStatefuls()
241      *           .holdState(initialValue = sample().applyStateful())
242      * ```
243      */
244     fun <A> State<Stateful<A>>.applyStatefuls(): State<A> =
245         changes
246             .applyStatefuls()
247             .holdStateDeferred(
248                 initialValue = deferredStateScope { sampleDeferred().value.applyStateful() }
249             )
250 
251     /**
252      * Returns an [Events] that acts like the most recent [Events] to be emitted from the original
253      * [Events].
254      *
255      * ```
256      *   fun <A> Events<Events<A>>.flatten() = holdState(emptyEvents).switchEvents()
257      * ```
258      *
259      * @see switchEvents
260      */
261     fun <A> Events<Events<A>>.flatten() = holdState(emptyEvents).switchEvents()
262 
263     /**
264      * Returns an [Events] containing the results of applying [transform] to each value of the
265      * original [Events].
266      *
267      * [transform] can perform state accumulation via its [StateScope] receiver. With each
268      * invocation of [transform], state accumulation from previous invocation is stopped.
269      *
270      * ```
271      *   fun <A, B> Events<A>.mapLatestStateful(transform: StateScope.(A) -> B): Events<B> =
272      *       map { statefully { transform(it) } }.applyLatestStateful()
273      * ```
274      */
275     fun <A, B> Events<A>.mapLatestStateful(transform: StateScope.(A) -> B): Events<B> =
276         mapCheap { statefully { transform(it) } }.applyLatestStateful()
277 
278     /**
279      * Returns an [Events] that switches to a new [Events] produced by [transform] every time the
280      * original [Events] emits a value.
281      *
282      * [transform] can perform state accumulation via its [StateScope] receiver. With each
283      * invocation of [transform], state accumulation from previous invocation is stopped.
284      *
285      * ```
286      *   fun <A, B> Events<A>.flatMapLatestStateful(
287      *       transform: StateScope.(A) -> Events<B>
288      *   ): Events<B> =
289      *       mapLatestStateful(transform).flatten()
290      * ```
291      */
292     fun <A, B> Events<A>.flatMapLatestStateful(transform: StateScope.(A) -> Events<B>): Events<B> =
293         mapLatestStateful(transform).flatten()
294 
295     /**
296      * Returns an [Events] containing the results of applying each [Stateful] emitted from the
297      * original [Events].
298      *
299      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful] is
300      * stopped.
301      *
302      * @sample com.android.systemui.kairos.KairosSamples.applyLatestStateful
303      */
304     fun <A> Events<Stateful<A>>.applyLatestStateful(): Events<A> = applyLatestStateful {}.first
305 
306     /**
307      * Returns a [State] containing the value returned by applying the [Stateful] held by the
308      * original [State].
309      *
310      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful] is
311      * stopped.
312      */
313     fun <A> State<Stateful<A>>.applyLatestStateful(): State<A> {
314         val (changes, init) = changes.applyLatestStateful { sample()() }
315         return changes.holdStateDeferred(init)
316     }
317 
318     /**
319      * Returns an [Events] containing the results of applying each [Stateful] emitted from the
320      * original [Events], and a [DeferredValue] containing the result of applying [init]
321      * immediately.
322      *
323      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful] is
324      * stopped.
325      */
326     fun <A, B> Events<Stateful<B>>.applyLatestStateful(
327         init: Stateful<A>
328     ): Pair<Events<B>, DeferredValue<A>> {
329         val (events, result) =
330             mapCheap { spec -> mapOf(Unit to Maybe.present(spec)) }
331                 .applyLatestStatefulForKey(init = mapOf(Unit to init), numKeys = 1)
332         val outEvents: Events<B> =
333             events.mapMaybe {
334                 checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" }
335             }
336         val outInit: DeferredValue<A> = deferredTransactionScope {
337             val initResult: Map<Unit, A> = result.value
338             check(Unit in initResult) {
339                 "applyLatest: expected initial result, but none present in: $initResult"
340             }
341             initResult.getValue(Unit)
342         }
343         return Pair(outEvents, outInit)
344     }
345 
346     /**
347      * Returns an [Events] containing the results of applying each [Stateful] emitted from the
348      * original [Events], and a [DeferredValue] containing the result of applying [init]
349      * immediately.
350      *
351      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
352      * with the same key is stopped.
353      *
354      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
355      * then the previously-active [Stateful] will be stopped with no replacement.
356      *
357      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
358      */
359     fun <K, A, B> Events<MapPatch<K, Stateful<A>>>.applyLatestStatefulForKey(
360         init: Map<K, Stateful<B>>,
361         numKeys: Int? = null,
362     ): Pair<Events<MapPatch<K, A>>, DeferredValue<Map<K, B>>> =
363         applyLatestStatefulForKey(deferredOf(init), numKeys)
364 
365     /**
366      * Returns an [Incremental] containing the latest results of applying each [Stateful] emitted
367      * from the original [Incremental]'s [updates].
368      *
369      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
370      * with the same key is stopped.
371      *
372      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
373      * then the previously-active [Stateful] will be stopped with no replacement.
374      *
375      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
376      */
377     fun <K, V> Incremental<K, Stateful<V>>.applyLatestStatefulForKey(
378         numKeys: Int? = null
379     ): Incremental<K, V> {
380         val (events, init) = updates.applyLatestStatefulForKey(sampleDeferred())
381         return events.foldStateMapIncrementally(init)
382     }
383 
384     /**
385      * Returns a [State] containing the latest results of applying each [Stateful] emitted from the
386      * original [Events].
387      *
388      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
389      * with the same key is stopped.
390      *
391      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
392      * then the previously-active [Stateful] will be stopped with no replacement.
393      *
394      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
395      */
396     fun <K, A> Events<MapPatch<K, Stateful<A>>>.holdLatestStatefulForKey(
397         init: DeferredValue<Map<K, Stateful<A>>>,
398         numKeys: Int? = null,
399     ): Incremental<K, A> {
400         val (changes, initialValues) = applyLatestStatefulForKey(init, numKeys)
401         return changes.foldStateMapIncrementally(initialValues)
402     }
403 
404     /**
405      * Returns a [State] containing the latest results of applying each [Stateful] emitted from the
406      * original [Events].
407      *
408      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
409      * with the same key is stopped.
410      *
411      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
412      * then the previously-active [Stateful] will be stopped with no replacement.
413      *
414      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
415      */
416     fun <K, A> Events<MapPatch<K, Stateful<A>>>.holdLatestStatefulForKey(
417         init: Map<K, Stateful<A>> = emptyMap(),
418         numKeys: Int? = null,
419     ): Incremental<K, A> = holdLatestStatefulForKey(deferredOf(init), numKeys)
420 
421     /**
422      * Returns an [Events] containing the results of applying each [Stateful] emitted from the
423      * original [Events].
424      *
425      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful]
426      * with the same key is stopped.
427      *
428      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
429      * then the previously-active [Stateful] will be stopped with no replacement.
430      *
431      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
432      *
433      * @sample com.android.systemui.kairos.KairosSamples.applyLatestStatefulForKey
434      */
435     fun <K, A> Events<MapPatch<K, Stateful<A>>>.applyLatestStatefulForKey(
436         numKeys: Int? = null
437     ): Events<MapPatch<K, A>> =
438         applyLatestStatefulForKey(init = emptyMap<K, Stateful<*>>(), numKeys = numKeys).first
439 
440     /**
441      * Returns an [Events] containing the results of applying [transform] to each value of the
442      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
443      * [initialValues] immediately.
444      *
445      * [transform] can perform state accumulation via its [StateScope] receiver. With each
446      * invocation of [transform], state accumulation from previous invocation is stopped.
447      *
448      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
449      * then the previously-active [StateScope] will be stopped with no replacement.
450      *
451      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
452      */
453     fun <K, A, B> Events<MapPatch<K, A>>.mapLatestStatefulForKey(
454         initialValues: DeferredValue<Map<K, A>>,
455         numKeys: Int? = null,
456         transform: StateScope.(A) -> B,
457     ): Pair<Events<MapPatch<K, B>>, DeferredValue<Map<K, B>>> =
458         map { patch -> patch.mapValues { (_, v) -> v.map { statefully { transform(it) } } } }
459             .applyLatestStatefulForKey(
460                 deferredStateScope {
461                     initialValues.value.mapValues { (_, v) -> statefully { transform(v) } }
462                 },
463                 numKeys = numKeys,
464             )
465 
466     /**
467      * Returns an [Events] containing the results of applying [transform] to each value of the
468      * original [Events], and a [DeferredValue] containing the result of applying [transform] to
469      * [initialValues] immediately.
470      *
471      * [transform] can perform state accumulation via its [StateScope] receiver. With each
472      * invocation of [transform], state accumulation from previous invocation is stopped.
473      *
474      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
475      * then the previously-active [StateScope] will be stopped with no replacement.
476      *
477      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
478      */
479     fun <K, A, B> Events<MapPatch<K, A>>.mapLatestStatefulForKey(
480         initialValues: Map<K, A>,
481         numKeys: Int? = null,
482         transform: StateScope.(A) -> B,
483     ): Pair<Events<MapPatch<K, B>>, DeferredValue<Map<K, B>>> =
484         mapLatestStatefulForKey(deferredOf(initialValues), numKeys, transform)
485 
486     /**
487      * Returns an [Events] containing the results of applying [transform] to each value of the
488      * original [Events].
489      *
490      * [transform] can perform state accumulation via its [StateScope] receiver. With each
491      * invocation of [transform], state accumulation from previous invocation is stopped.
492      *
493      * If the [Maybe] contained within the value for an associated key is [absent][Maybe.absent],
494      * then the previously-active [StateScope] will be stopped with no replacement.
495      *
496      * The optional [numKeys] argument is an optimization used to initialize the internal storage.
497      *
498      * ```
499      *   fun <K, A, B> Events<MapPatch<K, A>>.mapLatestStatefulForKey(
500      *       numKeys: Int? = null,
501      *       transform: StateScope.(A) -> B,
502      *   ): Pair<Events<MapPatch<K, B>>, DeferredValue<Map<K, B>>> =
503      *       map { patch -> patch.mapValues { (_, mv) -> mv.map { statefully { transform(it) } } } }
504      *           .applyLatestStatefulForKey(numKeys)
505      * ```
506      */
507     fun <K, A, B> Events<MapPatch<K, A>>.mapLatestStatefulForKey(
508         numKeys: Int? = null,
509         transform: StateScope.(A) -> B,
510     ): Events<MapPatch<K, B>> = mapLatestStatefulForKey(emptyMap(), numKeys, transform).first
511 
512     /**
513      * Returns an [Events] that will only emit the next event of the original [Events], and then
514      * will act as [emptyEvents].
515      *
516      * If the original [Events] is emitting an event at this exact time, then it will be the only
517      * even emitted from the result [Events].
518      *
519      * ```
520      *   fun <A> Events<A>.nextOnly(): Events<A> =
521      *       EventsLoop<A>().apply {
522      *           loopback = map { emptyEvents }.holdState(this@nextOnly).switchEvents()
523      *       }
524      * ```
525      */
526     fun <A> Events<A>.nextOnly(): Events<A> =
527         if (this === emptyEvents) {
528             this
529         } else {
530             EventsLoop<A>().apply {
531                 loopback = mapCheap { emptyEvents }.holdState(this@nextOnly).switchEvents()
532             }
533         }
534 
535     /**
536      * Returns an [Events] that skips the next emission of the original [Events].
537      *
538      * ```
539      *   fun <A> Events<A>.skipNext(): Events<A> =
540      *       nextOnly().map { this@skipNext }.holdState(emptyEvents).switchEvents()
541      * ```
542      */
543     fun <A> Events<A>.skipNext(): Events<A> =
544         if (this === emptyEvents) {
545             this
546         } else {
547             nextOnly().mapCheap { this@skipNext }.holdState(emptyEvents).switchEvents()
548         }
549 
550     /**
551      * Returns an [Events] that emits values from the original [Events] up until [stop] emits a
552      * value.
553      *
554      * If the original [Events] emits at the same time as [stop], then the returned [Events] will
555      * emit that value.
556      *
557      * ```
558      *   fun <A> Events<A>.takeUntil(stop: Events<*>): Events<A> =
559      *       stop.map { emptyEvents }.nextOnly().holdState(this).switchEvents()
560      * ```
561      */
562     fun <A> Events<A>.takeUntil(stop: Events<*>): Events<A> =
563         if (stop === emptyEvents) {
564             this
565         } else {
566             stop.mapCheap { emptyEvents }.nextOnly().holdState(this).switchEvents()
567         }
568 
569     /**
570      * Invokes [stateful] in a new [StateScope] that is a child of this one.
571      *
572      * This new scope is stopped when [stop] first emits a value, or when the parent scope is
573      * stopped. Stopping will end all state accumulation; any [States][State] returned from this
574      * scope will no longer update.
575      */
576     fun <A> childStateScope(stop: Events<*>, stateful: Stateful<A>): DeferredValue<A> {
577         val (_, init: DeferredValue<Map<Unit, A>>) =
578             stop
579                 .nextOnly()
580                 .map { mapOf(Unit to Maybe.absent<Stateful<A>>()) }
581                 .applyLatestStatefulForKey(init = mapOf(Unit to stateful), numKeys = 1)
582         return deferredStateScope { init.value.getValue(Unit) }
583     }
584 
585     /**
586      * Returns an [Events] that emits values from the original [Events] up to and including a value
587      * is emitted that satisfies [predicate].
588      *
589      * ```
590      *   fun <A> Events<A>.takeUntil(predicate: TransactionScope.(A) -> Boolean): Events<A> =
591      *       takeUntil(filter(predicate))
592      * ```
593      */
594     fun <A> Events<A>.takeUntil(predicate: TransactionScope.(A) -> Boolean): Events<A> =
595         takeUntil(filter(predicate))
596 
597     /**
598      * Returns a [State] that is incrementally updated when this [Events] emits a value, by applying
599      * [transform] to both the emitted value and the currently tracked state.
600      *
601      * Note that the value contained within the [State] is not updated until *after* all [Events]
602      * have been processed; this keeps the value of the [State] consistent during the entire Kairos
603      * transaction.
604      *
605      * ```
606      *   fun <A, B> Events<A>.foldState(
607      *       initialValue: B,
608      *       transform: TransactionScope.(A, B) -> B,
609      *   ): State<B> {
610      *       lateinit var state: State<B>
611      *       return map { a -> transform(a, state.sample()) }
612      *           .holdState(initialValue)
613      *           .also { state = it }
614      *   }
615      * ```
616      */
617     fun <A, B> Events<A>.foldState(
618         initialValue: B,
619         transform: TransactionScope.(A, B) -> B,
620     ): State<B> {
621         lateinit var state: State<B>
622         return map { a -> transform(a, state.sample()) }.holdState(initialValue).also { state = it }
623     }
624 
625     /**
626      * Returns a [State] that is incrementally updated when this [Events] emits a value, by applying
627      * [transform] to both the emitted value and the currently tracked state.
628      *
629      * Note that the value contained within the [State] is not updated until *after* all [Events]
630      * have been processed; this keeps the value of the [State] consistent during the entire Kairos
631      * transaction.
632      *
633      * ```
634      *   fun <A, B> Events<A>.foldStateDeferred(
635      *       initialValue: DeferredValue<B>,
636      *       transform: TransactionScope.(A, B) -> B,
637      *   ): State<B> {
638      *       lateinit var state: State<B>
639      *       return map { a -> transform(a, state.sample()) }
640      *           .holdStateDeferred(initialValue)
641      *           .also { state = it }
642      *   }
643      * ```
644      */
645     fun <A, B> Events<A>.foldStateDeferred(
646         initialValue: DeferredValue<B>,
647         transform: TransactionScope.(A, B) -> B,
648     ): State<B> {
649         lateinit var state: State<B>
650         return map { a -> transform(a, state.sample()) }
651             .holdStateDeferred(initialValue)
652             .also { state = it }
653     }
654 
655     /**
656      * Returns a [State] that holds onto the result of applying the most recently emitted [Stateful]
657      * this [Events], or [init] if nothing has been emitted since it was constructed.
658      *
659      * When each [Stateful] is applied, state accumulation from the previously-active [Stateful] is
660      * stopped.
661      *
662      * Note that the value contained within the [State] is not updated until *after* all [Events]
663      * have been processed; this keeps the value of the [State] consistent during the entire Kairos
664      * transaction.
665      *
666      * ```
667      *   fun <A> Events<Stateful<A>>.holdLatestStateful(init: Stateful<A>): State<A> {
668      *       val (changes, initApplied) = applyLatestStateful(init)
669      *       return changes.holdStateDeferred(initApplied)
670      *   }
671      * ```
672      */
673     fun <A> Events<Stateful<A>>.holdLatestStateful(init: Stateful<A>): State<A> {
674         val (changes, initApplied) = applyLatestStateful(init)
675         return changes.holdStateDeferred(initApplied)
676     }
677 
678     /**
679      * Returns an [Events] that emits the two most recent emissions from the original [Events].
680      * [initialValue] is used as the previous value for the first emission.
681      *
682      * Shorthand for `sample(hold(init)) { new, old -> Pair(old, new) }`
683      */
684     fun <S, T : S> Events<T>.pairwise(initialValue: S): Events<WithPrev<S, T>> {
685         val previous = holdState(initialValue)
686         return mapCheap { new -> WithPrev(previousValue = previous.sample(), newValue = new) }
687     }
688 
689     /**
690      * Returns an [Events] that emits the two most recent emissions from the original [Events]. Note
691      * that the returned [Events] will not emit until the original [Events] has emitted twice.
692      */
693     fun <A> Events<A>.pairwise(): Events<WithPrev<A, A>> =
694         mapCheap { Maybe.present(it) }
695             .pairwise(Maybe.absent)
696             .mapMaybe { (prev, next) -> prev.zipWith(next, ::WithPrev) }
697 
698     /**
699      * Returns a [State] that holds both the current and previous values of the original [State].
700      * [initialPreviousValue] is used as the first previous value.
701      *
702      * Shorthand for `sample(hold(init)) { new, old -> Pair(old, new) }`
703      */
704     fun <S, T : S> State<T>.pairwise(initialPreviousValue: S): State<WithPrev<S, T>> =
705         changes
706             .pairwise(initialPreviousValue)
707             .holdStateDeferred(
708                 deferredTransactionScope { WithPrev(initialPreviousValue, sample()) }
709             )
710 
711     /**
712      * Returns a [State] holding a [Map] that is updated incrementally whenever this emits a value.
713      *
714      * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted
715      * map, an associated value of [Maybe.present] will insert or replace the value in the tracked
716      * [Map], and an associated value of [absent][Maybe.absent] will remove the key from the tracked
717      * [Map].
718      */
719     fun <K, V> Events<MapPatch<K, V>>.foldStateMapIncrementally(
720         initialValues: Map<K, V> = emptyMap()
721     ): Incremental<K, V> = foldStateMapIncrementally(deferredOf(initialValues))
722 
723     /**
724      * Returns an [Events] that wraps each emission of the original [Events] into an [IndexedValue],
725      * containing the emitted value and its index (starting from zero).
726      *
727      * ```
728      *   fun <A> Events<A>.withIndex(): Events<IndexedValue<A>> {
729      *     val index = fold(0) { _, oldIdx -> oldIdx + 1 }
730      *     return sample(index) { a, idx -> IndexedValue(idx, a) }
731      *   }
732      * ```
733      */
734     fun <A> Events<A>.withIndex(): Events<IndexedValue<A>> {
735         val index = foldState(0) { _, old -> old + 1 }
736         return sample(index) { a, idx -> IndexedValue(idx, a) }
737     }
738 
739     /**
740      * Returns an [Events] containing the results of applying [transform] to each value of the
741      * original [Events] and its index (starting from zero).
742      *
743      * ```
744      *   fun <A> Events<A>.mapIndexed(transform: TransactionScope.(Int, A) -> B): Events<B> {
745      *       val index = foldState(0) { _, i -> i + 1 }
746      *       return sample(index) { a, idx -> transform(idx, a) }
747      *   }
748      * ```
749      */
750     fun <A, B> Events<A>.mapIndexed(transform: TransactionScope.(Int, A) -> B): Events<B> {
751         val index = foldState(0) { _, i -> i + 1 }
752         return sample(index) { a, idx -> transform(idx, a) }
753     }
754 
755     /**
756      * Returns an [Events] where all subsequent repetitions of the same value are filtered out.
757      *
758      * ```
759      *   fun <A> Events<A>.distinctUntilChanged(): Events<A> {
760      *       val state: State<Any?> = holdState(Any())
761      *       return filter { it != state.sample() }
762      *   }
763      * ```
764      */
765     fun <A> Events<A>.distinctUntilChanged(): Events<A> {
766         val state: State<Any?> = holdState(Any())
767         return filter { it != state.sample() }
768     }
769 
770     /**
771      * Returns a new [Events] that emits at the same rate as the original [Events], but combines the
772      * emitted value with the most recent emission from [other] using [transform].
773      *
774      * Note that the returned [Events] will not emit anything until [other] has emitted at least one
775      * value.
776      *
777      * ```
778      *   fun <A, B, C> Events<A>.sample(
779      *       other: Events<B>,
780      *       transform: TransactionScope.(A, B) -> C,
781      *   ): Events<C> {
782      *       val state = other.mapCheap { Maybe.present(it) }.holdState(Maybe.absent)
783      *       return sample(state) { a, b -> b.map { transform(a, it) } }.filterPresent()
784      *   }
785      * ```
786      */
787     fun <A, B, C> Events<A>.sample(
788         other: Events<B>,
789         transform: TransactionScope.(A, B) -> C,
790     ): Events<C> {
791         val state = other.mapCheap { Maybe.present(it) }.holdState(Maybe.absent)
792         return sample(state) { a, b -> b.map { transform(a, it) } }.filterPresent()
793     }
794 
795     /**
796      * Returns a [State] that samples the [Transactional] held by the given [State] within the same
797      * transaction that the state changes.
798      *
799      * ```
800      *   fun <A> State<Transactional<A>>.sampleTransactionals(): State<A> =
801      *       changes
802      *           .sampleTransactionals()
803      *           .holdStateDeferred(deferredTransactionScope { sample().sample() })
804      * ```
805      */
806     fun <A> State<Transactional<A>>.sampleTransactionals(): State<A> =
807         changes
808             .sampleTransactionals()
809             .holdStateDeferred(deferredTransactionScope { sample().sample() })
810 
811     /**
812      * Returns a [State] that transforms the value held inside this [State] by applying it to the
813      * given function [transform].
814      *
815      * Note that this is less efficient than [State.map], which should be preferred if [transform]
816      * does not need access to [TransactionScope].
817      *
818      * ```
819      *   fun <A, B> State<A>.mapTransactionally(transform: TransactionScope.(A) -> B): State<B> =
820      *       map { transactionally { transform(it) } }.sampleTransactionals()
821      * ```
822      */
823     fun <A, B> State<A>.mapTransactionally(transform: TransactionScope.(A) -> B): State<B> =
824         map { transactionally { transform(it) } }.sampleTransactionals()
825 
826     /**
827      * Returns a [State] whose value is generated with [transform] by combining the current values
828      * of each given [State].
829      *
830      * Note that this is less efficient than [combine], which should be preferred if [transform]
831      * does not need access to [TransactionScope].
832      *
833      * ```
834      *   fun <A, B, Z> combineTransactionally(
835      *       stateA: State<A>,
836      *       stateB: State<B>,
837      *       transform: TransactionScope.(A, B) -> Z,
838      *   ): State<Z> =
839      *       combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } }
840      *           .sampleTransactionals()
841      * ```
842      *
843      * @see State.combineTransactionally
844      */
845     fun <A, B, Z> combineTransactionally(
846         stateA: State<A>,
847         stateB: State<B>,
848         transform: TransactionScope.(A, B) -> Z,
849     ): State<Z> =
850         combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } }
851             .sampleTransactionals()
852 
853     /**
854      * Returns a [State] whose value is generated with [transform] by combining the current values
855      * of each given [State].
856      *
857      * Note that this is less efficient than [combine], which should be preferred if [transform]
858      * does not need access to [TransactionScope].
859      *
860      * @see State.combineTransactionally
861      */
862     fun <A, B, C, Z> combineTransactionally(
863         stateA: State<A>,
864         stateB: State<B>,
865         stateC: State<C>,
866         transform: TransactionScope.(A, B, C) -> Z,
867     ): State<Z> =
868         combine(stateA, stateB, stateC) { a, b, c -> transactionally { transform(a, b, c) } }
869             .sampleTransactionals()
870 
871     /**
872      * Returns a [State] whose value is generated with [transform] by combining the current values
873      * of each given [State].
874      *
875      * Note that this is less efficient than [combine], which should be preferred if [transform]
876      * does not need access to [TransactionScope].
877      *
878      * @see State.combineTransactionally
879      */
880     fun <A, B, C, D, Z> combineTransactionally(
881         stateA: State<A>,
882         stateB: State<B>,
883         stateC: State<C>,
884         stateD: State<D>,
885         transform: TransactionScope.(A, B, C, D) -> Z,
886     ): State<Z> =
887         combine(stateA, stateB, stateC, stateD) { a, b, c, d ->
888                 transactionally { transform(a, b, c, d) }
889             }
890             .sampleTransactionals()
891 
892     /**
893      * Returns a [State] by applying [transform] to the value held by the original [State].
894      *
895      * Note that this is less efficient than [flatMap], which should be preferred if [transform]
896      * does not need access to [TransactionScope].
897      *
898      * ```
899      *   fun <A, B> State<A>.flatMapTransactionally(
900      *       transform: TransactionScope.(A) -> State<B>
901      *   ): State<B> = map { transactionally { transform(it) } }.sampleTransactionals().flatten()
902      * ```
903      */
904     fun <A, B> State<A>.flatMapTransactionally(
905         transform: TransactionScope.(A) -> State<B>
906     ): State<B> = map { transactionally { transform(it) } }.sampleTransactionals().flatten()
907 
908     /**
909      * Returns a [State] whose value is generated with [transform] by combining the current values
910      * of each given [State].
911      *
912      * Note that this is less efficient than [combine], which should be preferred if [transform]
913      * does not need access to [TransactionScope].
914      *
915      * @see State.combineTransactionally
916      */
917     fun <A, Z> combineTransactionally(
918         vararg states: State<A>,
919         transform: TransactionScope.(List<A>) -> Z,
920     ): State<Z> = combine(*states).mapTransactionally(transform)
921 
922     /**
923      * Returns a [State] whose value is generated with [transform] by combining the current values
924      * of each given [State].
925      *
926      * Note that this is less efficient than [combine], which should be preferred if [transform]
927      * does not need access to [TransactionScope].
928      *
929      * @see State.combineTransactionally
930      */
931     fun <A, Z> Iterable<State<A>>.combineTransactionally(
932         transform: TransactionScope.(List<A>) -> Z
933     ): State<Z> = combine().mapTransactionally(transform)
934 
935     /**
936      * Returns a [State] by combining the values held inside the given [State]s by applying them to
937      * the given function [transform].
938      *
939      * Note that this is less efficient than [combine], which should be preferred if [transform]
940      * does not need access to [TransactionScope].
941      */
942     @Suppress("INAPPLICABLE_JVM_NAME")
943     @JvmName(name = "combineStateTransactionally")
944     fun <A, B, C> State<A>.combineTransactionally(
945         other: State<B>,
946         transform: TransactionScope.(A, B) -> C,
947     ): State<C> = combineTransactionally(this, other, transform)
948 
949     /**
950      * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds
951      * / removes entries based on the state of the original's values.
952      *
953      * ```
954      *   fun <K, V> Incremental<K, State<Maybe<V>>>.applyStateIncrementally(): Incremental<K, V> =
955      *       mapValues { (_, v) -> v.changes }
956      *           .mergeEventsIncrementallyPromptly()
957      *           .foldStateMapIncrementally(
958      *               deferredStateScope { sample().mapMaybeValues { (_, s) -> s.sample() } }
959      *           )
960      * ```
961      */
962     fun <K, V> Incremental<K, State<Maybe<V>>>.applyStateIncrementally(): Incremental<K, V> =
963         mapValues { (_, v) -> v.changes }
964             .mergeEventsIncrementallyPromptly()
965             .foldStateMapIncrementally(
966                 deferredStateScope { sample().mapMaybeValues { (_, s) -> s.sample() } }
967             )
968 
969     /**
970      * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds
971      * / removes entries based on the [State] returned from applying [transform] to the original's
972      * entries.
973      *
974      * ```
975      *   fun <K, V, U> Incremental<K, V>.mapIncrementalState(
976      *       transform: KairosScope.(Map.Entry<K, V>) -> State<Maybe<U>>
977      *   ): Incremental<K, U> = mapValues { transform(it) }.applyStateIncrementally()
978      * ```
979      */
980     fun <K, V, U> Incremental<K, V>.mapIncrementalState(
981         transform: KairosScope.(Map.Entry<K, V>) -> State<Maybe<U>>
982     ): Incremental<K, U> = mapValues { transform(it) }.applyStateIncrementally()
983 
984     /**
985      * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds
986      * / removes entries based on the [State] returned from applying [transform] to the original's
987      * entries, such that entries are added when that state is `true`, and removed when `false`.
988      *
989      * ```
990      *   fun <K, V> Incremental<K, V>.filterIncrementally(
991      *       transform: KairosScope.(Map.Entry<K, V>) -> State<Boolean>
992      *   ): Incremental<K, V> = mapIncrementalState { entry ->
993      *       transform(entry).map { if (it) Maybe.present(entry.value) else Maybe.absent }
994      *   }
995      * ```
996      */
997     fun <K, V> Incremental<K, V>.filterIncrementally(
998         transform: KairosScope.(Map.Entry<K, V>) -> State<Boolean>
999     ): Incremental<K, V> = mapIncrementalState { entry ->
1000         transform(entry).map { if (it) Maybe.present(entry.value) else Maybe.absent }
1001     }
1002 
1003     /**
1004      * Returns an [Incremental] that samples the [Transactionals][Transactional] held by the
1005      * original within the same transaction that the incremental [updates].
1006      *
1007      * ```
1008      *   fun <K, V> Incremental<K, Transactional<V>>.sampleTransactionals(): Incremental<K, V> =
1009      *       updates
1010      *           .map { patch -> patch.mapValues { (k, mv) -> mv.map { it.sample() } } }
1011      *           .foldStateMapIncrementally(
1012      *               deferredStateScope { sample().mapValues { (k, v) -> v.sample() } }
1013      *           )
1014      * ```
1015      */
1016     fun <K, V> Incremental<K, Transactional<V>>.sampleTransactionals(): Incremental<K, V> =
1017         updates
1018             .map { patch -> patch.mapValues { (k, mv) -> mv.map { it.sample() } } }
1019             .foldStateMapIncrementally(
1020                 deferredStateScope { sample().mapValues { (k, v) -> v.sample() } }
1021             )
1022 
1023     /**
1024      * Returns an [Incremental] that tracks the entries of the original incremental, but values
1025      * replaced with those obtained by applying [transform] to each original entry.
1026      *
1027      * Note that this is less efficient than [mapValues], which should be preferred if [transform]
1028      * does not need access to [TransactionScope].
1029      *
1030      * ```
1031      *   fun <K, V, U> Incremental<K, V>.mapValuesTransactionally(
1032      *       transform: TransactionScope.(Map.Entry<K, V>) -> U
1033      *   ): Incremental<K, U> =
1034      *       mapValues { transactionally { transform(it) } }.sampleTransactionals()
1035      * ```
1036      */
1037     fun <K, V, U> Incremental<K, V>.mapValuesTransactionally(
1038         transform: TransactionScope.(Map.Entry<K, V>) -> U
1039     ): Incremental<K, U> = mapValues { transactionally { transform(it) } }.sampleTransactionals()
1040 }
1041