/*
 * Copyright (C) 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

@file:OptIn(ExperimentalCoroutinesApi::class)

package com.android.test.tracing.coroutines

import android.platform.test.annotations.EnableFlags
import com.android.app.tracing.coroutines.CoroutineTraceName
import com.android.app.tracing.coroutines.createCoroutineTracingContext
import com.android.app.tracing.coroutines.flow.collectLatestTraced
import com.android.app.tracing.coroutines.flow.collectTraced
import com.android.app.tracing.coroutines.flow.filterTraced
import com.android.app.tracing.coroutines.flow.flowName
import com.android.app.tracing.coroutines.flow.mapLatestTraced
import com.android.app.tracing.coroutines.flow.mapTraced
import com.android.app.tracing.coroutines.flow.shareInTraced
import com.android.app.tracing.coroutines.flow.stateInTraced
import com.android.app.tracing.coroutines.flow.traceAs
import com.android.app.tracing.coroutines.launchInTraced
import com.android.app.tracing.coroutines.launchTraced
import com.android.app.tracing.coroutines.traceCoroutine
import com.android.systemui.Flags.FLAG_COROUTINE_TRACING
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.job
import kotlinx.coroutines.plus
import kotlinx.coroutines.yield
import org.junit.Assert.assertEquals
import org.junit.Test

/**
 * Tests for the traced `Flow` operators and for plain `Flow` operators running under a coroutine
 * tracing context.
 *
 * Every test is written as a sequence of `expect(...)` / `expectAny(...)` calls. Both helpers, as
 * well as `runTest`, `scope` and `bgThread1..3`, are inherited from [TestBase], which is not part
 * of this file. NOTE(review): presumably `expect` asserts the names of the currently open trace
 * sections and `totalEvents` is the number of `expect`/`expectAny` calls the test must perform;
 * confirm against [TestBase].
 */
@EnableFlags(FLAG_COROUTINE_TRACING)
class FlowTracingTest : TestBase() {

