<lambda>null1 package com.android.systemui.kairos
2
3 import com.android.systemui.kairos.util.Either
4 import com.android.systemui.kairos.util.Either.First
5 import com.android.systemui.kairos.util.Either.Second
6 import com.android.systemui.kairos.util.Maybe
7 import com.android.systemui.kairos.util.Maybe.Absent
8 import com.android.systemui.kairos.util.map
9 import com.android.systemui.kairos.util.maybe
10 import kotlin.time.Duration
11 import kotlin.time.Duration.Companion.seconds
12 import kotlin.time.DurationUnit
13 import kotlin.time.measureTime
14 import kotlinx.coroutines.CompletableDeferred
15 import kotlinx.coroutines.ExperimentalCoroutinesApi
16 import kotlinx.coroutines.async
17 import kotlinx.coroutines.awaitCancellation
18 import kotlinx.coroutines.flow.Flow
19 import kotlinx.coroutines.flow.MutableSharedFlow
20 import kotlinx.coroutines.flow.MutableStateFlow
21 import kotlinx.coroutines.flow.SharedFlow
22 import kotlinx.coroutines.flow.SharingStarted
23 import kotlinx.coroutines.flow.StateFlow
24 import kotlinx.coroutines.flow.first
25 import kotlinx.coroutines.flow.flow
26 import kotlinx.coroutines.flow.map
27 import kotlinx.coroutines.flow.onEach
28 import kotlinx.coroutines.flow.stateIn
29 import kotlinx.coroutines.flow.toCollection
30 import kotlinx.coroutines.launch
31 import kotlinx.coroutines.test.TestScope
32 import kotlinx.coroutines.test.runCurrent
33 import kotlinx.coroutines.test.runTest
34 import org.junit.Assert.assertNull
35 import org.junit.Assert.assertTrue
36 import org.junit.Assert.fail
37 import org.junit.Test
38
39 @OptIn(ExperimentalCoroutinesApi::class)
40 class KairosTests {
41
42 @Test
43 fun basic() = runFrpTest { network ->
44 val emitter = network.mutableEvents<Int>()
45 var result: Int? = null
46 activateSpec(network) { emitter.observe { result = it } }
47 runCurrent()
48 emitter.emit(3)
49 runCurrent()
50 assertEquals(3, result)
51 runCurrent()
52 }
53
54 @Test
55 fun basicEvents() = runFrpTest { network ->
56 val emitter = network.mutableEvents<Int>()
57 println("starting network")
58 val result = activateSpecWithResult(network) { emitter.nextDeferred() }
59 runCurrent()
60 println("emitting")
61 emitter.emit(3)
62 runCurrent()
63 println("awaiting")
64 assertEquals(3, result.await())
65 runCurrent()
66 }
67
68 @Test
69 fun basicState() = runFrpTest { network ->
70 val emitter = network.mutableEvents<Int>()
71 val result = activateSpecWithResult(network) { emitter.holdState(0).changes.nextDeferred() }
72 runCurrent()
73
74 emitter.emit(3)
75 runCurrent()
76
77 assertEquals(3, result.await())
78 }
79
80 @Test
81 fun basicEvent() = runFrpTest { network ->
82 val emitter = MutableSharedFlow<Int>()
83 val result = activateSpecWithResult(network) { async { emitter.first() } }
84 runCurrent()
85 emitter.emit(1)
86 runCurrent()
87 assertTrue("Result eventual has not completed.", result.isCompleted)
88 assertEquals(1, result.await())
89 }
90
91 @Test
92 fun basicTransactional() = runFrpTest { network ->
93 var value: Int? = null
94 var bSource = 1
95 val emitter = network.mutableEvents<Unit>()
96 // Sampling this transactional will increment the source count.
97 val transactional = transactionally { bSource++ }
98 measureTime {
99 activateSpecWithResult(network) {
100 // Two different flows that sample the same transactional.
101 (0 until 2).map {
102 val sampled = emitter.sample(transactional) { _, v -> v }
103 sampled.toSharedFlow()
104 }
105 }
106 .forEach { backgroundScope.launch { it.collect { value = it } } }
107 runCurrent()
108 }
109 .also { println("setup: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
110
111 measureTime {
112 emitter.emit(Unit)
113 runCurrent()
114 }
115 .also { println("emit 1: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
116
117 // Even though the transactional would be sampled twice, the first result is cached.
118 assertEquals(2, bSource)
119 assertEquals(1, value)
120
121 measureTime {
122 bSource = 10
123 emitter.emit(Unit)
124 runCurrent()
125 }
126 .also { println("emit 2: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
127
128 assertEquals(11, bSource)
129 assertEquals(10, value)
130 }
131
132 @Test
133 fun diamondGraph() = runFrpTest { network ->
134 val flow = network.mutableEvents<Int>()
135 val ouevents =
136 activateSpecWithResult(network) {
137 // map Events like we map Flow
138 val left = flow.map { "left" to it }.onEach { println("left: $it") }
139 val right = flow.map { "right" to it }.onEach { println("right: $it") }
140
141 // convert Eventss to States so that they can be combined
142 val combined =
143 left.holdState("left" to 0).combine(right.holdState("right" to 0)) { l, r ->
144 l to r
145 }
146 combined.changes // get State changes
147 .onEach { println("merged: $it") }
148 .toSharedFlow() // convert back to Flow
149 }
150 runCurrent()
151
152 val results = mutableListOf<Pair<Pair<String, Int>, Pair<String, Int>>>()
153 backgroundScope.launch { ouevents.toCollection(results) }
154 runCurrent()
155
156 flow.emit(1)
157 runCurrent()
158
159 flow.emit(2)
160 runCurrent()
161
162 assertEquals(
163 listOf(("left" to 1) to ("right" to 1), ("left" to 2) to ("right" to 2)),
164 results,
165 )
166 }
167
168 @Test
169 fun staticNetwork() = runFrpTest { network ->
170 var finalSum: Int? = null
171
172 val intEmitter = network.mutableEvents<Int>()
173 val sampleEmitter = network.mutableEvents<Unit>()
174
175 activateSpecWithResult(network) {
176 val updates = intEmitter.map { a -> { b: Int -> a + b } }
177
178 val sumD =
179 StateLoop<Int>().apply {
180 loopback =
181 updates
182 .sample(this) { f, sum -> f(sum) }
183 .onEach { println("sum update: $it") }
184 .holdState(0)
185 }
186 sampleEmitter
187 .onEach { println("sampleEmitter emitted") }
188 .sample(sumD) { _, sum -> sum }
189 .onEach { println("sampled: $it") }
190 .nextDeferred()
191 }
192 .let { launch { finalSum = it.await() } }
193
194 runCurrent()
195
196 (1..5).forEach { i ->
197 println("emitting: $i")
198 intEmitter.emit(i)
199 runCurrent()
200 }
201 runCurrent()
202
203 sampleEmitter.emit(Unit)
204 runCurrent()
205
206 assertEquals(15, finalSum)
207 }
208
209 @Test
210 fun recursiveDefinition() = runFrpTest { network ->
211 var wasSold = false
212 var currentAmt: Int? = null
213
214 val coin = network.mutableEvents<Unit>()
215 val price = 50
216 val buildSpec = buildSpec {
217 val eSold = EventsLoop<Unit>()
218
219 val eInsert =
220 coin.map {
221 { runningTotal: Int ->
222 println("TEST: $runningTotal - 10 = ${runningTotal - 10}")
223 runningTotal - 10
224 }
225 }
226
227 val eReset =
228 eSold.map {
229 { _: Int ->
230 println("TEST: Resetting")
231 price
232 }
233 }
234
235 val eUpdate = eInsert.mergeWith(eReset) { f, g -> { a -> g(f(a)) } }
236
237 val dTotal = StateLoop<Int>()
238 dTotal.loopback = eUpdate.sample(dTotal) { f, total -> f(total) }.holdState(price)
239
240 val eAmt = dTotal.changes
241 val bAmt = transactionally { dTotal.sample() }
242 eSold.loopback =
243 coin
244 .sample(bAmt) { coin, total -> coin to total }
245 .mapMaybe { (_, total) -> maybe { guard { total <= 10 } } }
246
247 val amts = eAmt.filter { amt -> amt >= 0 }
248
249 amts.observe { currentAmt = it }
250 eSold.observe { wasSold = true }
251
252 eSold.nextDeferred()
253 }
254
255 activateSpec(network) { buildSpec.applySpec() }
256
257 runCurrent()
258
259 println()
260 println()
261 coin.emit(Unit)
262 runCurrent()
263
264 assertEquals(40, currentAmt)
265
266 println()
267 println()
268 coin.emit(Unit)
269 runCurrent()
270
271 assertEquals(30, currentAmt)
272
273 println()
274 println()
275 coin.emit(Unit)
276 runCurrent()
277
278 assertEquals(20, currentAmt)
279
280 println()
281 println()
282 coin.emit(Unit)
283 runCurrent()
284
285 assertEquals(10, currentAmt)
286 assertEquals(false, wasSold)
287
288 println()
289 println()
290 coin.emit(Unit)
291 runCurrent()
292
293 assertEquals(true, wasSold)
294 assertEquals(50, currentAmt)
295 }
296
297 @Test
298 fun promptCleanup() = runFrpTest { network ->
299 val emitter = network.mutableEvents<Int>()
300 val stopper = network.mutableEvents<Unit>()
301
302 var result: Int? = null
303
304 val flow = activateSpecWithResult(network) { emitter.takeUntil(stopper).toSharedFlow() }
305 backgroundScope.launch { flow.collect { result = it } }
306 runCurrent()
307
308 emitter.emit(2)
309 runCurrent()
310
311 assertEquals(2, result)
312
313 stopper.emit(Unit)
314 runCurrent()
315 }
316
317 @Test
318 fun switchEvents() = runFrpTest { network ->
319 var currentSum: Int? = null
320
321 val switchHandler = network.mutableEvents<Pair<Events<Int>, String>>()
322 val aHandler = network.mutableEvents<Int>()
323 val stopHandler = network.mutableEvents<Unit>()
324 val bHandler = network.mutableEvents<Int>()
325
326 val sumFlow =
327 activateSpecWithResult(network) {
328 val switchE = EventsLoop<Events<Int>>()
329 switchE.loopback =
330 switchHandler.mapStateful { (inevents, name) ->
331 println("[onEach] Switching to: $name")
332 val nextSwitch =
333 switchE.skipNext().onEach { println("[onEach] switched-out") }
334 val stopEvent =
335 stopHandler
336 .onEach { println("[onEach] stopped") }
337 .mergeWith(nextSwitch) { _, b -> b }
338 inevents.takeUntil(stopEvent)
339 }
340
341 val adderE: Events<(Int) -> Int> =
342 switchE.holdState(emptyEvents).switchEvents().map { a ->
343 println("[onEach] new number $a")
344 ({ sum: Int ->
345 println("$a+$sum=${a + sum}")
346 sum + a
347 })
348 }
349
350 val sumD = StateLoop<Int>()
351 sumD.loopback =
352 adderE
353 .sample(sumD) { f, sum -> f(sum) }
354 .onEach { println("[onEach] writing sum: $it") }
355 .holdState(0)
356 val sumE = sumD.changes
357
358 sumE.toSharedFlow()
359 }
360
361 runCurrent()
362
363 backgroundScope.launch { sumFlow.collect { currentSum = it } }
364
365 runCurrent()
366
367 switchHandler.emit(aHandler to "A")
368 runCurrent()
369
370 aHandler.emit(1)
371 runCurrent()
372
373 assertEquals(1, currentSum)
374
375 aHandler.emit(2)
376 runCurrent()
377
378 assertEquals(3, currentSum)
379
380 aHandler.emit(3)
381 runCurrent()
382
383 assertEquals(6, currentSum)
384
385 aHandler.emit(4)
386 runCurrent()
387
388 assertEquals(10, currentSum)
389
390 aHandler.emit(5)
391 runCurrent()
392
393 assertEquals(15, currentSum)
394
395 switchHandler.emit(bHandler to "B")
396 runCurrent()
397
398 aHandler.emit(6)
399 runCurrent()
400
401 assertEquals(15, currentSum)
402
403 bHandler.emit(6)
404 runCurrent()
405
406 assertEquals(21, currentSum)
407
408 bHandler.emit(7)
409 runCurrent()
410
411 assertEquals(28, currentSum)
412
413 bHandler.emit(8)
414 runCurrent()
415
416 assertEquals(36, currentSum)
417
418 bHandler.emit(9)
419 runCurrent()
420
421 assertEquals(45, currentSum)
422
423 bHandler.emit(10)
424 runCurrent()
425
426 assertEquals(55, currentSum)
427
428 println()
429 println("Stopping: B")
430 stopHandler.emit(Unit) // bHandler.complete()
431 runCurrent()
432
433 bHandler.emit(20)
434 runCurrent()
435
436 assertEquals(55, currentSum)
437
438 println()
439 println("Switching to: A2")
440 switchHandler.emit(aHandler to "A2")
441 runCurrent()
442
443 println("aHandler.emit(11)")
444 aHandler.emit(11)
445 runCurrent()
446
447 assertEquals(66, currentSum)
448
449 aHandler.emit(12)
450 runCurrent()
451
452 assertEquals(78, currentSum)
453
454 aHandler.emit(13)
455 runCurrent()
456
457 assertEquals(91, currentSum)
458
459 aHandler.emit(14)
460 runCurrent()
461
462 assertEquals(105, currentSum)
463
464 aHandler.emit(15)
465 runCurrent()
466
467 assertEquals(120, currentSum)
468
469 stopHandler.emit(Unit)
470 runCurrent()
471
472 aHandler.emit(100)
473 runCurrent()
474
475 assertEquals(120, currentSum)
476 }
477
478 @Test
479 fun switchIndirect() = runFrpTest { network ->
480 val emitter = network.mutableEvents<Unit>()
481 activateSpec(network) {
482 emptyEvents.map { emitter.map { 1 } }.flatten().map { "$it" }.observe()
483 }
484 runCurrent()
485 }
486
487 @Test
488 fun switchInWithResult() = runFrpTest { network ->
489 val emitter = network.mutableEvents<Unit>()
490 val out =
491 activateSpecWithResult(network) {
492 emitter.map { emitter.map { 1 } }.flatten().toSharedFlow()
493 }
494 val result = out.stateIn(backgroundScope, SharingStarted.Eagerly, null)
495 runCurrent()
496 emitter.emit(Unit)
497 runCurrent()
498 assertEquals(null, result.value)
499 }
500
501 @Test
502 fun switchInCompleted() = runFrpTest { network ->
503 val outputs = mutableListOf<Int>()
504
505 val switchAH = network.mutableEvents<Unit>()
506 val intAH = network.mutableEvents<Int>()
507 val stopEmitter = network.mutableEvents<Unit>()
508
509 val top = buildSpec {
510 val intS = intAH.takeUntil(stopEmitter)
511 val switched = switchAH.map { intS }.flatten()
512 switched.toSharedFlow()
513 }
514 val flow = activateSpecWithResult(network) { top.applySpec() }
515 backgroundScope.launch { flow.collect { outputs.add(it) } }
516 runCurrent()
517
518 switchAH.emit(Unit)
519 runCurrent()
520
521 stopEmitter.emit(Unit)
522 runCurrent()
523
524 // assertEquals(0, intAH.subscriptionCount.value)
525 intAH.emit(10)
526 runCurrent()
527
528 assertEquals(true, outputs.isEmpty())
529
530 switchAH.emit(Unit)
531 runCurrent()
532
533 // assertEquals(0, intAH.subscriptionCount.value)
534 intAH.emit(10)
535 runCurrent()
536
537 assertEquals(true, outputs.isEmpty())
538 }
539
540 @Test
541 fun switchEvents_outerCompletesFirst() = runFrpTest { network ->
542 var stepResult: Int? = null
543
544 val switchAH = network.mutableEvents<Unit>()
545 val switchStopEmitter = network.mutableEvents<Unit>()
546 val intStopEmitter = network.mutableEvents<Unit>()
547 val intAH = network.mutableEvents<Int>()
548 val flow =
549 activateSpecWithResult(network) {
550 val intS = intAH.takeUntil(intStopEmitter)
551 val switchS = switchAH.takeUntil(switchStopEmitter)
552
553 val switched = switchS.map { intS }.flatten()
554 switched.toSharedFlow()
555 }
556 backgroundScope.launch { flow.collect { stepResult = it } }
557 runCurrent()
558
559 // assertEquals(0, intAH.subscriptionCount.value)
560 intAH.emit(100)
561 runCurrent()
562
563 assertEquals(null, stepResult)
564
565 switchAH.emit(Unit)
566 runCurrent()
567
568 // assertEquals(1, intAH.subscriptionCount.value)
569
570 intAH.emit(5)
571 runCurrent()
572
573 assertEquals(5, stepResult)
574
575 println("stop outer")
576 switchStopEmitter.emit(Unit) // switchAH.complete()
577 runCurrent()
578
579 // assertEquals(1, intAH.subscriptionCount.value)
580 // assertEquals(0, switchAH.subscriptionCount.value)
581
582 intAH.emit(10)
583 runCurrent()
584
585 assertEquals(10, stepResult)
586
587 println("stop inner")
588 intStopEmitter.emit(Unit) // intAH.complete()
589 runCurrent()
590
591 // assertEquals(present(10), network.await())
592 }
593
594 @Test
595 fun mapEvents() = runFrpTest { network ->
596 val emitter = network.mutableEvents<Int>()
597 var stepResult: Int? = null
598
599 val flow =
600 activateSpecWithResult(network) {
601 val mappedS = emitter.map { it * it }
602 mappedS.toSharedFlow()
603 }
604
605 backgroundScope.launch { flow.collect { stepResult = it } }
606 runCurrent()
607
608 emitter.emit(1)
609 runCurrent()
610
611 assertEquals(1, stepResult)
612
613 emitter.emit(2)
614 runCurrent()
615
616 assertEquals(4, stepResult)
617
618 emitter.emit(10)
619 runCurrent()
620
621 assertEquals(100, stepResult)
622 }
623
624 @Test
625 fun mapTransactional() = runFrpTest { network ->
626 var doubledResult: Int? = null
627 var pullValue = 0
628 val a = transactionally { pullValue }
629 val b = transactionally { a.sample() * 2 }
630 val emitter = network.mutableEvents<Unit>()
631 val flow =
632 activateSpecWithResult(network) {
633 val sampleB = emitter.sample(b) { _, b -> b }
634 sampleB.toSharedFlow()
635 }
636
637 backgroundScope.launch { flow.collect { doubledResult = it } }
638
639 runCurrent()
640
641 emitter.emit(Unit)
642 runCurrent()
643
644 assertEquals(0, doubledResult)
645
646 pullValue = 5
647 emitter.emit(Unit)
648 runCurrent()
649
650 assertEquals(10, doubledResult)
651 }
652
653 @Test
654 fun mapState() = runFrpTest { network ->
655 val emitter = network.mutableEvents<Int>()
656 var stepResult: Int? = null
657 val flow =
658 activateSpecWithResult(network) {
659 val state = emitter.holdState(0).map { it + 2 }
660 val stateCurrent = transactionally { state.sample() }
661 val stateChanges = state.changes
662 val sampleState = emitter.sample(stateCurrent) { _, b -> b }
663 val merge = stateChanges.mergeWith(sampleState) { a, b -> a + b }
664 merge.toSharedFlow()
665 }
666 backgroundScope.launch { flow.collect { stepResult = it } }
667 runCurrent()
668
669 emitter.emit(1)
670 runCurrent()
671
672 assertEquals(5, stepResult)
673
674 emitter.emit(10)
675 runCurrent()
676
677 assertEquals(15, stepResult)
678 }
679
680 @Test
681 fun partitionEither() = runFrpTest { network ->
682 val emitter = network.mutableEvents<Either<Int, Int>>()
683 val result =
684 activateSpecWithResult(network) {
685 val (l, r) = emitter.partitionEither()
686 val pDiamond =
687 l.map { it * 2 }
688 .mergeWith(r.map { it * -1 }) { _, _ -> error("unexpected coincidence") }
689 pDiamond.holdState(null).toStateFlow()
690 }
691 runCurrent()
692
693 emitter.emit(First(10))
694 runCurrent()
695
696 assertEquals(20, result.value)
697
698 emitter.emit(Second(30))
699 runCurrent()
700
701 assertEquals(-30, result.value)
702 }
703
704 @Test
705 fun accumState() = runFrpTest { network ->
706 val emitter = network.mutableEvents<Int>()
707 val sampler = network.mutableEvents<Unit>()
708 var stepResult: Int? = null
709 val flow =
710 activateSpecWithResult(network) {
711 val sumState =
712 emitter.map { a -> { b: Int -> a + b } }.foldState(0) { f, a -> f(a) }
713
714 sumState.changes
715 .mergeWith(sampler.sample(sumState) { _, sum -> sum }) { _, _ ->
716 error("Unexpected coincidence")
717 }
718 .toSharedFlow()
719 }
720
721 backgroundScope.launch { flow.collect { stepResult = it } }
722 runCurrent()
723
724 emitter.emit(5)
725 runCurrent()
726 assertEquals(5, stepResult)
727
728 emitter.emit(10)
729 runCurrent()
730 assertEquals(15, stepResult)
731
732 sampler.emit(Unit)
733 runCurrent()
734 assertEquals(15, stepResult)
735 }
736
737 @Test
738 fun mergeEventss() = runFrpTest { network ->
739 val first = network.mutableEvents<Int>()
740 val stopFirst = network.mutableEvents<Unit>()
741 val second = network.mutableEvents<Int>()
742 val stopSecond = network.mutableEvents<Unit>()
743 var stepResult: Int? = null
744
745 val flow: SharedFlow<Int>
746 val setupDuration = measureTime {
747 flow =
748 activateSpecWithResult(network) {
749 val firstS = first.takeUntil(stopFirst)
750 val secondS = second.takeUntil(stopSecond)
751 val mergedS =
752 firstS.mergeWith(secondS) { _, _ -> error("Unexpected coincidence") }
753 mergedS.toSharedFlow()
754 // mergedS.last("onComplete")
755 }
756 backgroundScope.launch { flow.collect { stepResult = it } }
757 runCurrent()
758 }
759
760 // assertEquals(1, first.subscriptionCount.value)
761 // assertEquals(1, second.subscriptionCount.value)
762
763 val firstEmitDuration = measureTime {
764 first.emit(1)
765 runCurrent()
766 }
767
768 assertEquals(1, stepResult)
769
770 val secondEmitDuration = measureTime {
771 second.emit(2)
772 runCurrent()
773 }
774
775 assertEquals(2, stepResult)
776
777 val stopFirstDuration = measureTime {
778 stopFirst.emit(Unit)
779 runCurrent()
780 }
781
782 // assertEquals(0, first.subscriptionCount.value)
783 val testDeadEmitFirstDuration = measureTime {
784 first.emit(10)
785 runCurrent()
786 }
787
788 assertEquals(2, stepResult)
789
790 // assertEquals(1, second.subscriptionCount.value)
791
792 val secondEmitDuration2 = measureTime {
793 second.emit(3)
794 runCurrent()
795 }
796
797 assertEquals(3, stepResult)
798
799 val stopSecondDuration = measureTime {
800 stopSecond.emit(Unit)
801 runCurrent()
802 }
803
804 // assertEquals(0, second.subscriptionCount.value)
805 val testDeadEmitSecondDuration = measureTime {
806 second.emit(10)
807 runCurrent()
808 }
809
810 assertEquals(3, stepResult)
811
812 println(
813 """
814 setupDuration: ${setupDuration.toString(DurationUnit.MILLISECONDS, 2)}
815 firstEmitDuration: ${firstEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
816 secondEmitDuration: ${secondEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
817 stopFirstDuration: ${stopFirstDuration.toString(DurationUnit.MILLISECONDS, 2)}
818 testDeadEmitFirstDuration: ${
819 testDeadEmitFirstDuration.toString(
820 DurationUnit.MILLISECONDS,
821 2,
822 )
823 }
824 secondEmitDuration2: ${secondEmitDuration2.toString(DurationUnit.MILLISECONDS, 2)}
825 stopSecondDuration: ${stopSecondDuration.toString(DurationUnit.MILLISECONDS, 2)}
826 testDeadEmitSecondDuration: ${
827 testDeadEmitSecondDuration.toString(
828 DurationUnit.MILLISECONDS,
829 2,
830 )
831 }
832 """
833 .trimIndent()
834 )
835 }
836
837 @Test
838 fun sampleCancel() = runFrpTest { network ->
839 val updater = network.mutableEvents<Int>()
840 val stopUpdater = network.mutableEvents<Unit>()
841 val sampler = network.mutableEvents<Unit>()
842 val stopSampler = network.mutableEvents<Unit>()
843 var stepResult: Int? = null
844 val flow =
845 activateSpecWithResult(network) {
846 val stopSamplerFirst = stopSampler
847 val samplerS = sampler.takeUntil(stopSamplerFirst)
848 val stopUpdaterFirst = stopUpdater
849 val updaterS = updater.takeUntil(stopUpdaterFirst)
850 val sampledS = samplerS.sample(updaterS.holdState(0)) { _, b -> b }
851 sampledS.toSharedFlow()
852 }
853
854 backgroundScope.launch { flow.collect { stepResult = it } }
855 runCurrent()
856
857 updater.emit(1)
858 runCurrent()
859
860 sampler.emit(Unit)
861 runCurrent()
862
863 assertEquals(1, stepResult)
864
865 stopSampler.emit(Unit)
866 runCurrent()
867
868 // assertEquals(0, updater.subscriptionCount.value)
869 // assertEquals(0, sampler.subscriptionCount.value)
870 updater.emit(10)
871 runCurrent()
872
873 sampler.emit(Unit)
874 runCurrent()
875
876 assertEquals(1, stepResult)
877 }
878
879 @Test
880 fun combineStates_differentUpstreams() = runFrpTest { network ->
881 val a = network.mutableEvents<Int>()
882 val b = network.mutableEvents<Int>()
883 var observed: Pair<Int, Int>? = null
884 val state =
885 activateSpecWithResult(network) {
886 val state = combine(a.holdState(0), b.holdState(0)) { a, b -> Pair(a, b) }
887 state.changes.observe { observed = it }
888 state
889 }
890 assertEquals(0 to 0, network.transact { state.sample() })
891 assertEquals(null, observed)
892 a.emit(5)
893 assertEquals(5 to 0, observed)
894 assertEquals(5 to 0, network.transact { state.sample() })
895 b.emit(3)
896 assertEquals(5 to 3, observed)
897 assertEquals(5 to 3, network.transact { state.sample() })
898 }
899
900 @Test
901 fun sampleCombinedStates() = runFrpTest { network ->
902 val updater = network.mutableEvents<Int>()
903 val emitter = network.mutableEvents<Unit>()
904
905 val result =
906 activateSpecWithResult(network) {
907 val bA = updater.map { it * 2 }.holdState(0)
908 val bB = updater.holdState(0)
909 val combineD: State<Pair<Int, Int>> = bA.combine(bB) { a, b -> a to b }
910 val sampleS = emitter.sample(combineD) { _, b -> b }
911 sampleS.nextDeferred()
912 }
913 println("launching")
914 runCurrent()
915
916 println("emitting update")
917 updater.emit(10)
918 runCurrent()
919
920 println("emitting sampler")
921 emitter.emit(Unit)
922 runCurrent()
923
924 println("asserting")
925 assertEquals(20 to 10, result.await())
926 }
927
928 @Test
929 fun switchMapPromptly() = runFrpTest { network ->
930 val emitter = network.mutableEvents<Unit>()
931 val result =
932 activateSpecWithResult(network) {
933 emitter
934 .map { emitter.map { 1 }.map { it + 1 }.map { it * 2 } }
935 .holdState(emptyEvents)
936 .switchEventsPromptly()
937 .nextDeferred()
938 }
939 runCurrent()
940
941 emitter.emit(Unit)
942 runCurrent()
943
944 assertTrue("Not complete", result.isCompleted)
945 assertEquals(4, result.await())
946 }
947
948 @Test
949 fun switchDeeper() = runFrpTest { network ->
950 val emitter = network.mutableEvents<Unit>()
951 val e2 = network.mutableEvents<Unit>()
952 val result =
953 activateSpecWithResult(network) {
954 val tres =
955 merge(e2.map { 1 }, e2.map { 2 }, transformCoincidence = { a, b -> a + b })
956 tres.observeBuild()
957 val switch = emitter.map { tres }.flatten()
958 merge(switch, e2.map { null }, transformCoincidence = { a, _ -> a })
959 .filterNotNull()
960 .nextDeferred()
961 }
962 runCurrent()
963
964 emitter.emit(Unit)
965 runCurrent()
966
967 e2.emit(Unit)
968 runCurrent()
969
970 assertTrue("Not complete", result.isCompleted)
971 assertEquals(3, result.await())
972 }
973
974 @Test
975 fun recursionBasic() = runFrpTest { network ->
976 val add1 = network.mutableEvents<Unit>()
977 val sub1 = network.mutableEvents<Unit>()
978 val stepResult: StateFlow<Int> =
979 activateSpecWithResult(network) {
980 val dSum = StateLoop<Int>()
981 val sAdd1 = add1.sample(dSum) { _, sum -> sum + 1 }
982 val sMinus1 = sub1.sample(dSum) { _, sum -> sum - 1 }
983 dSum.loopback = sAdd1.mergeWith(sMinus1) { a, _ -> a }.holdState(0)
984 dSum.toStateFlow()
985 }
986 runCurrent()
987
988 add1.emit(Unit)
989 runCurrent()
990
991 assertEquals(1, stepResult.value)
992
993 add1.emit(Unit)
994 runCurrent()
995
996 assertEquals(2, stepResult.value)
997
998 sub1.emit(Unit)
999 runCurrent()
1000
1001 assertEquals(1, stepResult.value)
1002 }
1003
1004 @Test
1005 fun recursiveState() = runFrpTest { network ->
1006 val e = network.mutableEvents<Unit>()
1007 var changes = 0
1008 val state =
1009 activateSpecWithResult(network) {
1010 val s = EventsLoop<Unit>()
1011 val deferred = s.map { stateOf(null) }
1012 val e3 = e.map { stateOf(Unit) }
1013 val flattened =
1014 e3.mergeWith(deferred) { a, _ -> a }.holdState(stateOf(null)).flatten()
1015 s.loopback = emptyEvents
1016 flattened.toStateFlow()
1017 }
1018
1019 backgroundScope.launch { state.collect { changes++ } }
1020 runCurrent()
1021 }
1022
1023 @Test
1024 fun fanOut() = runFrpTest { network ->
1025 val e = network.mutableEvents<Map<String, Int>>()
1026 val (fooFlow, barFlow) =
1027 activateSpecWithResult(network) {
1028 val selector = e.groupByKey()
1029 val foos = selector.eventsForKey("foo")
1030 val bars = selector.eventsForKey("bar")
1031 foos.toSharedFlow() to bars.toSharedFlow()
1032 }
1033 val stateFlow = fooFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1034 backgroundScope.launch { barFlow.collect { error("unexpected bar") } }
1035 runCurrent()
1036
1037 assertEquals(null, stateFlow.value)
1038
1039 e.emit(mapOf("foo" to 1))
1040 runCurrent()
1041
1042 assertEquals(1, stateFlow.value)
1043 }
1044
1045 @Test
1046 fun propagateError() {
1047 try {
1048 runFrpTest { network ->
1049 runCurrent()
1050 try {
1051 network.transact<Unit> { error("message") }
1052 fail("caller did not throw exception")
1053 } catch (_: IllegalStateException) {}
1054 }
1055 fail("scheduler did not throw exception")
1056 } catch (_: IllegalStateException) {}
1057 }
1058
1059 @Test
1060 fun fanOutLateSubscribe() = runFrpTest { network ->
1061 val e = network.mutableEvents<Map<String, Int>>()
1062 val barFlow =
1063 activateSpecWithResult(network) {
1064 val selector = e.groupByKey()
1065 selector
1066 .eventsForKey("foo")
1067 .map { selector.eventsForKey("bar") }
1068 .holdState(emptyEvents)
1069 .switchEventsPromptly()
1070 .toSharedFlow()
1071 }
1072 val stateFlow = barFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1073 runCurrent()
1074
1075 assertEquals(null, stateFlow.value)
1076
1077 e.emit(mapOf("foo" to 0, "bar" to 1))
1078 runCurrent()
1079
1080 assertEquals(1, stateFlow.value)
1081 }
1082
1083 @Test
1084 fun inputEventsCompleted() = runFrpTest { network ->
1085 val results = mutableListOf<Int>()
1086 val e = network.mutableEvents<Int>()
1087 activateSpec(network) { e.nextOnly().observe { results.add(it) } }
1088 runCurrent()
1089
1090 e.emit(10)
1091 runCurrent()
1092
1093 assertEquals(listOf(10), results)
1094
1095 e.emit(20)
1096 runCurrent()
1097 assertEquals(listOf(10), results)
1098 }
1099
1100 @Test
1101 fun fanOutThenMergeIncrementally() = runFrpTest { network ->
1102 // A events of group updates, where a group is a events of child updates, where a child is a
1103 // stateflow
1104 val e = network.mutableEvents<Map<Int, Maybe<Events<Map<Int, Maybe<StateFlow<String>>>>>>>()
1105 println("fanOutMergeInc START")
1106 val state =
1107 activateSpecWithResult(network) {
1108 // Convert nested Flows to nested Events/State
1109 val emitter: Events<Map<Int, Maybe<Events<Map<Int, Maybe<State<String>>>>>>> =
1110 e.mapBuild { m ->
1111 m.mapValues { (_, mFlow) ->
1112 mFlow.map {
1113 it.mapBuild { m2 ->
1114 println("m2: $m2")
1115 m2.mapValues { (_, mState) ->
1116 mState.map { stateFlow -> stateFlow.toState() }
1117 }
1118 }
1119 }
1120 }
1121 }
1122 // Accumulate all of our updates into a single State
1123 val accState: State<Map<Int, Map<Int, String>>> =
1124 emitter
1125 .mapStateful {
1126 changeMap: Map<Int, Maybe<Events<Map<Int, Maybe<State<String>>>>>> ->
1127 changeMap.mapValues { (groupId, mGroupChanges) ->
1128 mGroupChanges.map {
1129 groupChanges: Events<Map<Int, Maybe<State<String>>>> ->
1130 // New group
1131 val childChangeById = groupChanges.groupByKey()
1132 val map: Events<Map<Int, Maybe<Events<Maybe<State<String>>>>>> =
1133 groupChanges.mapStateful {
1134 gChangeMap: Map<Int, Maybe<State<String>>> ->
1135 println("gChangeMap: $gChangeMap")
1136 gChangeMap.mapValues { (childId, mChild) ->
1137 mChild.map { child: State<String> ->
1138 println("new child $childId in the house")
1139 // New child
1140 val eRemoved =
1141 childChangeById
1142 .eventsForKey(childId)
1143 .filter { it === Absent }
1144 .onEach {
1145 println(
1146 "removing? (groupId=$groupId, childId=$childId)"
1147 )
1148 }
1149 .nextOnly()
1150
1151 val addChild: Events<Maybe<State<String>>> =
1152 now.map { mChild }
1153 .onEach {
1154 println(
1155 "addChild (groupId=$groupId, childId=$childId) ${child.sample()}"
1156 )
1157 }
1158
1159 val removeChild: Events<Maybe<State<String>>> =
1160 eRemoved
1161 .onEach {
1162 println(
1163 "removeChild (groupId=$groupId, childId=$childId)"
1164 )
1165 }
1166 .map { Maybe.absent() }
1167
1168 addChild.mergeWith(removeChild) { _, _ ->
1169 error("unexpected coincidence")
1170 }
1171 }
1172 }
1173 }
1174 val mergeIncrementally: Events<Map<Int, Maybe<State<String>>>> =
1175 map.onEach { println("merge patch: $it") }
1176 .mergeEventsIncrementallyPromptly()
1177 mergeIncrementally
1178 .onEach { println("foldmap patch: $it") }
1179 .foldStateMapIncrementally()
1180 .flatMap { it.combine() }
1181 }
1182 }
1183 }
1184 .onEach { println("fold patch: $it") }
1185 .foldStateMapIncrementally()
1186 .flatMap { it.combine() }
1187
1188 accState.toStateFlow()
1189 }
1190 runCurrent()
1191
1192 assertEquals(emptyMap(), state.value)
1193
1194 val emitter2 = network.mutableEvents<Map<Int, Maybe<StateFlow<String>>>>()
1195 println()
1196 println("init outer 0")
1197 e.emit(mapOf(0 to Maybe.present(emitter2.onEach { println("emitter2 emit: $it") })))
1198 runCurrent()
1199
1200 assertEquals(mapOf(0 to emptyMap()), state.value)
1201
1202 println()
1203 println("init inner 10")
1204 emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(0, 10)"))))
1205 runCurrent()
1206
1207 assertEquals(mapOf(0 to mapOf(10 to "(0, 10)")), state.value)
1208
1209 // replace
1210 println()
1211 println("replace inner 10")
1212 emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(1, 10)"))))
1213 runCurrent()
1214
1215 assertEquals(mapOf(0 to mapOf(10 to "(1, 10)")), state.value)
1216
1217 // remove
1218 emitter2.emit(mapOf(10 to Maybe.absent()))
1219 runCurrent()
1220
1221 assertEquals(mapOf(0 to emptyMap()), state.value)
1222
1223 // add again
1224 emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(2, 10)"))))
1225 runCurrent()
1226
1227 assertEquals(mapOf(0 to mapOf(10 to "(2, 10)")), state.value)
1228
1229 // LogEnabled = true
1230
1231 println("batch update")
1232
1233 // batch update
1234 emitter2.emit(
1235 mapOf(
1236 10 to Maybe.absent(),
1237 11 to Maybe.present(MutableStateFlow("(0, 11)")),
1238 12 to Maybe.present(MutableStateFlow("(0, 12)")),
1239 )
1240 )
1241 runCurrent()
1242
1243 assertEquals(mapOf(0 to mapOf(11 to "(0, 11)", 12 to "(0, 12)")), state.value)
1244 }
1245
1246 @Test
1247 fun applyLatestNetworkChanges() = runFrpTest { network ->
1248 val newCount = network.mutableEvents<BuildSpec<Flow<Int>>>()
1249 val flowOfFlows: Flow<Flow<Int>> =
1250 activateSpecWithResult(network) { newCount.applyLatestSpec().toSharedFlow() }
1251 runCurrent()
1252
1253 val incCount = network.mutableEvents<Unit>()
1254 fun newFlow(): BuildSpec<SharedFlow<Int>> = buildSpec {
1255 launchEffect {
1256 try {
1257 println("new flow!")
1258 awaitCancellation()
1259 } finally {
1260 println("cancelling old flow")
1261 }
1262 }
1263 lateinit var count: State<Int>
1264 count =
1265 incCount
1266 .onEach { println("incrementing ${count.sample()}") }
1267 .foldState(0) { _, c -> c + 1 }
1268 count.changes.toSharedFlow()
1269 }
1270
1271 var outerCount = 0
1272 val lastEvent: StateFlow<Pair<StateFlow<Int?>, StateFlow<Int?>>> =
1273 flowOfFlows
1274 .map { it.stateIn(backgroundScope, SharingStarted.Eagerly, null) }
1275 .pairwise(MutableStateFlow(null))
1276 .onEach { outerCount++ }
1277 .stateIn(
1278 backgroundScope,
1279 SharingStarted.Eagerly,
1280 MutableStateFlow(null) to MutableStateFlow(null),
1281 )
1282
1283 runCurrent()
1284
1285 newCount.emit(newFlow())
1286 runCurrent()
1287
1288 assertEquals(1, outerCount)
1289 // assertEquals(1, incCount.subscriptionCount)
1290 assertNull(lastEvent.value.second.value)
1291
1292 incCount.emit(Unit)
1293 runCurrent()
1294
1295 println("checking")
1296 assertEquals(1, lastEvent.value.second.value)
1297
1298 incCount.emit(Unit)
1299 runCurrent()
1300
1301 assertEquals(2, lastEvent.value.second.value)
1302
1303 newCount.emit(newFlow())
1304 runCurrent()
1305 incCount.emit(Unit)
1306 runCurrent()
1307
1308 // verify old flow is not getting updates
1309 assertEquals(2, lastEvent.value.first.value)
1310 // but the new one is
1311 assertEquals(1, lastEvent.value.second.value)
1312 }
1313
1314 @Test
1315 fun buildScope_stateAccumulation() = runFrpTest { network ->
1316 val input = network.mutableEvents<Unit>()
1317 var observedCount: Int? = null
1318 activateSpec(network) {
1319 val (c, j) = asyncScope { input.foldState(0) { _, x -> x + 1 } }
1320 deferredBuildScopeAction { c.value.observe { observedCount = it } }
1321 }
1322 runCurrent()
1323 assertEquals(0, observedCount)
1324
1325 input.emit(Unit)
1326 runCurrent()
1327 assertEquals(1, observedCount)
1328
1329 input.emit(Unit)
1330 runCurrent()
1331 assertEquals(2, observedCount)
1332 }
1333
1334 @Test
1335 fun effect() = runFrpTest { network ->
1336 val input = network.mutableEvents<Unit>()
1337 var effectRunning = false
1338 var count = 0
1339 activateSpec(network) {
1340 val j = launchEffect {
1341 effectRunning = true
1342 try {
1343 awaitCancellation()
1344 } finally {
1345 effectRunning = false
1346 }
1347 }
1348 merge(emptyEvents, input.nextOnly()).observe {
1349 count++
1350 j.cancel()
1351 }
1352 }
1353 runCurrent()
1354 assertEquals(true, effectRunning)
1355 assertEquals(0, count)
1356
1357 println("1")
1358 input.emit(Unit)
1359 assertEquals(false, effectRunning)
1360 assertEquals(1, count)
1361
1362 println("2")
1363 input.emit(Unit)
1364 assertEquals(1, count)
1365 println("3")
1366 input.emit(Unit)
1367 assertEquals(1, count)
1368 }
1369
1370 @Test
1371 fun observeEffect_disposeHandle() = runFrpTest { network ->
1372 val input = network.mutableEvents<Unit>()
1373 val stopper = network.mutableEvents<Unit>()
1374 var runningCount = 0
1375 val specJob =
1376 activateSpec(network) {
1377 val handle =
1378 input.observe {
1379 launch {
1380 runningCount++
1381 awaitClose { runningCount-- }
1382 }
1383 }
1384 stopper.nextOnly().observe { handle.dispose() }
1385 }
1386 runCurrent()
1387 assertEquals(0, runningCount)
1388
1389 input.emit(Unit)
1390 assertEquals(1, runningCount)
1391
1392 input.emit(Unit)
1393 assertEquals(2, runningCount)
1394
1395 stopper.emit(Unit)
1396 assertEquals(2, runningCount)
1397
1398 input.emit(Unit)
1399 assertEquals(2, runningCount)
1400
1401 specJob.cancel()
1402 runCurrent()
1403 assertEquals(0, runningCount)
1404 }
1405
1406 @Test
1407 fun observeEffect_takeUntil() = runFrpTest { network ->
1408 val input = network.mutableEvents<Unit>()
1409 val stopper = network.mutableEvents<Unit>()
1410 var runningCount = 0
1411 val specJob =
1412 activateSpec(network) {
1413 input.takeUntil(stopper).observe {
1414 launch {
1415 runningCount++
1416 awaitClose { runningCount-- }
1417 }
1418 }
1419 }
1420 runCurrent()
1421 assertEquals(0, runningCount)
1422
1423 input.emit(Unit)
1424 assertEquals(1, runningCount)
1425
1426 input.emit(Unit)
1427 assertEquals(2, runningCount)
1428
1429 stopper.emit(Unit)
1430 assertEquals(2, runningCount)
1431
1432 input.emit(Unit)
1433 assertEquals(2, runningCount)
1434
1435 specJob.cancel()
1436 runCurrent()
1437 assertEquals(0, runningCount)
1438 }
1439
1440 private fun runFrpTest(
1441 timeout: Duration = 3.seconds,
1442 block: suspend TestScope.(KairosNetwork) -> Unit,
1443 ) {
1444 runTest(timeout = timeout) {
1445 val network = backgroundScope.launchKairosNetwork()
1446 runCurrent()
1447 block(network)
1448 }
1449 }
1450
1451 private fun TestScope.activateSpec(network: KairosNetwork, spec: BuildSpec<*>) =
1452 backgroundScope.launch { network.activateSpec(spec) }
1453
1454 private suspend fun <R> TestScope.activateSpecWithResult(
1455 network: KairosNetwork,
1456 spec: BuildSpec<R>,
1457 ): R =
1458 CompletableDeferred<R>()
1459 .apply { activateSpec(network) { complete(spec.applySpec()) } }
1460 .await()
1461 }
1462
assertEqualsnull1463 private fun <T> assertEquals(expected: T, actual: T) =
1464 org.junit.Assert.assertEquals(expected, actual)
1465
1466 private fun <A> Flow<A>.pairwise(init: A): Flow<Pair<A, A>> = flow {
1467 var prev = init
1468 collect {
1469 emit(prev to it)
1470 prev = it
1471 }
1472 }
1473