• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package com.android.systemui.kairos
2 
3 import com.android.systemui.kairos.util.Either
4 import com.android.systemui.kairos.util.Either.First
5 import com.android.systemui.kairos.util.Either.Second
6 import com.android.systemui.kairos.util.Maybe
7 import com.android.systemui.kairos.util.Maybe.Absent
8 import com.android.systemui.kairos.util.map
9 import com.android.systemui.kairos.util.maybe
10 import kotlin.time.Duration
11 import kotlin.time.Duration.Companion.seconds
12 import kotlin.time.DurationUnit
13 import kotlin.time.measureTime
14 import kotlinx.coroutines.CompletableDeferred
15 import kotlinx.coroutines.ExperimentalCoroutinesApi
16 import kotlinx.coroutines.async
17 import kotlinx.coroutines.awaitCancellation
18 import kotlinx.coroutines.flow.Flow
19 import kotlinx.coroutines.flow.MutableSharedFlow
20 import kotlinx.coroutines.flow.MutableStateFlow
21 import kotlinx.coroutines.flow.SharedFlow
22 import kotlinx.coroutines.flow.SharingStarted
23 import kotlinx.coroutines.flow.StateFlow
24 import kotlinx.coroutines.flow.first
25 import kotlinx.coroutines.flow.flow
26 import kotlinx.coroutines.flow.map
27 import kotlinx.coroutines.flow.onEach
28 import kotlinx.coroutines.flow.stateIn
29 import kotlinx.coroutines.flow.toCollection
30 import kotlinx.coroutines.launch
31 import kotlinx.coroutines.test.TestScope
32 import kotlinx.coroutines.test.runCurrent
33 import kotlinx.coroutines.test.runTest
34 import org.junit.Assert.assertNull
35 import org.junit.Assert.assertTrue
36 import org.junit.Assert.fail
37 import org.junit.Test
38 
39 @OptIn(ExperimentalCoroutinesApi::class)
40 class KairosTests {
41 
42     @Test
43     fun basic() = runFrpTest { network ->
44         val emitter = network.mutableEvents<Int>()
45         var result: Int? = null
46         activateSpec(network) { emitter.observe { result = it } }
47         runCurrent()
48         emitter.emit(3)
49         runCurrent()
50         assertEquals(3, result)
51         runCurrent()
52     }
53 
54     @Test
55     fun basicEvents() = runFrpTest { network ->
56         val emitter = network.mutableEvents<Int>()
57         println("starting network")
58         val result = activateSpecWithResult(network) { emitter.nextDeferred() }
59         runCurrent()
60         println("emitting")
61         emitter.emit(3)
62         runCurrent()
63         println("awaiting")
64         assertEquals(3, result.await())
65         runCurrent()
66     }
67 
68     @Test
69     fun basicState() = runFrpTest { network ->
70         val emitter = network.mutableEvents<Int>()
71         val result = activateSpecWithResult(network) { emitter.holdState(0).changes.nextDeferred() }
72         runCurrent()
73 
74         emitter.emit(3)
75         runCurrent()
76 
77         assertEquals(3, result.await())
78     }
79 
80     @Test
81     fun basicEvent() = runFrpTest { network ->
82         val emitter = MutableSharedFlow<Int>()
83         val result = activateSpecWithResult(network) { async { emitter.first() } }
84         runCurrent()
85         emitter.emit(1)
86         runCurrent()
87         assertTrue("Result eventual has not completed.", result.isCompleted)
88         assertEquals(1, result.await())
89     }
90 
91     @Test
92     fun basicTransactional() = runFrpTest { network ->
93         var value: Int? = null
94         var bSource = 1
95         val emitter = network.mutableEvents<Unit>()
96         // Sampling this transactional will increment the source count.
97         val transactional = transactionally { bSource++ }
98         measureTime {
99                 activateSpecWithResult(network) {
100                         // Two different flows that sample the same transactional.
101                         (0 until 2).map {
102                             val sampled = emitter.sample(transactional) { _, v -> v }
103                             sampled.toSharedFlow()
104                         }
105                     }
106                     .forEach { backgroundScope.launch { it.collect { value = it } } }
107                 runCurrent()
108             }
109             .also { println("setup: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
110 
111         measureTime {
112                 emitter.emit(Unit)
113                 runCurrent()
114             }
115             .also { println("emit 1: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
116 
117         // Even though the transactional would be sampled twice, the first result is cached.
118         assertEquals(2, bSource)
119         assertEquals(1, value)
120 
121         measureTime {
122                 bSource = 10
123                 emitter.emit(Unit)
124                 runCurrent()
125             }
126             .also { println("emit 2: ${it.toString(DurationUnit.MILLISECONDS, 2)}") }
127 
128         assertEquals(11, bSource)
129         assertEquals(10, value)
130     }
131 
132     @Test
133     fun diamondGraph() = runFrpTest { network ->
134         val flow = network.mutableEvents<Int>()
135         val ouevents =
136             activateSpecWithResult(network) {
137                 // map Events like we map Flow
138                 val left = flow.map { "left" to it }.onEach { println("left: $it") }
139                 val right = flow.map { "right" to it }.onEach { println("right: $it") }
140 
141                 // convert Eventss to States so that they can be combined
142                 val combined =
143                     left.holdState("left" to 0).combine(right.holdState("right" to 0)) { l, r ->
144                         l to r
145                     }
146                 combined.changes // get State changes
147                     .onEach { println("merged: $it") }
148                     .toSharedFlow() // convert back to Flow
149             }
150         runCurrent()
151 
152         val results = mutableListOf<Pair<Pair<String, Int>, Pair<String, Int>>>()
153         backgroundScope.launch { ouevents.toCollection(results) }
154         runCurrent()
155 
156         flow.emit(1)
157         runCurrent()
158 
159         flow.emit(2)
160         runCurrent()
161 
162         assertEquals(
163             listOf(("left" to 1) to ("right" to 1), ("left" to 2) to ("right" to 2)),
164             results,
165         )
166     }
167 
168     @Test
169     fun staticNetwork() = runFrpTest { network ->
170         var finalSum: Int? = null
171 
172         val intEmitter = network.mutableEvents<Int>()
173         val sampleEmitter = network.mutableEvents<Unit>()
174 
175         activateSpecWithResult(network) {
176                 val updates = intEmitter.map { a -> { b: Int -> a + b } }
177 
178                 val sumD =
179                     StateLoop<Int>().apply {
180                         loopback =
181                             updates
182                                 .sample(this) { f, sum -> f(sum) }
183                                 .onEach { println("sum update: $it") }
184                                 .holdState(0)
185                     }
186                 sampleEmitter
187                     .onEach { println("sampleEmitter emitted") }
188                     .sample(sumD) { _, sum -> sum }
189                     .onEach { println("sampled: $it") }
190                     .nextDeferred()
191             }
192             .let { launch { finalSum = it.await() } }
193 
194         runCurrent()
195 
196         (1..5).forEach { i ->
197             println("emitting: $i")
198             intEmitter.emit(i)
199             runCurrent()
200         }
201         runCurrent()
202 
203         sampleEmitter.emit(Unit)
204         runCurrent()
205 
206         assertEquals(15, finalSum)
207     }
208 
209     @Test
210     fun recursiveDefinition() = runFrpTest { network ->
211         var wasSold = false
212         var currentAmt: Int? = null
213 
214         val coin = network.mutableEvents<Unit>()
215         val price = 50
216         val buildSpec = buildSpec {
217             val eSold = EventsLoop<Unit>()
218 
219             val eInsert =
220                 coin.map {
221                     { runningTotal: Int ->
222                         println("TEST: $runningTotal - 10 = ${runningTotal - 10}")
223                         runningTotal - 10
224                     }
225                 }
226 
227             val eReset =
228                 eSold.map {
229                     { _: Int ->
230                         println("TEST: Resetting")
231                         price
232                     }
233                 }
234 
235             val eUpdate = eInsert.mergeWith(eReset) { f, g -> { a -> g(f(a)) } }
236 
237             val dTotal = StateLoop<Int>()
238             dTotal.loopback = eUpdate.sample(dTotal) { f, total -> f(total) }.holdState(price)
239 
240             val eAmt = dTotal.changes
241             val bAmt = transactionally { dTotal.sample() }
242             eSold.loopback =
243                 coin
244                     .sample(bAmt) { coin, total -> coin to total }
245                     .mapMaybe { (_, total) -> maybe { guard { total <= 10 } } }
246 
247             val amts = eAmt.filter { amt -> amt >= 0 }
248 
249             amts.observe { currentAmt = it }
250             eSold.observe { wasSold = true }
251 
252             eSold.nextDeferred()
253         }
254 
255         activateSpec(network) { buildSpec.applySpec() }
256 
257         runCurrent()
258 
259         println()
260         println()
261         coin.emit(Unit)
262         runCurrent()
263 
264         assertEquals(40, currentAmt)
265 
266         println()
267         println()
268         coin.emit(Unit)
269         runCurrent()
270 
271         assertEquals(30, currentAmt)
272 
273         println()
274         println()
275         coin.emit(Unit)
276         runCurrent()
277 
278         assertEquals(20, currentAmt)
279 
280         println()
281         println()
282         coin.emit(Unit)
283         runCurrent()
284 
285         assertEquals(10, currentAmt)
286         assertEquals(false, wasSold)
287 
288         println()
289         println()
290         coin.emit(Unit)
291         runCurrent()
292 
293         assertEquals(true, wasSold)
294         assertEquals(50, currentAmt)
295     }
296 
297     @Test
298     fun promptCleanup() = runFrpTest { network ->
299         val emitter = network.mutableEvents<Int>()
300         val stopper = network.mutableEvents<Unit>()
301 
302         var result: Int? = null
303 
304         val flow = activateSpecWithResult(network) { emitter.takeUntil(stopper).toSharedFlow() }
305         backgroundScope.launch { flow.collect { result = it } }
306         runCurrent()
307 
308         emitter.emit(2)
309         runCurrent()
310 
311         assertEquals(2, result)
312 
313         stopper.emit(Unit)
314         runCurrent()
315     }
316 
317     @Test
318     fun switchEvents() = runFrpTest { network ->
319         var currentSum: Int? = null
320 
321         val switchHandler = network.mutableEvents<Pair<Events<Int>, String>>()
322         val aHandler = network.mutableEvents<Int>()
323         val stopHandler = network.mutableEvents<Unit>()
324         val bHandler = network.mutableEvents<Int>()
325 
326         val sumFlow =
327             activateSpecWithResult(network) {
328                 val switchE = EventsLoop<Events<Int>>()
329                 switchE.loopback =
330                     switchHandler.mapStateful { (inevents, name) ->
331                         println("[onEach] Switching to: $name")
332                         val nextSwitch =
333                             switchE.skipNext().onEach { println("[onEach] switched-out") }
334                         val stopEvent =
335                             stopHandler
336                                 .onEach { println("[onEach] stopped") }
337                                 .mergeWith(nextSwitch) { _, b -> b }
338                         inevents.takeUntil(stopEvent)
339                     }
340 
341                 val adderE: Events<(Int) -> Int> =
342                     switchE.holdState(emptyEvents).switchEvents().map { a ->
343                         println("[onEach] new number $a")
344                         ({ sum: Int ->
345                             println("$a+$sum=${a + sum}")
346                             sum + a
347                         })
348                     }
349 
350                 val sumD = StateLoop<Int>()
351                 sumD.loopback =
352                     adderE
353                         .sample(sumD) { f, sum -> f(sum) }
354                         .onEach { println("[onEach] writing sum: $it") }
355                         .holdState(0)
356                 val sumE = sumD.changes
357 
358                 sumE.toSharedFlow()
359             }
360 
361         runCurrent()
362 
363         backgroundScope.launch { sumFlow.collect { currentSum = it } }
364 
365         runCurrent()
366 
367         switchHandler.emit(aHandler to "A")
368         runCurrent()
369 
370         aHandler.emit(1)
371         runCurrent()
372 
373         assertEquals(1, currentSum)
374 
375         aHandler.emit(2)
376         runCurrent()
377 
378         assertEquals(3, currentSum)
379 
380         aHandler.emit(3)
381         runCurrent()
382 
383         assertEquals(6, currentSum)
384 
385         aHandler.emit(4)
386         runCurrent()
387 
388         assertEquals(10, currentSum)
389 
390         aHandler.emit(5)
391         runCurrent()
392 
393         assertEquals(15, currentSum)
394 
395         switchHandler.emit(bHandler to "B")
396         runCurrent()
397 
398         aHandler.emit(6)
399         runCurrent()
400 
401         assertEquals(15, currentSum)
402 
403         bHandler.emit(6)
404         runCurrent()
405 
406         assertEquals(21, currentSum)
407 
408         bHandler.emit(7)
409         runCurrent()
410 
411         assertEquals(28, currentSum)
412 
413         bHandler.emit(8)
414         runCurrent()
415 
416         assertEquals(36, currentSum)
417 
418         bHandler.emit(9)
419         runCurrent()
420 
421         assertEquals(45, currentSum)
422 
423         bHandler.emit(10)
424         runCurrent()
425 
426         assertEquals(55, currentSum)
427 
428         println()
429         println("Stopping: B")
430         stopHandler.emit(Unit) // bHandler.complete()
431         runCurrent()
432 
433         bHandler.emit(20)
434         runCurrent()
435 
436         assertEquals(55, currentSum)
437 
438         println()
439         println("Switching to: A2")
440         switchHandler.emit(aHandler to "A2")
441         runCurrent()
442 
443         println("aHandler.emit(11)")
444         aHandler.emit(11)
445         runCurrent()
446 
447         assertEquals(66, currentSum)
448 
449         aHandler.emit(12)
450         runCurrent()
451 
452         assertEquals(78, currentSum)
453 
454         aHandler.emit(13)
455         runCurrent()
456 
457         assertEquals(91, currentSum)
458 
459         aHandler.emit(14)
460         runCurrent()
461 
462         assertEquals(105, currentSum)
463 
464         aHandler.emit(15)
465         runCurrent()
466 
467         assertEquals(120, currentSum)
468 
469         stopHandler.emit(Unit)
470         runCurrent()
471 
472         aHandler.emit(100)
473         runCurrent()
474 
475         assertEquals(120, currentSum)
476     }
477 
478     @Test
479     fun switchIndirect() = runFrpTest { network ->
480         val emitter = network.mutableEvents<Unit>()
481         activateSpec(network) {
482             emptyEvents.map { emitter.map { 1 } }.flatten().map { "$it" }.observe()
483         }
484         runCurrent()
485     }
486 
487     @Test
488     fun switchInWithResult() = runFrpTest { network ->
489         val emitter = network.mutableEvents<Unit>()
490         val out =
491             activateSpecWithResult(network) {
492                 emitter.map { emitter.map { 1 } }.flatten().toSharedFlow()
493             }
494         val result = out.stateIn(backgroundScope, SharingStarted.Eagerly, null)
495         runCurrent()
496         emitter.emit(Unit)
497         runCurrent()
498         assertEquals(null, result.value)
499     }
500 
501     @Test
502     fun switchInCompleted() = runFrpTest { network ->
503         val outputs = mutableListOf<Int>()
504 
505         val switchAH = network.mutableEvents<Unit>()
506         val intAH = network.mutableEvents<Int>()
507         val stopEmitter = network.mutableEvents<Unit>()
508 
509         val top = buildSpec {
510             val intS = intAH.takeUntil(stopEmitter)
511             val switched = switchAH.map { intS }.flatten()
512             switched.toSharedFlow()
513         }
514         val flow = activateSpecWithResult(network) { top.applySpec() }
515         backgroundScope.launch { flow.collect { outputs.add(it) } }
516         runCurrent()
517 
518         switchAH.emit(Unit)
519         runCurrent()
520 
521         stopEmitter.emit(Unit)
522         runCurrent()
523 
524         // assertEquals(0, intAH.subscriptionCount.value)
525         intAH.emit(10)
526         runCurrent()
527 
528         assertEquals(true, outputs.isEmpty())
529 
530         switchAH.emit(Unit)
531         runCurrent()
532 
533         // assertEquals(0, intAH.subscriptionCount.value)
534         intAH.emit(10)
535         runCurrent()
536 
537         assertEquals(true, outputs.isEmpty())
538     }
539 
540     @Test
541     fun switchEvents_outerCompletesFirst() = runFrpTest { network ->
542         var stepResult: Int? = null
543 
544         val switchAH = network.mutableEvents<Unit>()
545         val switchStopEmitter = network.mutableEvents<Unit>()
546         val intStopEmitter = network.mutableEvents<Unit>()
547         val intAH = network.mutableEvents<Int>()
548         val flow =
549             activateSpecWithResult(network) {
550                 val intS = intAH.takeUntil(intStopEmitter)
551                 val switchS = switchAH.takeUntil(switchStopEmitter)
552 
553                 val switched = switchS.map { intS }.flatten()
554                 switched.toSharedFlow()
555             }
556         backgroundScope.launch { flow.collect { stepResult = it } }
557         runCurrent()
558 
559         // assertEquals(0, intAH.subscriptionCount.value)
560         intAH.emit(100)
561         runCurrent()
562 
563         assertEquals(null, stepResult)
564 
565         switchAH.emit(Unit)
566         runCurrent()
567 
568         //            assertEquals(1, intAH.subscriptionCount.value)
569 
570         intAH.emit(5)
571         runCurrent()
572 
573         assertEquals(5, stepResult)
574 
575         println("stop outer")
576         switchStopEmitter.emit(Unit) // switchAH.complete()
577         runCurrent()
578 
579         // assertEquals(1, intAH.subscriptionCount.value)
580         // assertEquals(0, switchAH.subscriptionCount.value)
581 
582         intAH.emit(10)
583         runCurrent()
584 
585         assertEquals(10, stepResult)
586 
587         println("stop inner")
588         intStopEmitter.emit(Unit) // intAH.complete()
589         runCurrent()
590 
591         // assertEquals(present(10), network.await())
592     }
593 
594     @Test
595     fun mapEvents() = runFrpTest { network ->
596         val emitter = network.mutableEvents<Int>()
597         var stepResult: Int? = null
598 
599         val flow =
600             activateSpecWithResult(network) {
601                 val mappedS = emitter.map { it * it }
602                 mappedS.toSharedFlow()
603             }
604 
605         backgroundScope.launch { flow.collect { stepResult = it } }
606         runCurrent()
607 
608         emitter.emit(1)
609         runCurrent()
610 
611         assertEquals(1, stepResult)
612 
613         emitter.emit(2)
614         runCurrent()
615 
616         assertEquals(4, stepResult)
617 
618         emitter.emit(10)
619         runCurrent()
620 
621         assertEquals(100, stepResult)
622     }
623 
624     @Test
625     fun mapTransactional() = runFrpTest { network ->
626         var doubledResult: Int? = null
627         var pullValue = 0
628         val a = transactionally { pullValue }
629         val b = transactionally { a.sample() * 2 }
630         val emitter = network.mutableEvents<Unit>()
631         val flow =
632             activateSpecWithResult(network) {
633                 val sampleB = emitter.sample(b) { _, b -> b }
634                 sampleB.toSharedFlow()
635             }
636 
637         backgroundScope.launch { flow.collect { doubledResult = it } }
638 
639         runCurrent()
640 
641         emitter.emit(Unit)
642         runCurrent()
643 
644         assertEquals(0, doubledResult)
645 
646         pullValue = 5
647         emitter.emit(Unit)
648         runCurrent()
649 
650         assertEquals(10, doubledResult)
651     }
652 
653     @Test
654     fun mapState() = runFrpTest { network ->
655         val emitter = network.mutableEvents<Int>()
656         var stepResult: Int? = null
657         val flow =
658             activateSpecWithResult(network) {
659                 val state = emitter.holdState(0).map { it + 2 }
660                 val stateCurrent = transactionally { state.sample() }
661                 val stateChanges = state.changes
662                 val sampleState = emitter.sample(stateCurrent) { _, b -> b }
663                 val merge = stateChanges.mergeWith(sampleState) { a, b -> a + b }
664                 merge.toSharedFlow()
665             }
666         backgroundScope.launch { flow.collect { stepResult = it } }
667         runCurrent()
668 
669         emitter.emit(1)
670         runCurrent()
671 
672         assertEquals(5, stepResult)
673 
674         emitter.emit(10)
675         runCurrent()
676 
677         assertEquals(15, stepResult)
678     }
679 
680     @Test
681     fun partitionEither() = runFrpTest { network ->
682         val emitter = network.mutableEvents<Either<Int, Int>>()
683         val result =
684             activateSpecWithResult(network) {
685                 val (l, r) = emitter.partitionEither()
686                 val pDiamond =
687                     l.map { it * 2 }
688                         .mergeWith(r.map { it * -1 }) { _, _ -> error("unexpected coincidence") }
689                 pDiamond.holdState(null).toStateFlow()
690             }
691         runCurrent()
692 
693         emitter.emit(First(10))
694         runCurrent()
695 
696         assertEquals(20, result.value)
697 
698         emitter.emit(Second(30))
699         runCurrent()
700 
701         assertEquals(-30, result.value)
702     }
703 
704     @Test
705     fun accumState() = runFrpTest { network ->
706         val emitter = network.mutableEvents<Int>()
707         val sampler = network.mutableEvents<Unit>()
708         var stepResult: Int? = null
709         val flow =
710             activateSpecWithResult(network) {
711                 val sumState =
712                     emitter.map { a -> { b: Int -> a + b } }.foldState(0) { f, a -> f(a) }
713 
714                 sumState.changes
715                     .mergeWith(sampler.sample(sumState) { _, sum -> sum }) { _, _ ->
716                         error("Unexpected coincidence")
717                     }
718                     .toSharedFlow()
719             }
720 
721         backgroundScope.launch { flow.collect { stepResult = it } }
722         runCurrent()
723 
724         emitter.emit(5)
725         runCurrent()
726         assertEquals(5, stepResult)
727 
728         emitter.emit(10)
729         runCurrent()
730         assertEquals(15, stepResult)
731 
732         sampler.emit(Unit)
733         runCurrent()
734         assertEquals(15, stepResult)
735     }
736 
737     @Test
738     fun mergeEventss() = runFrpTest { network ->
739         val first = network.mutableEvents<Int>()
740         val stopFirst = network.mutableEvents<Unit>()
741         val second = network.mutableEvents<Int>()
742         val stopSecond = network.mutableEvents<Unit>()
743         var stepResult: Int? = null
744 
745         val flow: SharedFlow<Int>
746         val setupDuration = measureTime {
747             flow =
748                 activateSpecWithResult(network) {
749                     val firstS = first.takeUntil(stopFirst)
750                     val secondS = second.takeUntil(stopSecond)
751                     val mergedS =
752                         firstS.mergeWith(secondS) { _, _ -> error("Unexpected coincidence") }
753                     mergedS.toSharedFlow()
754                     // mergedS.last("onComplete")
755                 }
756             backgroundScope.launch { flow.collect { stepResult = it } }
757             runCurrent()
758         }
759 
760         //            assertEquals(1, first.subscriptionCount.value)
761         //            assertEquals(1, second.subscriptionCount.value)
762 
763         val firstEmitDuration = measureTime {
764             first.emit(1)
765             runCurrent()
766         }
767 
768         assertEquals(1, stepResult)
769 
770         val secondEmitDuration = measureTime {
771             second.emit(2)
772             runCurrent()
773         }
774 
775         assertEquals(2, stepResult)
776 
777         val stopFirstDuration = measureTime {
778             stopFirst.emit(Unit)
779             runCurrent()
780         }
781 
782         // assertEquals(0, first.subscriptionCount.value)
783         val testDeadEmitFirstDuration = measureTime {
784             first.emit(10)
785             runCurrent()
786         }
787 
788         assertEquals(2, stepResult)
789 
790         //            assertEquals(1, second.subscriptionCount.value)
791 
792         val secondEmitDuration2 = measureTime {
793             second.emit(3)
794             runCurrent()
795         }
796 
797         assertEquals(3, stepResult)
798 
799         val stopSecondDuration = measureTime {
800             stopSecond.emit(Unit)
801             runCurrent()
802         }
803 
804         // assertEquals(0, second.subscriptionCount.value)
805         val testDeadEmitSecondDuration = measureTime {
806             second.emit(10)
807             runCurrent()
808         }
809 
810         assertEquals(3, stepResult)
811 
812         println(
813             """
814                 setupDuration: ${setupDuration.toString(DurationUnit.MILLISECONDS, 2)}
815                 firstEmitDuration: ${firstEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
816                 secondEmitDuration: ${secondEmitDuration.toString(DurationUnit.MILLISECONDS, 2)}
817                 stopFirstDuration: ${stopFirstDuration.toString(DurationUnit.MILLISECONDS, 2)}
818                 testDeadEmitFirstDuration: ${
819         testDeadEmitFirstDuration.toString(
820           DurationUnit.MILLISECONDS,
821           2,
822         )
823       }
824                 secondEmitDuration2: ${secondEmitDuration2.toString(DurationUnit.MILLISECONDS, 2)}
825                 stopSecondDuration: ${stopSecondDuration.toString(DurationUnit.MILLISECONDS, 2)}
826                 testDeadEmitSecondDuration: ${
827         testDeadEmitSecondDuration.toString(
828           DurationUnit.MILLISECONDS,
829           2,
830         )
831       }
832             """
833                 .trimIndent()
834         )
835     }
836 
837     @Test
838     fun sampleCancel() = runFrpTest { network ->
839         val updater = network.mutableEvents<Int>()
840         val stopUpdater = network.mutableEvents<Unit>()
841         val sampler = network.mutableEvents<Unit>()
842         val stopSampler = network.mutableEvents<Unit>()
843         var stepResult: Int? = null
844         val flow =
845             activateSpecWithResult(network) {
846                 val stopSamplerFirst = stopSampler
847                 val samplerS = sampler.takeUntil(stopSamplerFirst)
848                 val stopUpdaterFirst = stopUpdater
849                 val updaterS = updater.takeUntil(stopUpdaterFirst)
850                 val sampledS = samplerS.sample(updaterS.holdState(0)) { _, b -> b }
851                 sampledS.toSharedFlow()
852             }
853 
854         backgroundScope.launch { flow.collect { stepResult = it } }
855         runCurrent()
856 
857         updater.emit(1)
858         runCurrent()
859 
860         sampler.emit(Unit)
861         runCurrent()
862 
863         assertEquals(1, stepResult)
864 
865         stopSampler.emit(Unit)
866         runCurrent()
867 
868         // assertEquals(0, updater.subscriptionCount.value)
869         // assertEquals(0, sampler.subscriptionCount.value)
870         updater.emit(10)
871         runCurrent()
872 
873         sampler.emit(Unit)
874         runCurrent()
875 
876         assertEquals(1, stepResult)
877     }
878 
879     @Test
880     fun combineStates_differentUpstreams() = runFrpTest { network ->
881         val a = network.mutableEvents<Int>()
882         val b = network.mutableEvents<Int>()
883         var observed: Pair<Int, Int>? = null
884         val state =
885             activateSpecWithResult(network) {
886                 val state = combine(a.holdState(0), b.holdState(0)) { a, b -> Pair(a, b) }
887                 state.changes.observe { observed = it }
888                 state
889             }
890         assertEquals(0 to 0, network.transact { state.sample() })
891         assertEquals(null, observed)
892         a.emit(5)
893         assertEquals(5 to 0, observed)
894         assertEquals(5 to 0, network.transact { state.sample() })
895         b.emit(3)
896         assertEquals(5 to 3, observed)
897         assertEquals(5 to 3, network.transact { state.sample() })
898     }
899 
900     @Test
901     fun sampleCombinedStates() = runFrpTest { network ->
902         val updater = network.mutableEvents<Int>()
903         val emitter = network.mutableEvents<Unit>()
904 
905         val result =
906             activateSpecWithResult(network) {
907                 val bA = updater.map { it * 2 }.holdState(0)
908                 val bB = updater.holdState(0)
909                 val combineD: State<Pair<Int, Int>> = bA.combine(bB) { a, b -> a to b }
910                 val sampleS = emitter.sample(combineD) { _, b -> b }
911                 sampleS.nextDeferred()
912             }
913         println("launching")
914         runCurrent()
915 
916         println("emitting update")
917         updater.emit(10)
918         runCurrent()
919 
920         println("emitting sampler")
921         emitter.emit(Unit)
922         runCurrent()
923 
924         println("asserting")
925         assertEquals(20 to 10, result.await())
926     }
927 
928     @Test
929     fun switchMapPromptly() = runFrpTest { network ->
930         val emitter = network.mutableEvents<Unit>()
931         val result =
932             activateSpecWithResult(network) {
933                 emitter
934                     .map { emitter.map { 1 }.map { it + 1 }.map { it * 2 } }
935                     .holdState(emptyEvents)
936                     .switchEventsPromptly()
937                     .nextDeferred()
938             }
939         runCurrent()
940 
941         emitter.emit(Unit)
942         runCurrent()
943 
944         assertTrue("Not complete", result.isCompleted)
945         assertEquals(4, result.await())
946     }
947 
948     @Test
949     fun switchDeeper() = runFrpTest { network ->
950         val emitter = network.mutableEvents<Unit>()
951         val e2 = network.mutableEvents<Unit>()
952         val result =
953             activateSpecWithResult(network) {
954                 val tres =
955                     merge(e2.map { 1 }, e2.map { 2 }, transformCoincidence = { a, b -> a + b })
956                 tres.observeBuild()
957                 val switch = emitter.map { tres }.flatten()
958                 merge(switch, e2.map { null }, transformCoincidence = { a, _ -> a })
959                     .filterNotNull()
960                     .nextDeferred()
961             }
962         runCurrent()
963 
964         emitter.emit(Unit)
965         runCurrent()
966 
967         e2.emit(Unit)
968         runCurrent()
969 
970         assertTrue("Not complete", result.isCompleted)
971         assertEquals(3, result.await())
972     }
973 
974     @Test
975     fun recursionBasic() = runFrpTest { network ->
976         val add1 = network.mutableEvents<Unit>()
977         val sub1 = network.mutableEvents<Unit>()
978         val stepResult: StateFlow<Int> =
979             activateSpecWithResult(network) {
980                 val dSum = StateLoop<Int>()
981                 val sAdd1 = add1.sample(dSum) { _, sum -> sum + 1 }
982                 val sMinus1 = sub1.sample(dSum) { _, sum -> sum - 1 }
983                 dSum.loopback = sAdd1.mergeWith(sMinus1) { a, _ -> a }.holdState(0)
984                 dSum.toStateFlow()
985             }
986         runCurrent()
987 
988         add1.emit(Unit)
989         runCurrent()
990 
991         assertEquals(1, stepResult.value)
992 
993         add1.emit(Unit)
994         runCurrent()
995 
996         assertEquals(2, stepResult.value)
997 
998         sub1.emit(Unit)
999         runCurrent()
1000 
1001         assertEquals(1, stepResult.value)
1002     }
1003 
1004     @Test
1005     fun recursiveState() = runFrpTest { network ->
1006         val e = network.mutableEvents<Unit>()
1007         var changes = 0
1008         val state =
1009             activateSpecWithResult(network) {
1010                 val s = EventsLoop<Unit>()
1011                 val deferred = s.map { stateOf(null) }
1012                 val e3 = e.map { stateOf(Unit) }
1013                 val flattened =
1014                     e3.mergeWith(deferred) { a, _ -> a }.holdState(stateOf(null)).flatten()
1015                 s.loopback = emptyEvents
1016                 flattened.toStateFlow()
1017             }
1018 
1019         backgroundScope.launch { state.collect { changes++ } }
1020         runCurrent()
1021     }
1022 
1023     @Test
1024     fun fanOut() = runFrpTest { network ->
1025         val e = network.mutableEvents<Map<String, Int>>()
1026         val (fooFlow, barFlow) =
1027             activateSpecWithResult(network) {
1028                 val selector = e.groupByKey()
1029                 val foos = selector.eventsForKey("foo")
1030                 val bars = selector.eventsForKey("bar")
1031                 foos.toSharedFlow() to bars.toSharedFlow()
1032             }
1033         val stateFlow = fooFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1034         backgroundScope.launch { barFlow.collect { error("unexpected bar") } }
1035         runCurrent()
1036 
1037         assertEquals(null, stateFlow.value)
1038 
1039         e.emit(mapOf("foo" to 1))
1040         runCurrent()
1041 
1042         assertEquals(1, stateFlow.value)
1043     }
1044 
1045     @Test
1046     fun propagateError() {
1047         try {
1048             runFrpTest { network ->
1049                 runCurrent()
1050                 try {
1051                     network.transact<Unit> { error("message") }
1052                     fail("caller did not throw exception")
1053                 } catch (_: IllegalStateException) {}
1054             }
1055             fail("scheduler did not throw exception")
1056         } catch (_: IllegalStateException) {}
1057     }
1058 
1059     @Test
1060     fun fanOutLateSubscribe() = runFrpTest { network ->
1061         val e = network.mutableEvents<Map<String, Int>>()
1062         val barFlow =
1063             activateSpecWithResult(network) {
1064                 val selector = e.groupByKey()
1065                 selector
1066                     .eventsForKey("foo")
1067                     .map { selector.eventsForKey("bar") }
1068                     .holdState(emptyEvents)
1069                     .switchEventsPromptly()
1070                     .toSharedFlow()
1071             }
1072         val stateFlow = barFlow.stateIn(backgroundScope, SharingStarted.Eagerly, null)
1073         runCurrent()
1074 
1075         assertEquals(null, stateFlow.value)
1076 
1077         e.emit(mapOf("foo" to 0, "bar" to 1))
1078         runCurrent()
1079 
1080         assertEquals(1, stateFlow.value)
1081     }
1082 
1083     @Test
1084     fun inputEventsCompleted() = runFrpTest { network ->
1085         val results = mutableListOf<Int>()
1086         val e = network.mutableEvents<Int>()
1087         activateSpec(network) { e.nextOnly().observe { results.add(it) } }
1088         runCurrent()
1089 
1090         e.emit(10)
1091         runCurrent()
1092 
1093         assertEquals(listOf(10), results)
1094 
1095         e.emit(20)
1096         runCurrent()
1097         assertEquals(listOf(10), results)
1098     }
1099 
1100     @Test
1101     fun fanOutThenMergeIncrementally() = runFrpTest { network ->
1102         // A events of group updates, where a group is a events of child updates, where a child is a
1103         // stateflow
1104         val e = network.mutableEvents<Map<Int, Maybe<Events<Map<Int, Maybe<StateFlow<String>>>>>>>()
1105         println("fanOutMergeInc START")
1106         val state =
1107             activateSpecWithResult(network) {
1108                 // Convert nested Flows to nested Events/State
1109                 val emitter: Events<Map<Int, Maybe<Events<Map<Int, Maybe<State<String>>>>>>> =
1110                     e.mapBuild { m ->
1111                         m.mapValues { (_, mFlow) ->
1112                             mFlow.map {
1113                                 it.mapBuild { m2 ->
1114                                     println("m2: $m2")
1115                                     m2.mapValues { (_, mState) ->
1116                                         mState.map { stateFlow -> stateFlow.toState() }
1117                                     }
1118                                 }
1119                             }
1120                         }
1121                     }
1122                 // Accumulate all of our updates into a single State
1123                 val accState: State<Map<Int, Map<Int, String>>> =
1124                     emitter
1125                         .mapStateful {
1126                             changeMap: Map<Int, Maybe<Events<Map<Int, Maybe<State<String>>>>>> ->
1127                             changeMap.mapValues { (groupId, mGroupChanges) ->
1128                                 mGroupChanges.map {
1129                                     groupChanges: Events<Map<Int, Maybe<State<String>>>> ->
1130                                     // New group
1131                                     val childChangeById = groupChanges.groupByKey()
1132                                     val map: Events<Map<Int, Maybe<Events<Maybe<State<String>>>>>> =
1133                                         groupChanges.mapStateful {
1134                                             gChangeMap: Map<Int, Maybe<State<String>>> ->
1135                                             println("gChangeMap: $gChangeMap")
1136                                             gChangeMap.mapValues { (childId, mChild) ->
1137                                                 mChild.map { child: State<String> ->
1138                                                     println("new child $childId in the house")
1139                                                     // New child
1140                                                     val eRemoved =
1141                                                         childChangeById
1142                                                             .eventsForKey(childId)
1143                                                             .filter { it === Absent }
1144                                                             .onEach {
1145                                                                 println(
1146                                                                     "removing? (groupId=$groupId, childId=$childId)"
1147                                                                 )
1148                                                             }
1149                                                             .nextOnly()
1150 
1151                                                     val addChild: Events<Maybe<State<String>>> =
1152                                                         now.map { mChild }
1153                                                             .onEach {
1154                                                                 println(
1155                                                                     "addChild (groupId=$groupId, childId=$childId) ${child.sample()}"
1156                                                                 )
1157                                                             }
1158 
1159                                                     val removeChild: Events<Maybe<State<String>>> =
1160                                                         eRemoved
1161                                                             .onEach {
1162                                                                 println(
1163                                                                     "removeChild (groupId=$groupId, childId=$childId)"
1164                                                                 )
1165                                                             }
1166                                                             .map { Maybe.absent() }
1167 
1168                                                     addChild.mergeWith(removeChild) { _, _ ->
1169                                                         error("unexpected coincidence")
1170                                                     }
1171                                                 }
1172                                             }
1173                                         }
1174                                     val mergeIncrementally: Events<Map<Int, Maybe<State<String>>>> =
1175                                         map.onEach { println("merge patch: $it") }
1176                                             .mergeEventsIncrementallyPromptly()
1177                                     mergeIncrementally
1178                                         .onEach { println("foldmap patch: $it") }
1179                                         .foldStateMapIncrementally()
1180                                         .flatMap { it.combine() }
1181                                 }
1182                             }
1183                         }
1184                         .onEach { println("fold patch: $it") }
1185                         .foldStateMapIncrementally()
1186                         .flatMap { it.combine() }
1187 
1188                 accState.toStateFlow()
1189             }
1190         runCurrent()
1191 
1192         assertEquals(emptyMap(), state.value)
1193 
1194         val emitter2 = network.mutableEvents<Map<Int, Maybe<StateFlow<String>>>>()
1195         println()
1196         println("init outer 0")
1197         e.emit(mapOf(0 to Maybe.present(emitter2.onEach { println("emitter2 emit: $it") })))
1198         runCurrent()
1199 
1200         assertEquals(mapOf(0 to emptyMap()), state.value)
1201 
1202         println()
1203         println("init inner 10")
1204         emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(0, 10)"))))
1205         runCurrent()
1206 
1207         assertEquals(mapOf(0 to mapOf(10 to "(0, 10)")), state.value)
1208 
1209         // replace
1210         println()
1211         println("replace inner 10")
1212         emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(1, 10)"))))
1213         runCurrent()
1214 
1215         assertEquals(mapOf(0 to mapOf(10 to "(1, 10)")), state.value)
1216 
1217         // remove
1218         emitter2.emit(mapOf(10 to Maybe.absent()))
1219         runCurrent()
1220 
1221         assertEquals(mapOf(0 to emptyMap()), state.value)
1222 
1223         // add again
1224         emitter2.emit(mapOf(10 to Maybe.present(MutableStateFlow("(2, 10)"))))
1225         runCurrent()
1226 
1227         assertEquals(mapOf(0 to mapOf(10 to "(2, 10)")), state.value)
1228 
1229         // LogEnabled = true
1230 
1231         println("batch update")
1232 
1233         // batch update
1234         emitter2.emit(
1235             mapOf(
1236                 10 to Maybe.absent(),
1237                 11 to Maybe.present(MutableStateFlow("(0, 11)")),
1238                 12 to Maybe.present(MutableStateFlow("(0, 12)")),
1239             )
1240         )
1241         runCurrent()
1242 
1243         assertEquals(mapOf(0 to mapOf(11 to "(0, 11)", 12 to "(0, 12)")), state.value)
1244     }
1245 
1246     @Test
1247     fun applyLatestNetworkChanges() = runFrpTest { network ->
1248         val newCount = network.mutableEvents<BuildSpec<Flow<Int>>>()
1249         val flowOfFlows: Flow<Flow<Int>> =
1250             activateSpecWithResult(network) { newCount.applyLatestSpec().toSharedFlow() }
1251         runCurrent()
1252 
1253         val incCount = network.mutableEvents<Unit>()
1254         fun newFlow(): BuildSpec<SharedFlow<Int>> = buildSpec {
1255             launchEffect {
1256                 try {
1257                     println("new flow!")
1258                     awaitCancellation()
1259                 } finally {
1260                     println("cancelling old flow")
1261                 }
1262             }
1263             lateinit var count: State<Int>
1264             count =
1265                 incCount
1266                     .onEach { println("incrementing ${count.sample()}") }
1267                     .foldState(0) { _, c -> c + 1 }
1268             count.changes.toSharedFlow()
1269         }
1270 
1271         var outerCount = 0
1272         val lastEvent: StateFlow<Pair<StateFlow<Int?>, StateFlow<Int?>>> =
1273             flowOfFlows
1274                 .map { it.stateIn(backgroundScope, SharingStarted.Eagerly, null) }
1275                 .pairwise(MutableStateFlow(null))
1276                 .onEach { outerCount++ }
1277                 .stateIn(
1278                     backgroundScope,
1279                     SharingStarted.Eagerly,
1280                     MutableStateFlow(null) to MutableStateFlow(null),
1281                 )
1282 
1283         runCurrent()
1284 
1285         newCount.emit(newFlow())
1286         runCurrent()
1287 
1288         assertEquals(1, outerCount)
1289         //        assertEquals(1, incCount.subscriptionCount)
1290         assertNull(lastEvent.value.second.value)
1291 
1292         incCount.emit(Unit)
1293         runCurrent()
1294 
1295         println("checking")
1296         assertEquals(1, lastEvent.value.second.value)
1297 
1298         incCount.emit(Unit)
1299         runCurrent()
1300 
1301         assertEquals(2, lastEvent.value.second.value)
1302 
1303         newCount.emit(newFlow())
1304         runCurrent()
1305         incCount.emit(Unit)
1306         runCurrent()
1307 
1308         // verify old flow is not getting updates
1309         assertEquals(2, lastEvent.value.first.value)
1310         // but the new one is
1311         assertEquals(1, lastEvent.value.second.value)
1312     }
1313 
1314     @Test
1315     fun buildScope_stateAccumulation() = runFrpTest { network ->
1316         val input = network.mutableEvents<Unit>()
1317         var observedCount: Int? = null
1318         activateSpec(network) {
1319             val (c, j) = asyncScope { input.foldState(0) { _, x -> x + 1 } }
1320             deferredBuildScopeAction { c.value.observe { observedCount = it } }
1321         }
1322         runCurrent()
1323         assertEquals(0, observedCount)
1324 
1325         input.emit(Unit)
1326         runCurrent()
1327         assertEquals(1, observedCount)
1328 
1329         input.emit(Unit)
1330         runCurrent()
1331         assertEquals(2, observedCount)
1332     }
1333 
1334     @Test
1335     fun effect() = runFrpTest { network ->
1336         val input = network.mutableEvents<Unit>()
1337         var effectRunning = false
1338         var count = 0
1339         activateSpec(network) {
1340             val j = launchEffect {
1341                 effectRunning = true
1342                 try {
1343                     awaitCancellation()
1344                 } finally {
1345                     effectRunning = false
1346                 }
1347             }
1348             merge(emptyEvents, input.nextOnly()).observe {
1349                 count++
1350                 j.cancel()
1351             }
1352         }
1353         runCurrent()
1354         assertEquals(true, effectRunning)
1355         assertEquals(0, count)
1356 
1357         println("1")
1358         input.emit(Unit)
1359         assertEquals(false, effectRunning)
1360         assertEquals(1, count)
1361 
1362         println("2")
1363         input.emit(Unit)
1364         assertEquals(1, count)
1365         println("3")
1366         input.emit(Unit)
1367         assertEquals(1, count)
1368     }
1369 
1370     @Test
1371     fun observeEffect_disposeHandle() = runFrpTest { network ->
1372         val input = network.mutableEvents<Unit>()
1373         val stopper = network.mutableEvents<Unit>()
1374         var runningCount = 0
1375         val specJob =
1376             activateSpec(network) {
1377                 val handle =
1378                     input.observe {
1379                         launch {
1380                             runningCount++
1381                             awaitClose { runningCount-- }
1382                         }
1383                     }
1384                 stopper.nextOnly().observe { handle.dispose() }
1385             }
1386         runCurrent()
1387         assertEquals(0, runningCount)
1388 
1389         input.emit(Unit)
1390         assertEquals(1, runningCount)
1391 
1392         input.emit(Unit)
1393         assertEquals(2, runningCount)
1394 
1395         stopper.emit(Unit)
1396         assertEquals(2, runningCount)
1397 
1398         input.emit(Unit)
1399         assertEquals(2, runningCount)
1400 
1401         specJob.cancel()
1402         runCurrent()
1403         assertEquals(0, runningCount)
1404     }
1405 
1406     @Test
1407     fun observeEffect_takeUntil() = runFrpTest { network ->
1408         val input = network.mutableEvents<Unit>()
1409         val stopper = network.mutableEvents<Unit>()
1410         var runningCount = 0
1411         val specJob =
1412             activateSpec(network) {
1413                 input.takeUntil(stopper).observe {
1414                     launch {
1415                         runningCount++
1416                         awaitClose { runningCount-- }
1417                     }
1418                 }
1419             }
1420         runCurrent()
1421         assertEquals(0, runningCount)
1422 
1423         input.emit(Unit)
1424         assertEquals(1, runningCount)
1425 
1426         input.emit(Unit)
1427         assertEquals(2, runningCount)
1428 
1429         stopper.emit(Unit)
1430         assertEquals(2, runningCount)
1431 
1432         input.emit(Unit)
1433         assertEquals(2, runningCount)
1434 
1435         specJob.cancel()
1436         runCurrent()
1437         assertEquals(0, runningCount)
1438     }
1439 
1440     private fun runFrpTest(
1441         timeout: Duration = 3.seconds,
1442         block: suspend TestScope.(KairosNetwork) -> Unit,
1443     ) {
1444         runTest(timeout = timeout) {
1445             val network = backgroundScope.launchKairosNetwork()
1446             runCurrent()
1447             block(network)
1448         }
1449     }
1450 
1451     private fun TestScope.activateSpec(network: KairosNetwork, spec: BuildSpec<*>) =
1452         backgroundScope.launch { network.activateSpec(spec) }
1453 
1454     private suspend fun <R> TestScope.activateSpecWithResult(
1455         network: KairosNetwork,
1456         spec: BuildSpec<R>,
1457     ): R =
1458         CompletableDeferred<R>()
1459             .apply { activateSpec(network) { complete(spec.applySpec()) } }
1460             .await()
1461 }
1462 
assertEqualsnull1463 private fun <T> assertEquals(expected: T, actual: T) =
1464     org.junit.Assert.assertEquals(expected, actual)
1465 
1466 private fun <A> Flow<A>.pairwise(init: A): Flow<Pair<A, A>> = flow {
1467     var prev = init
1468     collect {
1469         emit(prev to it)
1470         prev = it
1471     }
1472 }
1473