    /**
     * Collects an unnamed cold flow with the plain `collect`; every expectation, both in the flow
     * body and in the collector, is `"1^main"`.
     */
    @Test
    fun collectFlow_simple() {
        val coldFlow = flow {
            expect("1^main")
            yield()
            expect("1^main")
            emit(42)
            expect("1^main")
            yield()
            expect("1^main")
        }

        runTest(totalEvents = 8) {
            expect("1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
            yield()
            expect("1^main")
        }
    }

    /**
     * Shows what happens when [CoroutineTraceName] is passed to `flowOn` without a dispatcher
     * change: expectations inside the flow body and after collection become empty.
     *
     * @see [CoroutineTracingTest.withContext_incorrectUsage]
     */
    @Test
    fun collectFlow_incorrectNameUsage() =
        runTest(totalEvents = 8) {
            val coldFlow =
                flow {
                        // The trace section that was open before the context was replaced by
                        // `flowOn` stays open until the first suspension
                        expect("1^main")
                        yield()
                        expect()
                        emit(42)
                        expect("1^main") // <-- context changed due to context of collector
                        yield()
                        expect()
                    }
                    .flowOn(CoroutineTraceName("new-name")) // <-- BAD, DON'T DO THIS

            expect("1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
            expect() // <-- trace sections erased due to context of emitter
        }

    /**
     * Names a cold flow with `flowName`; the flow body expects an extra `collect:new-name`
     * section and the collector additionally expects `emit:new-name`. The explicit indices pin
     * the exact interleaving of emitter and collector.
     */
    @Test
    fun collectFlow_correctNameUsage() {
        val coldFlow =
            flow {
                    expect(2, "1^main", "collect:new-name")
                    yield()
                    expect(3, "1^main", "collect:new-name")
                    emit(42)
                    expect(6, "1^main", "collect:new-name")
                    yield()
                    expect(7, "1^main", "collect:new-name")
                }
                .flowName("new-name")
        runTest(totalEvents = 8) {
            expect(1, "1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect(4, "1^main", "collect:new-name", "emit:new-name")
                yield()
                expect(5, "1^main", "collect:new-name", "emit:new-name")
            }
            expect(8, "1^main")
        }
    }

    /**
     * Shares a flow with `shareInTraced` in a separate scope; the upstream body expects
     * `"1^new-name"` while the collector launched from the test scope expects its own launch name
     * plus `collect:new-name` / `emit:new-name`.
     */
    @Test
    fun collectFlow_shareIn() {
        val otherScope =
            CoroutineScope(
                createCoroutineTracingContext("other-scope", testMode = true) +
                    bgThread1 +
                    scope.coroutineContext.job
            )
        val sharedFlow =
            flow {
                    expect("1^new-name")
                    yield()
                    expect("1^new-name")
                    emit(42)
                    expect("1^new-name")
                    yield()
                    expect("1^new-name")
                }
                .shareInTraced("new-name", otherScope, SharingStarted.Eagerly, 5)
        runTest(totalEvents = 9) {
            yield()
            expect("1^main")
            val job =
                launchTraced("launch-for-collect") {
                    expect("1^main:1^launch-for-collect")
                    sharedFlow.collect {
                        assertEquals(42, it)
                        expect("1^main:1^launch-for-collect", "collect:new-name", "emit:new-name")
                        yield()
                        expect("1^main:1^launch-for-collect", "collect:new-name", "emit:new-name")
                    }
                }
            yield()
            expect("1^main")
            yield()
            // The shared flow never completes, so the collecting job has to be cancelled
            job.cancel()
        }
    }

    /**
     * Collects the same cold flow twice: once through an untraced `shareIn` and once through
     * `launchInTraced`. The flow body therefore accepts either the unnamed (`"1^main:1^"`) or the
     * named (`"1^main:2^launchIn-for-cold"`) section.
     */
    @Test
    fun collectFlow_launchIn() {
        val coldFlow = flow {
            expectAny(arrayOf("1^main:1^"), arrayOf("1^main:2^launchIn-for-cold"))
            yield()
            expectAny(arrayOf("1^main:1^"), arrayOf("1^main:2^launchIn-for-cold"))
            emit(42)
            expectAny(arrayOf("1^main:1^"), arrayOf("1^main:2^launchIn-for-cold"))
            yield()
            expectAny(arrayOf("1^main:1^"), arrayOf("1^main:2^launchIn-for-cold"))
        }

        runTest(totalEvents = 10) {
            val sharedFlow = coldFlow.shareIn(this, SharingStarted.Eagerly, 5)
            yield()
            expect("1^main")
            coldFlow.launchInTraced("launchIn-for-cold", this)
            val job = sharedFlow.launchIn(this)
            yield()
            expect("1^main")
            job.cancel()
        }
    }

    /**
     * Same shape as [collectFlow_launchIn], but the sharing coroutine is named via
     * `shareInTraced` and both collectors are launched with `launchInTraced`.
     */
    @Test
    fun collectFlow_launchIn_and_shareIn() {
        val coldFlow = flow {
            expectAny(arrayOf("1^main:1^shareIn-name"), arrayOf("1^main:2^launchIn-for-cold"))
            yield()
            expectAny(arrayOf("1^main:1^shareIn-name"), arrayOf("1^main:2^launchIn-for-cold"))
            emit(42)
            expectAny(arrayOf("1^main:1^shareIn-name"), arrayOf("1^main:2^launchIn-for-cold"))
            yield()
            expectAny(arrayOf("1^main:1^shareIn-name"), arrayOf("1^main:2^launchIn-for-cold"))
        }

        runTest(totalEvents = 12) {
            val sharedFlow = coldFlow.shareInTraced("shareIn-name", this, SharingStarted.Eagerly, 5)
            yield()
            expect("1^main")
            coldFlow
                .onEach { expect("1^main:2^launchIn-for-cold") }
                .launchInTraced("launchIn-for-cold", this)
            val job =
                sharedFlow
                    .onEach {
                        expect(
                            "1^main:3^launchIn-for-hot",
                            "collect:shareIn-name",
                            "emit:shareIn-name",
                        )
                    }
                    .launchInTraced("launchIn-for-hot", this)
            expect("1^main")
            delay(10)
            job.cancel()
        }
    }

    /**
     * Demonstrates misuse of [CoroutineTraceName] as an intermediate `flowOn` argument: after the
     * first suspension the flow body's expectations are empty.
     */
    @Test
    fun collectFlow_badUsageOfCoroutineTraceName_coldFlowOnDifferentThread() {
        val thread1 = bgThread1
        // Example of bad usage of CoroutineTraceName. CoroutineTraceName is an internal API.
        // It should only be used during collection, or whenever a coroutine is launched.
        // It should not be used as an intermediate operator.
        val coldFlow =
            flow {
                    expect("1^main:1^fused-name")
                    yield()
                    expect() // <-- empty due to CoroutineTraceName overwriting TraceContextElement
                    emit(21)
                    expect() // <-- empty due to CoroutineTraceName overwriting TraceContextElement
                    yield()
                    expect() // <-- empty due to CoroutineTraceName overwriting TraceContextElement
                }
                // "UNUSED_MIDDLE_NAME" is overwritten during operator fusion because the thread
                // of the flow did not change, meaning no new coroutine needed to be created.
                // However, using CoroutineTraceName("UNUSED_MIDDLE_NAME") is bad because it will
                // replace CoroutineTracingContext on the resumed thread
                .flowOn(CoroutineTraceName("UNUSED_MIDDLE_NAME") + thread1)
                .map {
                    expect("1^main:1^fused-name")
                    it * 2
                }
                .flowOn(CoroutineTraceName("fused-name") + thread1)

        runTest(totalEvents = 9) {
            expect("1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
            expect("1^main")
        }
    }

    /**
     * Interleaves `flowName` with `flowOn` calls; per the inline comment below, the added tracing
     * operators keep the two `flowOn` calls from being fused.
     */
    @Test
    fun collectFlow_operatorFusion_preventedByTracing() {
        val coldFlow =
            flow {
                    expect("1^main:1^:1^", "collect:AAA")
                    yield()
                    expect("1^main:1^:1^", "collect:AAA")
                    emit(42)
                    expect("1^main:1^:1^", "collect:AAA")
                    yield()
                    expect("1^main:1^:1^", "collect:AAA")
                }
                .flowName("AAA")
                .flowOn(bgThread1)
                // because we added tracing, work unnecessarily runs on bgThread2. This would be
                // like adding a `.transform{}` or `.onEach{}` call between `.flowOn()` operators.
                // The problem is not unique to tracing, but this test is to show there is still
                // overhead when tracing is disabled, so it should not be used everywhere.
                .flowName("BBB")
                .flowOn(bgThread2)
                .flowName("CCC")

        runTest(totalEvents = 8) {
            expect("1^main")
            // CCC and DDD aren't fused together like how contexts are in `.flowOn()`
            coldFlow.collectTraced("DDD") {
                assertEquals(42, it)
                expect("1^main", "collect:DDD", "collect:CCC", "emit:CCC", "emit:DDD")
                yield()
                expect("1^main", "collect:DDD", "collect:CCC", "emit:CCC", "emit:DDD")
            }
            expect("1^main")
        }
    }

    /**
     * Counterpart of [collectFlow_operatorFusion_preventedByTracing]: with no tracing operator
     * between the two `flowOn` calls, the flow body expects a single nested section
     * (`"1^main:1^"`).
     */
    @Test
    fun collectFlow_operatorFusion_happensBecauseNoTracing() {
        val coldFlow =
            flow {
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                    emit(42)
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                }
                .flowOn(bgThread1) // Operators are fused, and nothing ever executes on bgThread2
                .flowOn(bgThread2)
                .flowName("FLOW_NAME")

        runTest(totalEvents = 8) {
            expect("1^main")
            // FLOW_NAME and COLLECT_NAME aren't fused together like how contexts are in
            // `.flowOn()`
            coldFlow.collectTraced("COLLECT_NAME") {
                assertEquals(42, it)
                expect(
                    "1^main",
                    "collect:COLLECT_NAME",
                    "collect:FLOW_NAME",
                    "emit:FLOW_NAME",
                    "emit:COLLECT_NAME",
                )
                yield()
                expect(
                    "1^main",
                    "collect:COLLECT_NAME",
                    "collect:FLOW_NAME",
                    "emit:FLOW_NAME",
                    "emit:COLLECT_NAME",
                )
            }
            expect("1^main")
        }
    }

    /**
     * Chains three `flowOn(CoroutineTraceName(...) + thread)` calls separated by `onEach`; the
     * innermost flow body keeps its section name only until its first suspension.
     */
    @Test
    fun collectFlow_flowOnTraced() {
        val thread1 = bgThread1
        val thread2 = bgThread2
        // Example of bad usage of CoroutineTraceName. CoroutineTraceName is an internal API.
        // It should only be used during collection, or whenever a coroutine is launched.
        // It should not be used as an intermediate operator.
        val op1 = flow {
            expect("1^main:1^outer-name:1^inner-name")
            yield()
            expect()
            emit(42)
            expect()
            yield()
            expect()
        }
        val op2 = op1.flowOn(CoroutineTraceName("UNUSED_NAME") + thread2)
        val op3 = op2.onEach { expect("1^main:1^outer-name:1^inner-name") }
        val op4 = op3.flowOn(CoroutineTraceName("inner-name") + thread2)
        val op5 = op4.onEach { expect("1^main:1^outer-name") }
        val op6 = op5.flowOn(CoroutineTraceName("outer-name") + thread1)

        runTest(totalEvents = 10) {
            expect("1^main")
            op6.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
            expect("1^main")
        }
    }

    /**
     * A single `flowOn(CoroutineTraceName("fused-name") + thread1)` after `map`: the flow body
     * and the `map` lambda both expect `"1^main:1^fused-name"`, the collector `"1^main"`.
     */
    @Test
    fun collectFlow_coldFlowOnDifferentThread() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^fused-name")
                    yield()
                    expect("1^main:1^fused-name")
                    emit(21)
                    expect("1^main:1^fused-name")
                    yield()
                    expect("1^main:1^fused-name")
                }
                .map {
                    expect("1^main:1^fused-name")
                    it * 2
                }
                .flowOn(CoroutineTraceName("fused-name") + thread1)

        runTest(totalEvents = 9) {
            expect("1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
            expect("1^main")
        }
    }

    /**
     * Uses `collectTraced("coldFlow")` on a flow moved with an unnamed `flowOn(thread1)`: the
     * upstream expects the unnamed section `"1^main:1^"`, the collector expects
     * `collect:coldFlow` / `emit:coldFlow` on top of `"1^main"`.
     */
    @Test
    fun collectTraced_coldFlowOnDifferentThread() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                    emit(21)
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                }
                .map {
                    expect("1^main:1^")
                    it * 2
                }
                .flowOn(thread1)

        runTest(totalEvents = 9) {
            expect("1^main")
            coldFlow.collectTraced("coldFlow") {
                assertEquals(42, it)
                expect("1^main", "collect:coldFlow", "emit:coldFlow")
                yield()
                expect("1^main", "collect:coldFlow", "emit:coldFlow")
            }
            expect("1^main")
        }
    }

    /**
     * Same flow as [collectTraced_coldFlowOnDifferentThread], but collection happens inside
     * `traceCoroutine("AAA")` invoked with the flow as receiver, so the collector additionally
     * expects the `"AAA"` section.
     */
    @Test
    fun collectTraced_collectWithTracedReceiver() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                    emit(21)
                    expect("1^main:1^")
                    yield()
                    expect("1^main:1^")
                }
                .map {
                    expect("1^main:1^")
                    it * 2
                }
                .flowOn(thread1)

        runTest(totalEvents = 9) {
            expect("1^main")
            coldFlow.traceCoroutine("AAA") {
                collectTraced("coldFlow") {
                    assertEquals(42, it)
                    expect("1^main", "AAA", "collect:coldFlow", "emit:coldFlow")
                    yield()
                    expect("1^main", "AAA", "collect:coldFlow", "emit:coldFlow")
                }
            }
            expect("1^main")
        }
    }

    /**
     * Applies `flowOn(CoroutineTraceName("new-name"))` before `flowOn(thread1)`; the flow body
     * expects `"1^main:1^new-name"`.
     */
    @Test
    fun collectFlow_nameBeforeDispatcherChange() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                    emit(42)
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                }
                .flowOn(CoroutineTraceName("new-name"))
                .flowOn(thread1)
        runTest(totalEvents = 6) {
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
        }
    }

    /**
     * Applies `flowOn(thread1)` before `flowOn(CoroutineTraceName("new-name"))`; the expectations
     * are the same as in [collectFlow_nameBeforeDispatcherChange].
     */
    @Test
    fun collectFlow_nameAfterDispatcherChange() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                    emit(42)
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                }
                .flowOn(thread1)
                .flowOn(CoroutineTraceName("new-name"))
        runTest(totalEvents = 6) {
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                yield()
                expect("1^main")
            }
        }
    }

    /**
     * Supplies a trace name both before and after `flowOn(thread1)`; only the first (upstream)
     * name shows up in the flow body's expectations.
     */
    @Test
    fun collectFlow_nameBeforeAndAfterDispatcherChange() {
        val thread1 = bgThread1
        val coldFlow =
            flow {
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                    emit(42)
                    expect("1^main:1^new-name")
                    yield()
                    expect("1^main:1^new-name")
                }
                .flowOn(CoroutineTraceName("new-name"))
                .flowOn(thread1)
                // Unused because, when fused, the previous upstream context takes precedence
                .flowOn(CoroutineTraceName("UNUSED_NAME"))

        // 4 expectations in the flow body + 1 in the collector + 1 after collection
        runTest(totalEvents = 6) {
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
            }
            yield()
            expect("1^main")
        }
    }

    /**
     * Combines `filterTraced`, an untraced `mapLatest` and an untraced `collectLatest`, each
     * wrapped in `traceCoroutine` blocks, and checks which sections are visible in each lambda.
     */
    @Test
    fun collectTraced_mapLatest() {
        val coldFlow =
            flow {
                    expect("1^main:1^:1^")
                    emit(1)
                    expect("1^main:1^:1^")
                    emit(21)
                    expect("1^main:1^:1^")
                }
                .filterTraced("mod2") {
                    // called twice because upstream has 2 emits
                    expect("1^main:1^:1^", "mod2")
                    true
                }
                .run {
                    traceCoroutine("CCC") {
                        mapLatest {
                            traceCoroutine("DDD") {
                                // One alternative per upstream emission
                                expectAny(
                                    arrayOf("1^main:1^:1^", "1^main:1^:1^:1^", "DDD"),
                                    arrayOf("1^main:1^:1^", "1^main:1^:1^:2^", "DDD"),
                                )
                                it * 2
                            }
                        }
                    }
                }

        runTest(totalEvents = 10) {
            expect("1^main") // top-level scope
            traceCoroutine("AAA") {
                coldFlow.collectLatest {
                    traceCoroutine("BBB") {
                        delay(50)
                        assertEquals(42, it)
                        expect("1^main:1^:3^", "BBB")
                    }
                }
            }
            expect("1^main")
        }
    }

    /**
     * Stacks several `flowOn(CoroutineTraceName(...))` / `flowOn(thread)` pairs around `map` and
     * `filter`. Two [CompletableDeferred] barriers order the emitter's tail after the collector's
     * expectation and the test's final expectation after the emitter's tail.
     */
    @Test
    fun collectFlow_badNameUsage() {
        val barrier1 = CompletableDeferred<Unit>()
        val barrier2 = CompletableDeferred<Unit>()
        val thread1 = bgThread1
        val thread2 = bgThread2
        val thread3 = bgThread3
        val coldFlow =
            flow {
                    expect("1^main:1^name-for-filter:1^name-for-map:1^name-for-emit")
                    yield()
                    expect("1^main:1^name-for-filter:1^name-for-map:1^name-for-emit")
                    emit(42)
                    barrier1.await()
                    expect("1^main:1^name-for-filter:1^name-for-map:1^name-for-emit")
                    yield()
                    expect("1^main:1^name-for-filter:1^name-for-map:1^name-for-emit")
                    barrier2.complete(Unit)
                }
                .flowOn(CoroutineTraceName("name-for-emit"))
                .flowOn(thread3)
                .map {
                    expect("1^main:1^name-for-filter:1^name-for-map")
                    yield()
                    expect("1^main:1^name-for-filter:1^name-for-map")
                    it
                }
                // This only works because the dispatcher changes; this behavior should not be
                // relied on.
                .flowOn(CoroutineTraceName("name-for-map"))
                .flowOn(thread2)
                // Unused because, when fused, the previous upstream context takes precedence
                .flowOn(CoroutineTraceName("UNUSED_NAME"))
                .filter {
                    expect("1^main:1^name-for-filter")
                    yield()
                    expect("1^main:1^name-for-filter")
                    true
                }
                .flowOn(CoroutineTraceName("name-for-filter"))
                .flowOn(thread1)

        runTest(totalEvents = 11) {
            expect("1^main")
            coldFlow.collect {
                assertEquals(42, it)
                expect("1^main")
                barrier1.complete(Unit)
            }
            barrier2.await()
            expect("1^main")
        }
    }

    /**
     * Uses `mapTraced` / `filterTraced` as intermediate operators and `collectTraced` as the
     * terminal one; explicit indices pin the order in which the lambdas run.
     */
    @Test
    fun collectFlow_withIntermediateOperatorNames() {
        val coldFlow =
            flow {
                    expect(2, "1^main", "collect:do-the-assert")
                    emit(21) // 42 / 2 = 21
                    expect(6, "1^main", "collect:do-the-assert")
                }
                // The label matches the operation: the lambda doubles the value
                .mapTraced("multiply-by-2") {
                    expect(3, "1^main", "collect:do-the-assert", "multiply-by-2")
                    it * 2
                }
                .filterTraced("mod-2") {
                    expect(4, "1^main", "collect:do-the-assert", "mod-2")
                    it % 2 == 0
                }
        runTest(totalEvents = 7) {
            expect(1, "1^main")

            coldFlow.collectTraced("do-the-assert") {
                assertEquals(42, it)
                expect(5, "1^main", "collect:do-the-assert", "emit:do-the-assert")
            }
            expect(7, "1^main")
        }
    }

    /**
     * Runs `mapLatestTraced("AAA")` over three values. The first expectation accepts any of the
     * three child indices; the expectation after `delay(10)` only names the third child.
     */
    @Test
    fun collectFlow_mapLatest() {
        val coldFlow = flowOf(1, 2, 3)
        runTest(totalEvents = 6) {
            expect("1^main")
            coldFlow
                .mapLatestTraced("AAA") {
                    expectAny(
                        arrayOf(
                            "1^main:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:1^",
                            "AAA",
                        ),
                        arrayOf(
                            "1^main:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:2^",
                            "AAA",
                        ),
                        arrayOf(
                            "1^main:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:3^",
                            "AAA",
                        ),
                    )
                    delay(10)
                    expect("1^main:1^:3^", "AAA")
                }
                .collect()
            expect("1^main")
        }
    }

    /**
     * Runs `collectLatestTraced("CCC")` over three values; mirrors [collectFlow_mapLatest] with
     * `collectLatest` section names.
     */
    @Test
    fun collectFlow_collectLatest() {
        val coldFlow = flowOf(1, 2, 3)
        runTest(totalEvents = 6) {
            expect("1^main")
            coldFlow.collectLatestTraced("CCC") {
                expectAny(
                    arrayOf(
                        "1^main:1^",
                        "collect:collectLatest:CCC",
                        "emit:collectLatest:CCC",
                        "1^main:1^:1^",
                        "CCC",
                    ),
                    arrayOf(
                        "1^main:1^",
                        "collect:collectLatest:CCC",
                        "emit:collectLatest:CCC",
                        "1^main:1^:2^",
                        "CCC",
                    ),
                    arrayOf(
                        "1^main:1^",
                        "collect:collectLatest:CCC",
                        "emit:collectLatest:CCC",
                        "1^main:1^:3^",
                        "CCC",
                    ),
                )
                delay(10)
                expect("1^main:1^:3^", "CCC")
            }
            expect("1^main")
        }
    }

    /**
     * Chains `mapLatestTraced("AAA")` into `collectLatestTraced("CCC")` and checks the section
     * names seen by each lambda.
     */
    @Test
    fun collectFlow_mapLatest_collectLatest() {
        val coldFlow = flowOf(1, 2, 3)
        runTest(totalEvents = 7) {
            expect("1^main")
            coldFlow
                .mapLatestTraced("AAA") {
                    expectAny(
                        arrayOf(
                            "1^main:1^:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:1^:1^",
                            "AAA",
                        ),
                        arrayOf(
                            "1^main:1^:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:1^:2^",
                            "AAA",
                        ),
                        arrayOf(
                            "1^main:1^:1^",
                            "collect:mapLatest:AAA",
                            "emit:mapLatest:AAA",
                            "1^main:1^:1^:3^",
                            "AAA",
                        ),
                    )
                    delay(10)
                    expect("1^main:1^:1^:3^", "AAA")
                }
                .collectLatestTraced("CCC") {
                    expect(
                        "1^main:1^",
                        "collect:collectLatest:CCC",
                        "emit:collectLatest:CCC",
                        "1^main:1^:2^",
                        "CCC",
                    )
                }
            expect("1^main")
        }
    }

    /**
     * Creates two state flows from the same cold flow with `stateInTraced`, one of them started
     * lazily with initial value 42, and collects both through `launchInTraced`.
     */
    @Test
    fun collectFlow_stateIn() {
        val otherScope =
            CoroutineScope(
                createCoroutineTracingContext("other-scope", testMode = true) +
                    bgThread1 +
                    scope.coroutineContext.job
            )
        val coldFlow =
            flowOf(1, 2)
                .onEach {
                    delay(2)
                    expectAny(arrayOf("1^STATE_1"), arrayOf("2^STATE_2"))
                }
                .flowOn(bgThread2)

        runTest(totalEvents = 10) {
            expect("1^main")

            val state1 = coldFlow.stateInTraced("STATE_1", otherScope.plus(bgThread2))
            val state2 = coldFlow.stateInTraced("STATE_2", otherScope, SharingStarted.Lazily, 42)

            delay(20)

            val job1 =
                state1
                    .onEach { expect("1^main:1^LAUNCH_1", "collect:STATE_1", "emit:STATE_1") }
                    .launchInTraced("LAUNCH_1", this)
            // state2 shares lazily and has no collector yet, so it still holds its initial value
            assertEquals(42, state2.value)
            val job2 =
                state2
                    .onEach { expect("1^main:2^LAUNCH_2", "collect:STATE_2", "emit:STATE_2") }
                    .launchInTraced("LAUNCH_2", this)

            delay(10)
            expect("1^main")

            delay(10)

            job1.cancel()
            job2.cancel()
        }
    }

    /**
     * Wraps a [MutableStateFlow] with `traceAs("NAME")`, updates it from one traced coroutine and
     * collects it from another; the collector expects `collect:NAME` / `emit:NAME`.
     */
    @Test
    fun tracedMutableStateFlow_collection() {
        val state = MutableStateFlow(1).traceAs("NAME")

        runTest(totalEvents = 3) {
            expect("1^main")
            launchTraced("LAUNCH") {
                delay(10)
                state.value = 2
            }
            val job =
                launchTraced("LAUNCH_FOR_COLLECT") {
                    state.collect {
                        expect("1^main:2^LAUNCH_FOR_COLLECT", "collect:NAME", "emit:NAME")
                    }
                }
            delay(100)
            // A state flow never completes, so the collector must be cancelled explicitly
            job.cancel()
        }
    }
}