1[//]: # (title: Coroutines and channels − tutorial) 2 3In this tutorial, you'll learn how to use coroutines in IntelliJ IDEA to perform network requests without blocking the 4underlying thread or callbacks. 5 6> No prior knowledge of coroutines is required, but you're expected to be familiar with basic Kotlin syntax. 7> 8{type="tip"} 9 10You'll learn: 11 12* Why and how to use suspending functions to perform network requests. 13* How to send requests concurrently using coroutines. 14* How to share information between different coroutines using channels. 15 16For network requests, you'll need the [Retrofit](https://square.github.io/retrofit/) library, but the approach shown in 17this tutorial works similarly for any other libraries that support coroutines. 18 19> You can find solutions for all of the tasks on the `solutions` branch of the [project's repository](http://github.com/kotlin-hands-on/intro-coroutines). 20> 21{type="tip"} 22 23## Before you start 24 251. Download and install the latest version of [IntelliJ IDEA](https://www.jetbrains.com/idea/download/index.html). 262. Clone the [project template](http://github.com/kotlin-hands-on/intro-coroutines) by choosing **Get from VCS** on the 27 Welcome screen or selecting **File | New | Project from Version Control**. 28 29 You can also clone it from the command line: 30 31 ```Bash 32 git clone https://github.com/kotlin-hands-on/intro-coroutines 33 ``` 34 35### Generate a GitHub developer token 36 37You'll be using the GitHub API in your project. To get access, provide your GitHub account name and either a password or a 38token. If you have two-factor authentication enabled, a token will be enough. 39 40Generate a new GitHub token to use the GitHub API with [your account](https://github.com/settings/tokens/new): 41 421. Specify the name of your token, for example, `coroutines-tutorial`: 43 44 {width=700} 45 462. Do not select any scopes. Click **Generate token** at the bottom of the page. 473. Copy the generated token. 48 49### Run the code 50 51The program loads the contributors for all of the repositories under the given organization (named “kotlin” by default). 52Later you'll add logic to sort the users by the number of their contributions. 53 541. Open the `src/contributors/main.kt` file and run the `main()` function. You'll see the following window: 55 56 {width=500} 57 58 If the font is too small, adjust it by changing the value of `setDefaultFontSize(18f)` in the `main()` function. 59 602. Provide your GitHub username and token (or password) in the corresponding fields. 613. Make sure that the _BLOCKING_ option is selected in the _Variant_ dropdown menu. 624. Click _Load contributors_. The UI should freeze for some time and then show the list of contributors. 635. Open the program output to ensure the data has been loaded. The list of contributors is logged after each successful request. 64 65There are different ways of implementing this logic: by using [blocking requests](#blocking-requests) 66or [callbacks](#callbacks). You'll compare these solutions with one that uses [coroutines](#coroutines) and see how 67[channels](#channels) can be used to share information between different coroutines. 68 69## Blocking requests 70 71You will use the [Retrofit](https://square.github.io/retrofit/) library to perform HTTP requests to GitHub. It allows 72requesting the list of repositories under the given organization and the list of contributors for each repository: 73 74```kotlin 75interface GitHubService { 76 @GET("orgs/{org}/repos?per_page=100") 77 fun getOrgReposCall( 78 @Path("org") org: String 79 ): Call<List<Repo>> 80 81 @GET("repos/{owner}/{repo}/contributors?per_page=100") 82 fun getRepoContributorsCall( 83 @Path("owner") owner: String, 84 @Path("repo") repo: String 85 ): Call<List<User>> 86} 87``` 88 89This API is used by the `loadContributorsBlocking()` function to fetch the list of contributors for the given organization. 90 911. Open `src/tasks/Request1Blocking.kt` to see its implementation: 92 93 ```kotlin 94 fun loadContributorsBlocking(service: GitHubService, req: RequestData): List<User> { 95 val repos = service 96 .getOrgReposCall(req.org) // #1 97 .execute() // #2 98 .also { logRepos(req, it) } // #3 99 .body() ?: emptyList() // #4 100 101 return repos.flatMap { repo -> 102 service 103 .getRepoContributorsCall(req.org, repo.name) // #1 104 .execute() // #2 105 .also { logUsers(repo, it) } // #3 106 .bodyList() // #4 107 }.aggregate() 108 } 109 ``` 110 111 * At first, you get a list of the repositories under the given organization and store it in the `repos` list. Then for 112 each repository, the list of contributors is requested, and all of the lists are merged into one final list of 113 contributors. 114 * `getOrgReposCall()` and `getRepoContributorsCall()` both return an instance of the `*Call` class (`#1`). At this point, 115 no request is sent. 116 * `*Call.execute()` is then invoked to perform the request (`#2`). `execute()` is a synchronous call that blocks the 117 underlying thread. 118 * When you get the response, the result is logged by calling the specific `logRepos()` and `logUsers()` functions (`#3`). 119 If the HTTP response contains an error, this error will be logged here. 120 * Finally, get the response's body, which contains the data you need. For this tutorial, you'll use an empty list as a 121 result in case there is an error, and you'll log the corresponding error (`#4`). 122 1232. To avoid repeating `.body() ?: emptyList()`, an extension function `bodyList()` is declared: 124 125 ```kotlin 126 fun <T> Response<List<T>>.bodyList(): List<T> { 127 return body() ?: emptyList() 128 } 129 ``` 130 1313. Run the program again and take a look at the system output in IntelliJ IDEA. It should have something like this: 132 133 ```text 134 1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 135 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 136 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors 137 ... 138 ``` 139 140 * The first item on each line is the number of milliseconds that have passed since the program started, then the thread 141 name in square brackets. You can see from which thread the loading request is called. 142 * The final item on each line is the actual message: how many repositories or contributors were loaded. 143 144 This log output demonstrates that all of the results were logged from the main thread. When you run the code with a _BLOCKING_ 145 option, the window freezes and doesn't react to input until the loading is finished. All of the requests are executed from 146 the same thread as the one called `loadContributorsBlocking()` is from, which is the main UI thread (in Swing, it's an AWT 147 event dispatching thread). This main thread becomes blocked, and that's why the UI is frozen: 148 149 {width=700} 150 151 After the list of contributors has loaded, the result is updated. 152 1534. In `src/contributors/Contributors.kt`, find the `loadContributors()` function responsible for choosing how 154 the contributors are loaded and look at how `loadContributorsBlocking()` is called: 155 156 ```kotlin 157 when (getSelectedVariant()) { 158 BLOCKING -> { // Blocking UI thread 159 val users = loadContributorsBlocking(service, req) 160 updateResults(users, startTime) 161 } 162 } 163 ``` 164 165 * The `updateResults()` call goes right after the `loadContributorsBlocking()` call. 166 * `updateResults()` updates the UI, so it must always be called from the UI thread. 167 * Since `loadContributorsBlocking()` is also called from the UI thread, the UI thread becomes blocked and the UI is 168 frozen. 169 170### Task 1 171 172The first task helps you familiarize yourself with the task domain. Currently, each contributor's name is repeated 173several times, once for every project they have taken part in. Implement the `aggregate()` function combining the users 174so that each contributor is added only once. The `User.contributions` property should contain the total number of 175contributions of the given user to _all_ the projects. The resulting list should be sorted in descending order according 176to the number of contributions. 177 178Open `src/tasks/Aggregation.kt` and implement the `List<User>.aggregate()` function. Users should be sorted by the total 179number of their contributions. 180 181The corresponding test file `test/tasks/AggregationKtTest.kt` shows an example of the expected result. 182 183> You can jump between the source code and the test class automatically by using the [IntelliJ IDEA shortcut](https://www.jetbrains.com/help/idea/create-tests.html#test-code-navigation) 184> `Ctrl+Shift+T` / `⇧ ⌘ T`. 185> 186{type="tip"} 187 188After implementing this task, the resulting list for the "kotlin" organization should be similar to the following: 189 190{width=500} 191 192#### Solution for task 1 {initial-collapse-state="collapsed"} 193 1941. To group users by login, use [`groupBy()`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/group-by.html), 195 which returns a map from a login to all occurrences of the user with this login in different repositories. 1962. For each map entry, count the total number of contributions for each user and create a new instance of the `User` class 197 by the given name and total of contributions. 1983. Sort the resulting list in descending order: 199 200 ```kotlin 201 fun List<User>.aggregate(): List<User> = 202 groupBy { it.login } 203 .map { (login, group) -> User(login, group.sumOf { it.contributions }) } 204 .sortedByDescending { it.contributions } 205 ``` 206 207An alternative solution is to use the [`groupingBy()`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/grouping-by.html) 208function instead of `groupBy()`. 209 210## Callbacks 211 212The previous solution works, but it blocks the thread and therefore freezes the UI. A traditional approach that avoids this 213is to use _callbacks_. 214 215Instead of calling the code that should be invoked right after the operation is completed, you can extract it 216into a separate callback, often a lambda, and pass that lambda to the caller in order for it to be called later. 217 218To make the UI responsive, you can either move the whole computation to a separate thread or switch to the Retrofit API 219which uses callbacks instead of blocking calls. 220 221### Use a background thread 222 2231. Open `src/tasks/Request2Background.kt` and see its implementation. First, the whole computation is moved to a different 224 thread. The `thread()` function starts a new thread: 225 226 ```kotlin 227 thread { 228 loadContributorsBlocking(service, req) 229 } 230 ``` 231 232 Now that all of the loading has been moved to a separate thread, the main thread is free and can be occupied by other 233 tasks: 234 235 {width=700} 236 2372. The signature of the `loadContributorsBackground()` function changes. It takes an `updateResults()` 238 callback as the last argument to call it after all the loading completes: 239 240 ```kotlin 241 fun loadContributorsBackground( 242 service: GitHubService, req: RequestData, 243 updateResults: (List<User>) -> Unit 244 ) 245 ``` 246 2473. Now when the `loadContributorsBackground()` is called, the `updateResults()` call goes in the callback, not immediately 248 afterward as it did before: 249 250 ```kotlin 251 loadContributorsBackground(service, req) { users -> 252 SwingUtilities.invokeLater { 253 updateResults(users, startTime) 254 } 255 } 256 ``` 257 258 By calling `SwingUtilities.invokeLater`, you ensure that the `updateResults()` call, which updates the results, 259 happens on the main UI thread (AWT event dispatching thread). 260 261However, if you try to load the contributors via the `BACKGROUND` option, you can see that the list is updated but 262nothing changes. 263 264### Task 2 265 266Fix the `loadContributorsBackground()` function in `src/tasks/Request2Background.kt` so that the resulting list is shown 267in the UI. 268 269#### Solution for task 2 {initial-collapse-state="collapsed"} 270 271If you try to load the contributors, you can see in the log that the contributors are loaded but the result isn't displayed. 272To fix this, call `updateResults()` on the resulting list of users: 273 274```kotlin 275thread { 276 updateResults(loadContributorsBlocking(service, req)) 277} 278``` 279 280Make sure to call the logic passed in the callback explicitly. Otherwise, nothing will happen. 281 282### Use the Retrofit callback API 283 284In the previous solution, the whole loading logic is moved to the background thread, but that still isn't the best use of 285resources. All of the loading requests go sequentially and the thread is blocked while waiting for the loading result, 286while it could have been occupied by other tasks. Specifically, the thread could start loading another request to 287receive the entire result earlier. 288 289Handling the data for each repository should then be divided into two parts: loading and processing the 290resulting response. The second _processing_ part should be extracted into a callback. 291 292The loading for each repository can then be started before the result for the previous repository is received (and the 293corresponding callback is called): 294 295{width=700} 296 297The Retrofit callback API can help achieve this. The `Call.enqueue()` function starts an HTTP request and takes a 298callback as an argument. In this callback, you need to specify what needs to be done after each request. 299 300Open `src/tasks/Request3Callbacks.kt` and see the implementation of `loadContributorsCallbacks()` that uses this API: 301 302```kotlin 303fun loadContributorsCallbacks( 304 service: GitHubService, req: RequestData, 305 updateResults: (List<User>) -> Unit 306) { 307 service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1 308 logRepos(req, responseRepos) 309 val repos = responseRepos.bodyList() 310 311 val allUsers = mutableListOf<User>() 312 for (repo in repos) { 313 service.getRepoContributorsCall(req.org, repo.name) 314 .onResponse { responseUsers -> // #2 315 logUsers(repo, responseUsers) 316 val users = responseUsers.bodyList() 317 allUsers += users 318 } 319 } 320 } 321 // TODO: Why doesn't this code work? How to fix that? 322 updateResults(allUsers.aggregate()) 323 } 324``` 325 326* For convenience, this code fragment uses the `onResponse()` extension function declared in the same file. It takes a 327 lambda as an argument rather than an object expression. 328* The logic for handling the responses is extracted into callbacks: the corresponding lambdas start at lines `#1` and `#2`. 329 330However, the provided solution doesn't work. If you run the program and load contributors by choosing the _CALLBACKS_ 331option, you'll see that nothing is shown. However, the tests that immediately return the result pass. 332 333Think about why the given code doesn't work as expected and try to fix it, or see the solutions below. 334 335### Task 3 (optional) 336 337Rewrite the code in the `src/tasks/Request3Callbacks.kt` file so that the loaded list of contributors is shown. 338 339#### The first attempted solution for task 3 {initial-collapse-state="collapsed"} 340 341In the current solution, many requests are started concurrently, which decreases the total loading time. However, 342the result isn't loaded. This is because the `updateResults()` callback is called right after all of the loading requests are started, 343before the `allUsers` list has been filled with the data. 344 345You could try to fix this with a change like the following: 346 347```kotlin 348val allUsers = mutableListOf<User>() 349for ((index, repo) in repos.withIndex()) { // #1 350 service.getRepoContributorsCall(req.org, repo.name) 351 .onResponse { responseUsers -> 352 logUsers(repo, responseUsers) 353 val users = responseUsers.bodyList() 354 allUsers += users 355 if (index == repos.lastIndex) { // #2 356 updateResults(allUsers.aggregate()) 357 } 358 } 359} 360``` 361 362* First, you iterate over the list of repos with an index (`#1`). 363* Then, from each callback, you check whether it's the last iteration (`#2`). 364* And if that's the case, the result is updated. 365 366However, this code also fails to achieve our objective. Try to find the answer yourself, or see the solution below. 367 368#### The second attempted solution for task 3 {initial-collapse-state="collapsed"} 369 370Since the loading requests are started concurrently, there's no guarantee that the result for the last one comes last. The 371results can come in any order. 372 373Thus, if you compare the current index with the `lastIndex` as a condition for completion, you risk losing the results for 374some repos. 375 376If the request that processes the last repo returns faster than some prior requests (which is likely to happen), all of the 377results for requests that take more time will be lost. 378 379One way to fix this is to introduce an index and check whether all of the repositories have already been processed: 380 381```kotlin 382val allUsers = Collections.synchronizedList(mutableListOf<User>()) 383val numberOfProcessed = AtomicInteger() 384for (repo in repos) { 385 service.getRepoContributorsCall(req.org, repo.name) 386 .onResponse { responseUsers -> 387 logUsers(repo, responseUsers) 388 val users = responseUsers.bodyList() 389 allUsers += users 390 if (numberOfProcessed.incrementAndGet() == repos.size) { 391 updateResults(allUsers.aggregate()) 392 } 393 } 394} 395``` 396 397This code uses a synchronized version of the list and `AtomicInteger()` because, in general, there's no guarantee that 398different callbacks that process `getRepoContributors()` requests will always be called from the same thread. 399 400#### The third attempted solution for task 3 {initial-collapse-state="collapsed"} 401 402An even better solution is to use the `CountDownLatch` class. It stores a counter initialized with the number of 403repositories. This counter is decremented after processing each repository. It then waits until the latch is counted 404down to zero before updating the results: 405 406```kotlin 407val countDownLatch = CountDownLatch(repos.size) 408for (repo in repos) { 409 service.getRepoContributorsCall(req.org, repo.name) 410 .onResponse { responseUsers -> 411 // processing repository 412 countDownLatch.countDown() 413 } 414} 415countDownLatch.await() 416updateResults(allUsers.aggregate()) 417``` 418 419The result is then updated from the main thread. This is more direct than delegating the logic to the child threads. 420 421After reviewing these three attempts at a solution, you can see that writing correct code with callbacks is non-trivial 422and error-prone, especially when several underlying threads and synchronization occur. 423 424> As an additional exercise, you can implement the same logic using a reactive approach with the RxJava library. All of the 425> necessary dependencies and solutions for using RxJava can be found in a separate `rx` branch. It is also possible to 426> complete this tutorial and implement or check the proposed Rx versions for a proper comparison. 427> 428{type="tip"} 429 430## Suspending functions 431 432You can implement the same logic using suspending functions. Instead of returning `Call<List<Repo>>`, define the API 433call as a [suspending function](composing-suspending-functions.md) as follows: 434 435```kotlin 436interface GitHubService { 437 @GET("orgs/{org}/repos?per_page=100") 438 suspend fun getOrgRepos( 439 @Path("org") org: String 440 ): List<Repo> 441} 442``` 443 444* `getOrgRepos()` is defined as a `suspend` function. When you use a suspending function to perform a request, the 445 underlying thread isn't blocked. More details about how this works will come in later sections. 446* `getOrgRepos()` returns the result directly instead of returning a `Call`. If the result is unsuccessful, an 447 exception is thrown. 448 449Alternatively, Retrofit allows returning the result wrapped in `Response`. In this case, the result body is 450provided, and it is possible to check for errors manually. This tutorial uses the versions that return `Response`. 451 452In `src/contributors/GitHubService.kt`, add the following declarations to the `GitHubService` interface: 453 454```kotlin 455interface GitHubService { 456 // getOrgReposCall & getRepoContributorsCall declarations 457 458 @GET("orgs/{org}/repos?per_page=100") 459 suspend fun getOrgRepos( 460 @Path("org") org: String 461 ): Response<List<Repo>> 462 463 @GET("repos/{owner}/{repo}/contributors?per_page=100") 464 suspend fun getRepoContributors( 465 @Path("owner") owner: String, 466 @Path("repo") repo: String 467 ): Response<List<User>> 468} 469``` 470 471### Task 4 472 473Your task is to change the code of the function that loads contributors to make use of two new suspending functions, 474`getOrgRepos()` and `getRepoContributors()`. The new `loadContributorsSuspend()` function is marked as `suspend` to use the 475new API. 476 477> Suspending functions can't be called everywhere. Calling a suspending function from `loadContributorsBlocking()` will 478> result in an error with the message "Suspend function 'getOrgRepos' should be called only from a coroutine or another 479> suspend function". 480> 481{type="note"} 482 4831. Copy the implementation of `loadContributorsBlocking()` that is defined in `src/tasks/Request1Blocking.kt` 484 into the `loadContributorsSuspend()` that is defined in `src/tasks/Request4Suspend.kt`. 4852. Modify the code so that the new suspending functions are used instead of the ones that return `Call`s. 4863. Run the program by choosing the _SUSPEND_ option and ensure that the UI is still responsive while the GitHub requests 487 are performed. 488 489#### Solution for task 4 {initial-collapse-state="collapsed"} 490 491Replace `.getOrgReposCall(req.org).execute()` with `.getOrgRepos(req.org)` and repeat the same replacement for the 492second "contributors" request: 493 494```kotlin 495suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> { 496 val repos = service 497 .getOrgRepos(req.org) 498 .also { logRepos(req, it) } 499 .bodyList() 500 501 return repos.flatMap { repo -> 502 service.getRepoContributors(req.org, repo.name) 503 .also { logUsers(repo, it) } 504 .bodyList() 505 }.aggregate() 506} 507``` 508 509* `loadContributorsSuspend()` should be defined as a `suspend` function. 510* You no longer need to call `execute`, which returned the `Response` before, because now the API functions return 511 the `Response` directly. Note that this detail is specific to the Retrofit library. With other libraries, the API will be different, 512 but the concept is the same. 513 514## Coroutines 515 516The code with suspending functions looks similar to the "blocking" version. The major difference from the blocking version 517is that instead of blocking the thread, the coroutine is suspended: 518 519```text 520block -> suspend 521thread -> coroutine 522``` 523 524> Coroutines are often called lightweight threads because you can run code on coroutines, similar to how you run code on 525> threads. The operations that were blocking before (and had to be avoided) can now suspend the coroutine instead. 526> 527{type="note"} 528 529### Starting a new coroutine 530 531If you look at how `loadContributorsSuspend()` is used in `src/contributors/Contributors.kt`, you can see that it's 532called inside `launch`. `launch` is a library function that takes a lambda as an argument: 533 534```kotlin 535launch { 536 val users = loadContributorsSuspend(req) 537 updateResults(users, startTime) 538} 539``` 540 541Here `launch` starts a new computation that is responsible for loading the data and showing the results. The computation 542is suspendable – when performing network requests, it is suspended and releases the underlying thread. 543When the network request returns the result, the computation is resumed. 544 545Such a suspendable computation is called a _coroutine_. So, in this case, `launch` _starts a new coroutine_ responsible 546for loading data and showing the results. 547 548Coroutines run on top of threads and can be suspended. When a coroutine is suspended, the 549corresponding computation is paused, removed from the thread, and stored in memory. Meanwhile, the thread is free to be 550occupied by other tasks: 551 552{width=700} 553 554When the computation is ready to be continued, it is returned to a thread (not necessarily the same one). 555 556In the `loadContributorsSuspend()` example, each "contributors" request now waits for the result using the suspension 557mechanism. First, the new request is sent. Then, while waiting for the response, the whole "load contributors" coroutine 558that was started by the `launch` function is suspended. 559 560The coroutine resumes only after the corresponding response is received: 561 562{width=700} 563 564While the response is waiting to be received, the thread is free to be occupied by other tasks. The UI stays responsive, 565despite all the requests taking place on the main UI thread: 566 5671. Run the program using the _SUSPEND_ option. The log confirms that all of the requests are sent to the main UI thread: 568 569 ```text 570 2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 571 2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors 572 3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors 573 ... 574 11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors 575 ``` 576 5772. The log can show you which coroutine the corresponding code is running on. To enable it, open **Run | Edit configurations** 578 and add the `-Dkotlinx.coroutines.debug` VM option: 579 580 {width=500} 581 582 The coroutine name will be attached to the thread name while `main()` is run with this option. You can also 583 modify the template for running all of the Kotlin files and enable this option by default. 584 585Now all of the code runs on one coroutine, the "load contributors" coroutine mentioned above, denoted as `@coroutine#1`. 586While waiting for the result, you shouldn't reuse the thread for sending other requests because the code is 587written sequentially. The new request is sent only when the previous result is received. 588 589Suspending functions treat the thread fairly and don't block it for "waiting". However, this doesn't yet bring any concurrency 590into the picture. 591 592## Concurrency 593 594Kotlin coroutines are much less resource-intensive than threads. 595Each time you want to start a new computation asynchronously, you can create a new coroutine instead. 596 597To start a new coroutine, use one of the main _coroutine builders_: `launch`, `async`, or `runBlocking`. Different 598libraries can define additional coroutine builders. 599 600`async` starts a new coroutine and returns a `Deferred` object. `Deferred` represents a concept known by other names 601such as `Future` or `Promise`. It stores a computation, but it _defers_ the moment you get the final result; 602it _promises_ the result sometime in the _future_. 603 604The main difference between `async` and `launch` is that `launch` is used to start a computation that isn't expected to 605return a specific result. `launch` returns a `Job` that represents the coroutine. It is possible to wait until it completes 606by calling `Job.join()`. 607 608`Deferred` is a generic type that extends `Job`. An `async` call can return a `Deferred<Int>` or a `Deferred<CustomType>`, 609depending on what the lambda returns (the last expression inside the lambda is the result). 610 611To get the result of a coroutine, you can call `await()` on the `Deferred` instance. While waiting for the result, 612the coroutine that this `await()` is called from is suspended: 613 614```kotlin 615import kotlinx.coroutines.* 616 617fun main() = runBlocking { 618 val deferred: Deferred<Int> = async { 619 loadData() 620 } 621 println("waiting...") 622 println(deferred.await()) 623} 624 625suspend fun loadData(): Int { 626 println("loading...") 627 delay(1000L) 628 println("loaded!") 629 return 42 630} 631``` 632 633`runBlocking` is used as a bridge between regular and suspending functions, or between the blocking and non-blocking worlds. It works 634as an adaptor for starting the top-level main coroutine. It is intended primarily to be used in `main()` functions and 635tests. 636 637> Watch [this video](https://www.youtube.com/watch?v=zEZc5AmHQhk) for a better understanding of coroutines. 638> 639{type="tip"} 640 641If there is a list of deferred objects, you can call `awaitAll()` to await the results of all of them: 642 643```kotlin 644import kotlinx.coroutines.* 645 646fun main() = runBlocking { 647 val deferreds: List<Deferred<Int>> = (1..3).map { 648 async { 649 delay(1000L * it) 650 println("Loading $it") 651 it 652 } 653 } 654 val sum = deferreds.awaitAll().sum() 655 println("$sum") 656} 657``` 658 659When each "contributors" request is started in a new coroutine, all of the requests are started asynchronously. A new request 660can be sent before the result for the previous one is received: 661 662{width=700} 663 664The total loading time is approximately the same as in the _CALLBACKS_ version, but it doesn't need any callbacks. 665What's more, `async` explicitly emphasizes which parts run concurrently in the code. 666 667### Task 5 668 669In the `Request5Concurrent.kt` file, implement a `loadContributorsConcurrent()` function by using the 670previous `loadContributorsSuspend()` function. 671 672#### Tip for task 5 {initial-collapse-state="collapsed"} 673 674You can only start a new coroutine inside a coroutine scope. Copy the content 675from `loadContributorsSuspend()` to the `coroutineScope` call so that you can call `async` functions there: 676 677```kotlin 678suspend fun loadContributorsConcurrent( 679 service: GitHubService, 680 req: RequestData 681): List<User> = coroutineScope { 682 // ... 683} 684``` 685 686Base your solution on the following scheme: 687 688```kotlin 689val deferreds: List<Deferred<List<User>>> = repos.map { repo -> 690 async { 691 // load contributors for each repo 692 } 693} 694deferreds.awaitAll() // List<List<User>> 695``` 696 697#### Solution for task 5 {initial-collapse-state="collapsed"} 698 699Wrap each "contributors" request with `async` to create as many coroutines as there are repositories. `async` 700returns `Deferred<List<User>>`. This is not an issue because creating new coroutines is not very resource-intensive, so you can 701create as many as you need. 702 7031. You can no longer use `flatMap` because the `map` result is now a list of `Deferred` objects, not a list of lists. 704 `awaitAll()` returns `List<List<User>>`, so call `flatten().aggregate()` to get the result: 705 706 ```kotlin 707 suspend fun loadContributorsConcurrent( 708 service: GitHubService, 709 req: RequestData 710 ): List<User> = coroutineScope { 711 val repos = service 712 .getOrgRepos(req.org) 713 .also { logRepos(req, it) } 714 .bodyList() 715 716 val deferreds: List<Deferred<List<User>>> = repos.map { repo -> 717 async { 718 service.getRepoContributors(req.org, repo.name) 719 .also { logUsers(repo, it) } 720 .bodyList() 721 } 722 } 723 deferreds.awaitAll().flatten().aggregate() 724 } 725 ``` 726 7272. Run the code and check the log. All of the coroutines still run on the main UI thread because 728 multithreading hasn't been employed yet, but you can already see the benefits of running coroutines concurrently. 7293. To change this code to run "contributors" coroutines on different threads from the common thread pool, 730 specify `Dispatchers.Default` as the context argument for the `async` function: 731 732 ```kotlin 733 async(Dispatchers.Default) { } 734 ``` 735 736 * `CoroutineDispatcher` determines what thread or threads the corresponding coroutine should be run on. If you don't 737 specify one as an argument, `async` will use the dispatcher from the outer scope. 738 * `Dispatchers.Default` represents a shared pool of threads on the JVM. This pool provides a means for parallel execution. 739 It consists of as many threads as there are CPU cores available, but it will still have two threads if there's only one core. 740 7414. Modify the code in the `loadContributorsConcurrent()` function to start new coroutines on different threads from the 742 common thread pool. Also, add additional logging before sending the request: 743 744 ```kotlin 745 async(Dispatchers.Default) { 746 log("starting loading for ${repo.name}") 747 service.getRepoContributors(req.org, repo.name) 748 .also { logUsers(repo, it) } 749 .bodyList() 750 } 751 ``` 752 7535. Run the program once again. In the log, you can see that each coroutine can be started on one thread from the 754 thread pool and resumed on another: 755 756 ```text 757 1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 758 1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka 759 1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt 760 ... 761 2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 762 2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors 763 2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors 764 ``` 765 766 For instance, in this log excerpt, `coroutine#4` is started on the `worker-2` thread and continued on the 767 `worker-1` thread. 768 769In `src/contributors/Contributors.kt`, check the implementation of the _CONCURRENT_ option: 770 7711. To run the coroutine only on the main UI thread, specify `Dispatchers.Main` as an argument: 772 773 ```kotlin 774 launch(Dispatchers.Main) { 775 updateResults() 776 } 777 ``` 778 779 * If the main thread is busy when you start a new coroutine on it, 780 the coroutine becomes suspended and scheduled for execution on this thread. The coroutine will only resume when the 781 thread becomes free. 782 * It's considered good practice to use the dispatcher from the outer scope rather than explicitly specifying it on each 783 end-point. If you define `loadContributorsConcurrent()` without passing `Dispatchers.Default` as an 784 argument, you can call this function in any context: with a `Default` dispatcher, with 785 the main UI thread, or with a custom dispatcher. 786 * As you'll see later, when calling `loadContributorsConcurrent()` from tests, you can call it in the context 787 with `TestDispatcher`, which simplifies testing. That makes this solution much more flexible. 788 7892. To specify the dispatcher on the caller side, apply the following change to the project while 790 letting `loadContributorsConcurrent` start coroutines in the inherited context: 791 792 ```kotlin 793 launch(Dispatchers.Default) { 794 val users = loadContributorsConcurrent(service, req) 795 withContext(Dispatchers.Main) { 796 updateResults(users, startTime) 797 } 798 } 799 ``` 800 801 * `updateResults()` should be called on the main UI thread, so you call it with the context of `Dispatchers.Main`. 802 * `withContext()` calls the given code with the specified coroutine context, is suspended until it completes, and returns 803 the result. An alternative but more verbose way to express this would be to start a new coroutine and explicitly 804 wait (by suspending) until it completes: `launch(context) { ... }.join()`. 805 8063. Run the code and ensure that the coroutines are executed on the threads from the thread pool. 807 808## Structured concurrency 809 810* The _coroutine scope_ is responsible for the structure and parent-child relationships between different coroutines. New 811 coroutines usually need to be started inside a scope. 812* The _coroutine context_ stores additional technical information used to run a given coroutine, like the coroutine custom 813 name, or the dispatcher specifying the threads the coroutine should be scheduled on. 814 815When `launch`, `async`, or `runBlocking` are used to start a new coroutine, they automatically create the corresponding 816scope. All of these functions take a lambda with a receiver as an argument, and `CoroutineScope` is the implicit receiver type: 817 818```kotlin 819launch { /* this: CoroutineScope */ } 820``` 821 822* New coroutines can only be started inside a scope. 823* `launch` and `async` are declared as extensions to `CoroutineScope`, so an implicit or explicit receiver must always 824 be passed when you call them. 825* The coroutine started by `runBlocking` is the only exception because `runBlocking` is defined as a top-level function. 826 But because it blocks the current thread, it's intended primarily to be used in `main()` functions and tests as a bridge 827 function. 828 829A new coroutine inside `runBlocking`, `launch`, or `async` is started automatically inside the scope: 830 831```kotlin 832import kotlinx.coroutines.* 833 834fun main() = runBlocking { /* this: CoroutineScope */ 835 launch { /* ... */ } 836 // the same as: 837 this.launch { /* ... */ } 838} 839``` 840 841When you call `launch` inside `runBlocking`, it's called as an extension to the implicit receiver of 842the `CoroutineScope` type. Alternatively, you could explicitly write `this.launch`. 843 844The nested coroutine (started by `launch` in this example) can be considered as a child of the outer coroutine (started 845by `runBlocking`). This "parent-child" relationship works through scopes; the child coroutine is started from the scope 846corresponding to the parent coroutine. 847 848It's possible to create a new scope without starting a new coroutine, by using the `coroutineScope` function. 849To start new coroutines in a structured way inside a `suspend` function without access to the outer scope, you can create 850a new coroutine scope that automatically becomes a child of the outer scope that this `suspend` function is called from. 851`loadContributorsConcurrent()`is a good example. 852 853You can also start a new coroutine from the global scope using `GlobalScope.async` or `GlobalScope.launch`. 854This will create a top-level "independent" coroutine. 855 856The mechanism behind the structure of the coroutines is called _structured concurrency_. It provides the following 857benefits over global scopes: 858 859* The scope is generally responsible for child coroutines, whose lifetime is attached to the lifetime of the scope. 860* The scope can automatically cancel child coroutines if something goes wrong or a user changes their mind and decides 861 to revoke the operation. 862* The scope automatically waits for the completion of all child coroutines. 863 Therefore, if the scope corresponds to a coroutine, the parent coroutine does not complete until all the coroutines 864 launched in its scope have completed. 865 866When using `GlobalScope.async`, there is no structure that binds several coroutines to a smaller scope. 867Coroutines started from the global scope are all independent – their lifetime is limited only by the lifetime of the 868whole application. It's possible to store a reference to the coroutine started from the global scope and wait for its 869completion or cancel it explicitly, but that won't happen automatically as it would with structured concurrency. 870 871### Canceling the loading of contributors 872 873Create two versions of the function that loads the list of contributors. Compare how both versions behave when you try to 874cancel the parent coroutine. The first version will use `coroutineScope` to start all of the child coroutines, 875whereas the second will use `GlobalScope`. 876 8771. In `Request5Concurrent.kt`, add a 3-second delay to the `loadContributorsConcurrent()` function: 878 879 ```kotlin 880 suspend fun loadContributorsConcurrent( 881 service: GitHubService, 882 req: RequestData 883 ): List<User> = coroutineScope { 884 // ... 885 async { 886 log("starting loading for ${repo.name}") 887 delay(3000) 888 // load repo contributors 889 } 890 // ... 891 } 892 ``` 893 894 The delay affects all of the coroutines that send requests, so that there's enough time to cancel the loading 895 after the coroutines are started but before the requests are sent. 896 8972. Create the second version of the loading function: copy the implementation of `loadContributorsConcurrent()` to 898 `loadContributorsNotCancellable()` in `Request5NotCancellable.kt` and then remove the creation of a new `coroutineScope`. 8993. The `async` calls now fail to resolve, so start them by using `GlobalScope.async`: 900 901 ```kotlin 902 suspend fun loadContributorsNotCancellable( 903 service: GitHubService, 904 req: RequestData 905 ): List<User> { // #1 906 // ... 907 GlobalScope.async { // #2 908 log("starting loading for ${repo.name}") 909 // load repo contributors 910 } 911 // ... 912 return deferreds.awaitAll().flatten().aggregate() // #3 913 } 914 ``` 915 916 * The function now returns the result directly, not as the last expression inside the lambda (lines `#1` and `#3`). 917 * All of the "contributors" coroutines are started inside the `GlobalScope`, not as children of the coroutine scope 918 (line `#2`). 919 9204. Run the program and choose the _CONCURRENT_ option to load the contributors. 9215. Wait until all of the "contributors" coroutines are started, and then click _Cancel_. The log shows no new results, 922 which means that all of the requests were indeed canceled: 923 924 ```text 925 2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos 926 2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 927 ... 928 2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example 929 /* click on 'cancel' */ 930 /* no requests are sent */ 931 ``` 932 9336. Repeat step 5, but this time choose the `NOT_CANCELLABLE` option: 934 935 ```text 936 2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 937 2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans 938 ... 939 2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example 940 /* click on 'cancel' */ 941 /* but all the requests are still sent: */ 942 6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors 943 ... 944 9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors 945 ``` 946 947 In this case, no coroutines are canceled, and all the requests are still sent. 948 9497. Check how the cancellation is triggered in the "contributors" program. When the _Cancel_ button is clicked, 950 the main "loading" coroutine is explicitly canceled and the child coroutines are canceled automatically: 951 952 ```kotlin 953 interface Contributors { 954 955 fun loadContributors() { 956 // ... 957 when (getSelectedVariant()) { 958 CONCURRENT -> { 959 launch { 960 val users = loadContributorsConcurrent(service, req) 961 updateResults(users, startTime) 962 }.setUpCancellation() // #1 963 } 964 } 965 } 966 967 private fun Job.setUpCancellation() { 968 val loadingJob = this // #2 969 970 // cancel the loading job if the 'cancel' button was clicked: 971 val listener = ActionListener { 972 loadingJob.cancel() // #3 973 updateLoadingStatus(CANCELED) 974 } 975 // add a listener to the 'cancel' button: 976 addCancelListener(listener) 977 978 // update the status and remove the listener 979 // after the loading job is completed 980 } 981 } 982 ``` 983 984The `launch` function returns an instance of `Job`. `Job` stores a reference to the "loading coroutine", which loads 985all of the data and updates the results. You can call the `setUpCancellation()` extension function on it (line `#1`), 986passing an instance of `Job` as a receiver. 987 988Another way you could express this would be to explicitly write: 989 990```kotlin 991val job = launch { } 992job.setUpCancellation() 993``` 994 995* For readability, you could refer to the `setUpCancellation()` function receiver inside the function with the 996 new `loadingJob` variable (line `#2`). 997* Then you could add a listener to the _Cancel_ button so that when it's clicked, the `loadingJob` is canceled (line `#3`). 998 999With structured concurrency, you only need to cancel the parent coroutine and this automatically propagates cancellation 1000to all of the child coroutines. 1001 1002### Using the outer scope's context 1003 1004When you start new coroutines inside the given scope, it's much easier to ensure that all of them run with the same 1005context. It is also much easier to replace the context if needed. 1006 1007Now it's time to learn how using the dispatcher from the outer scope works. The new scope created by 1008the `coroutineScope` or by the coroutine builders always inherits the context from the outer scope. In this case, the 1009outer scope is the scope the `suspend loadContributorsConcurrent()` function was called from: 1010 1011```kotlin 1012launch(Dispatchers.Default) { // outer scope 1013 val users = loadContributorsConcurrent(service, req) 1014 // ... 1015} 1016``` 1017 1018All of the nested coroutines are automatically started with the inherited context. The dispatcher is a part of this 1019context. That's why all of the coroutines started by `async` are started with the context of the default dispatcher: 1020 1021```kotlin 1022suspend fun loadContributorsConcurrent( 1023 service: GitHubService, req: RequestData 1024): List<User> = coroutineScope { 1025 // this scope inherits the context from the outer scope 1026 // ... 1027 async { // nested coroutine started with the inherited context 1028 // ... 1029 } 1030 // ... 1031} 1032``` 1033 1034With structured concurrency, you can specify the major context elements (like dispatcher) once, when creating the 1035top-level coroutine. All the nested coroutines then inherit the context and modify it only if needed. 1036 1037> When you write code with coroutines for UI applications, for example Android ones, it's a common practice to 1038> use `CoroutineDispatchers.Main` by default for the top coroutine and then to explicitly put a different dispatcher when 1039> you need to run the code on a different thread. 1040> 1041{type="tip"} 1042 1043## Showing progress 1044 1045Despite the information for some repositories being loaded rather quickly, the user only sees the resulting list after all of 1046the data has been loaded. Until then, the loader icon runs showing the progress, but there's no information about the current 1047state or what contributors are already loaded. 1048 1049You can show the intermediate results earlier and display all of the contributors after loading the data for each of the 1050repositories: 1051 1052{width=500} 1053 1054To implement this functionality, in the `src/tasks/Request6Progress.kt`, you'll need to pass the logic updating the UI 1055as a callback, so that it's called on each intermediate state: 1056 1057```kotlin 1058suspend fun loadContributorsProgress( 1059 service: GitHubService, 1060 req: RequestData, 1061 updateResults: suspend (List<User>, completed: Boolean) -> Unit 1062) { 1063 // loading the data 1064 // calling `updateResults()` on intermediate states 1065} 1066``` 1067 1068On the call site in `Contributors.kt`, the callback is passed to update the results from the `Main` thread for 1069the _PROGRESS_ option: 1070 1071```kotlin 1072launch(Dispatchers.Default) { 1073 loadContributorsProgress(service, req) { users, completed -> 1074 withContext(Dispatchers.Main) { 1075 updateResults(users, startTime, completed) 1076 } 1077 } 1078} 1079``` 1080 1081* The `updateResults()` parameter is declared as `suspend` in `loadContributorsProgress()`. It's necessary to call 1082 `withContext`, which is a `suspend` function inside the corresponding lambda argument. 1083* `updateResults()` callback takes an additional Boolean parameter as an argument specifying whether the loading has 1084 completed and the results are final. 1085 1086### Task 6 1087 1088In the `Request6Progress.kt` file, implement the `loadContributorsProgress()` function that shows the intermediate 1089progress. Base it on the `loadContributorsSuspend()` function from `Request4Suspend.kt`. 1090 1091* Use a simple version without concurrency; you'll add it later in the next section. 1092* The intermediate list of contributors should be shown in an "aggregated" state, not just the list of users loaded for 1093 each repository. 1094* The total number of contributions for each user should be increased when the data for each new 1095 repository is loaded. 1096 1097#### Solution for task 6 {initial-collapse-state="collapsed"} 1098 1099To store the intermediate list of loaded contributors in the "aggregated" state, define an `allUsers` variable which 1100stores the list of users, and then update it after contributors for each new repository are loaded: 1101 1102```kotlin 1103suspend fun loadContributorsProgress( 1104 service: GitHubService, 1105 req: RequestData, 1106 updateResults: suspend (List<User>, completed: Boolean) -> Unit 1107) { 1108 val repos = service 1109 .getOrgRepos(req.org) 1110 .also { logRepos(req, it) } 1111 .bodyList() 1112 1113 var allUsers = emptyList<User>() 1114 for ((index, repo) in repos.withIndex()) { 1115 val users = service.getRepoContributors(req.org, repo.name) 1116 .also { logUsers(repo, it) } 1117 .bodyList() 1118 1119 allUsers = (allUsers + users).aggregate() 1120 updateResults(allUsers, index == repos.lastIndex) 1121 } 1122} 1123``` 1124 1125#### Consecutive vs concurrent 1126 1127An `updateResults()` callback is called after each request is completed: 1128 1129{width=700} 1130 1131This code doesn't include concurrency. It's sequential, so you don't need synchronization. 1132 1133The best option would be to send requests concurrently and update the intermediate results after getting the response 1134for each repository: 1135 1136{width=700} 1137 1138To add concurrency, use _channels_. 1139 1140## Channels 1141 1142Writing code with a shared mutable state is quite difficult and error-prone (like in the solution using callbacks). 1143A simpler way is to share information by communication rather than by using a common mutable state. 1144Coroutines can communicate with each other through _channels_. 1145 1146Channels are communication primitives that allow data to be passed between coroutines. One coroutine can _send_ 1147some information to a channel, while another can _receive_ that information from it: 1148 1149 1150 1151A coroutine that sends (produces) information is often called a producer, and a coroutine that receives (consumes) 1152information is called a consumer. One or multiple coroutines can send information to the same channel, and one or multiple 1153coroutines can receive data from it: 1154 1155 1156 1157When many coroutines receive information from the same channel, each element is handled only once by one of the 1158consumers. Once an element is handled, it is immediately removed from the channel. 1159 1160You can think of a channel as similar to a collection of elements, or more precisely, a queue, in which elements are added 1161to one end and received from the other. However, there's an important difference: unlike collections, even in their 1162synchronized versions, a channel can _suspend_ `send()`and `receive()` operations. This happens when the channel is empty 1163or full. The channel can be full if the channel size has an upper bound. 1164 1165`Channel` is represented by three different interfaces: `SendChannel`, `ReceiveChannel`, and `Channel`, with the latter 1166extending the first two. You usually create a channel and give it to producers as a `SendChannel` instance so that only 1167they can send information to the channel. 1168You give a channel to consumers as a `ReceiveChannel` instance so that only they can receive from it. Both `send` 1169and `receive` methods are declared as `suspend`: 1170 1171```kotlin 1172interface SendChannel<in E> { 1173 suspend fun send(element: E) 1174 fun close(): Boolean 1175} 1176 1177interface ReceiveChannel<out E> { 1178 suspend fun receive(): E 1179} 1180 1181interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 1182``` 1183 1184The producer can close a channel to indicate that no more elements are coming. 1185 1186Several types of channels are defined in the library. They differ in how many elements they can internally store and 1187whether the `send()` call can be suspended or not. 1188For all of the channel types, the `receive()` call behaves similarly: it receives an element if the channel is not empty; 1189otherwise, it is suspended. 1190 1191<deflist collapsible="true"> 1192 <def title="Unlimited channel"> 1193 <p>An unlimited channel is the closest analog to a queue: producers can send elements to this channel and it will 1194keep growing indefinitely. The <code>send()</code> call will never be suspended. 1195If the program runs out of memory, you'll get an <code>OutOfMemoryException</code>. 1196The difference between an unlimited channel and a queue is that when a consumer tries to receive from an empty channel, 1197it becomes suspended until some new elements are sent.</p> 1198 <img src="unlimited-channel.png" alt="Unlimited channel" width="500"/> 1199 </def> 1200 <def title="Buffered channel"> 1201 <p>The size of a buffered channel is constrained by the specified number. 1202Producers can send elements to this channel until the size limit is reached. All of the elements are internally stored. 1203When the channel is full, the next `send` call on it is suspended until more free space becomes available.</p> 1204 <img src="buffered-channel.png" alt="Buffered channel" width="500"/> 1205 </def> 1206 <def title="Rendezvous channel"> 1207 <p>The "Rendezvous" channel is a channel without a buffer, the same as a buffered channel with zero size. 1208One of the functions (<code>send()</code> or <code>receive()</code>) is always suspended until the other is called. </p> 1209 <p>If the <code>send()</code> function is called and there's no suspended <code>receive</code> call ready to process the element, then <code>send()</code> 1210is suspended. Similarly, if the <code>receive</code> function is called and the channel is empty or, in other words, there's no 1211suspended <code>send()</code> call ready to send the element, the <code>receive()</code> call is suspended. </p> 1212 <p>The "rendezvous" name ("a meeting at an agreed time and place") refers to the fact that <code>send()</code> and <code>receive()</code> 1213should "meet on time".</p> 1214 <img src="rendezvous-channel.png" alt="Rendezvous channel" width="500"/> 1215 </def> 1216 <def title="Conflated channel"> 1217 <p>A new element sent to the conflated channel will overwrite the previously sent element, so the receiver will always 1218get only the latest element. The <code>send()</code> call is never suspended.</p> 1219 <img src="conflated-channel.gif" alt="Conflated channel" width="500"/> 1220 </def> 1221</deflist> 1222 1223When you create a channel, specify its type or the buffer size (if you need a buffered one): 1224 1225```kotlin 1226val rendezvousChannel = Channel<String>() 1227val bufferedChannel = Channel<String>(10) 1228val conflatedChannel = Channel<String>(CONFLATED) 1229val unlimitedChannel = Channel<String>(UNLIMITED) 1230``` 1231 1232By default, a "Rendezvous" channel is created. 1233 1234In the following task, you'll create a "Rendezvous" channel, two producer coroutines, and a consumer coroutine: 1235 1236```kotlin 1237import kotlinx.coroutines.channels.Channel 1238import kotlinx.coroutines.* 1239 1240fun main() = runBlocking<Unit> { 1241 val channel = Channel<String>() 1242 launch { 1243 channel.send("A1") 1244 channel.send("A2") 1245 log("A done") 1246 } 1247 launch { 1248 channel.send("B1") 1249 log("B done") 1250 } 1251 launch { 1252 repeat(3) { 1253 val x = channel.receive() 1254 log(x) 1255 } 1256 } 1257} 1258 1259fun log(message: Any?) { 1260 println("[${Thread.currentThread().name}] $message") 1261} 1262``` 1263 1264> Watch [this video](https://www.youtube.com/watch?v=HpWQUoVURWQ) for a better understanding of channels. 1265> 1266{type="tip"} 1267 1268### Task 7 1269 1270In `src/tasks/Request7Channels.kt`, implement the function `loadContributorsChannels()` that requests all of the GitHub 1271contributors concurrently and shows intermediate progress at the same time. 1272 1273Use the previous functions, `loadContributorsConcurrent()` from `Request5Concurrent.kt` 1274and `loadContributorsProgress()` from `Request6Progress.kt`. 1275 1276#### Tip for task 7 {initial-collapse-state="collapsed"} 1277 1278Different coroutines that concurrently receive contributor lists for different repositories can send all of the received 1279results to the same channel: 1280 1281```kotlin 1282val channel = Channel<List<User>>() 1283for (repo in repos) { 1284 launch { 1285 val users = TODO() 1286 // ... 1287 channel.send(users) 1288 } 1289} 1290``` 1291 1292Then the elements from this channel can be received one by one and processed: 1293 1294```kotlin 1295repeat(repos.size) { 1296 val users = channel.receive() 1297 // ... 1298} 1299``` 1300 1301Since the `receive()` calls are sequential, no additional synchronization is needed. 1302 1303#### Solution for task 7 {initial-collapse-state="collapsed"} 1304 1305As with the `loadContributorsProgress()` function, you can create an `allUsers` variable to store the intermediate 1306states of the "all contributors" list. 1307Each new list received from the channel is added to the list of all users. You aggregate the result and update the state 1308using the `updateResults` callback: 1309 1310```kotlin 1311suspend fun loadContributorsChannels( 1312 service: GitHubService, 1313 req: RequestData, 1314 updateResults: suspend (List<User>, completed: Boolean) -> Unit 1315) = coroutineScope { 1316 1317 val repos = service 1318 .getOrgRepos(req.org) 1319 .also { logRepos(req, it) } 1320 .bodyList() 1321 1322 val channel = Channel<List<User>>() 1323 for (repo in repos) { 1324 launch { 1325 val users = service.getRepoContributors(req.org, repo.name) 1326 .also { logUsers(repo, it) } 1327 .bodyList() 1328 channel.send(users) 1329 } 1330 } 1331 var allUsers = emptyList<User>() 1332 repeat(repos.size) { 1333 val users = channel.receive() 1334 allUsers = (allUsers + users).aggregate() 1335 updateResults(allUsers, it == repos.lastIndex) 1336 } 1337} 1338``` 1339 1340* Results for different repositories are added to the channel as soon as they are ready. At first, when all of the requests 1341 are sent, and no data is received, the `receive()` call is suspended. In this case, the whole "load contributors" coroutine 1342 is suspended. 1343* Then, when the list of users is sent to the channel, the "load contributors" coroutine resumes, the `receive()` call 1344 returns this list, and the results are immediately updated. 1345 1346You can now run the program and choose the _CHANNELS_ option to load the contributors and see the result. 1347 1348Although neither coroutines nor channels completely remove the complexity that comes with concurrency, 1349they make life easier when you need to understand what's going on. 1350 1351## Testing coroutines 1352 1353Let's now test all solutions to check that the solution with concurrent coroutines is faster than the solution with 1354the `suspend` functions, and check that the solution with channels is faster than the simple "progress" one. 1355 1356In the following task, you'll compare the total running time of the solutions. You'll mock a GitHub service and make 1357this service return results after the given timeouts: 1358 1359```text 1360repos request - returns an answer within 1000 ms delay 1361repo-1 - 1000 ms delay 1362repo-2 - 1200 ms delay 1363repo-3 - 800 ms delay 1364``` 1365 1366The sequential solution with the `suspend` functions should take around 4000 ms (4000 = 1000 + (1000 + 1200 + 800)). 1367The concurrent solution should take around 2200 ms (2200 = 1000 + max(1000, 1200, 800)). 1368 1369For the solutions that show progress, you can also check the intermediate results with timestamps. 1370 1371The corresponding test data is defined in `test/contributors/testData.kt`, and the files `Request4SuspendKtTest`, 1372`Request7ChannelsKtTest`, and so on contain the straightforward tests that use mock service calls. 1373 1374However, there are two problems here: 1375 1376* These tests take too long to run. Each test takes around 2 to 4 seconds, and you need to wait for the results each 1377 time. It's not very efficient. 1378* You can't rely on the exact time the solution runs because it still takes additional time to prepare and run the code. 1379 You could add a constant, but then the time would differ from machine to machine. The mock service delays 1380 should be higher than this constant so you can see a difference. If the constant is 0.5 sec, making the delays 1381 0.1 sec won't be enough. 1382 1383A better way would be to use special frameworks to test the timing while running the same code several times (which increases 1384the total time even more), but that is complicated to learn and set up. 1385 1386To solve these problems and make sure that solutions with provided test delays behave as expected, one faster than the other, 1387use _virtual_ time with a special test dispatcher. This dispatcher keeps track of the virtual time passed from 1388the start and runs everything immediately in real time. When you run coroutines on this dispatcher, 1389the `delay` will return immediately and advance the virtual time. 1390 1391Tests that use this mechanism run fast, but you can still check what happens at different moments in virtual time. The 1392total running time drastically decreases: 1393 1394{width=700} 1395 1396To use virtual time, replace the `runBlocking` invocation with a `runTest`. `runTest` takes an 1397extension lambda to `TestScope` as an argument. 1398When you call `delay` in a `suspend` function inside this special scope, `delay` will increase the virtual time instead 1399of delaying in real time: 1400 1401```kotlin 1402@Test 1403fun testDelayInSuspend() = runTest { 1404 val realStartTime = System.currentTimeMillis() 1405 val virtualStartTime = currentTime 1406 1407 foo() 1408 println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms 1409 println("${currentTime - virtualStartTime} ms") // 1000 ms 1410} 1411 1412suspend fun foo() { 1413 delay(1000) // auto-advances without delay 1414 println("foo") // executes eagerly when foo() is called 1415} 1416``` 1417 1418You can check the current virtual time using the `currentTime` property of `TestScope`. 1419 1420The actual running time in this example is several milliseconds, whereas virtual time equals the delay argument, which 1421is 1000 milliseconds. 1422 1423To get the full effect of "virtual" `delay` in child coroutines, 1424start all of the child coroutines with `TestDispatcher`. Otherwise, it won't work. This dispatcher is 1425automatically inherited from the other `TestScope`, unless you provide a different dispatcher: 1426 1427```kotlin 1428@Test 1429fun testDelayInLaunch() = runTest { 1430 val realStartTime = System.currentTimeMillis() 1431 val virtualStartTime = currentTime 1432 1433 bar() 1434 1435 println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms 1436 println("${currentTime - virtualStartTime} ms") // 1000 ms 1437} 1438 1439suspend fun bar() = coroutineScope { 1440 launch { 1441 delay(1000) // auto-advances without delay 1442 println("bar") // executes eagerly when bar() is called 1443 } 1444} 1445``` 1446 1447If `launch` is called with the context of `Dispatchers.Default` in the example above, the test will fail. You'll get an 1448exception saying that the job has not been completed yet. 1449 1450You can test the `loadContributorsConcurrent()` function this way only if it starts the child coroutines with the 1451inherited context, without modifying it using the `Dispatchers.Default` dispatcher. 1452 1453You can specify the context elements like the dispatcher when _calling_ a function rather than when _defining_ it, 1454which allows for more flexibility and easier testing. 1455 1456> The testing API that supports virtual time is [Experimental](components-stability.md) and may change in the future. 1457> 1458{type="warning"} 1459 1460By default, the compiler shows warnings if you use the experimental testing API. To suppress these warnings, annotate 1461the test function or the whole class containing the tests with `@OptIn(ExperimentalCoroutinesApi::class)`. 1462Add the compiler argument instructing the compiler that you're using the experimental API: 1463 1464```kotlin 1465compileTestKotlin { 1466 kotlinOptions { 1467 freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental" 1468 } 1469} 1470``` 1471 1472In the project corresponding to this tutorial, the compiler argument has already been added to the Gradle script. 1473 1474### Task 8 1475 1476Refactor the following tests in `tests/tasks/` to use virtual time instead of real time: 1477 1478* Request4SuspendKtTest.kt 1479* Request5ConcurrentKtTest.kt 1480* Request6ProgressKtTest.kt 1481* Request7ChannelsKtTest.kt 1482 1483Compare the total running times before and after applying your refactoring. 1484 1485#### Tip for task 8 {initial-collapse-state="collapsed"} 1486 14871. Replace the `runBlocking` invocation with `runTest`, and replace `System.currentTimeMillis()` with `currentTime`: 1488 1489 ```kotlin 1490 @Test 1491 fun test() = runTest { 1492 val startTime = currentTime 1493 // action 1494 val totalTime = currentTime - startTime 1495 // testing result 1496 } 1497 ``` 1498 14992. Uncomment the assertions that check the exact virtual time. 15003. Don't forget to add `@UseExperimental(ExperimentalCoroutinesApi::class)`. 1501 1502#### Solution for task 8 {initial-collapse-state="collapsed"} 1503 1504Here are the solutions for the concurrent and channels cases: 1505 1506```kotlin 1507fun testConcurrent() = runTest { 1508 val startTime = currentTime 1509 val result = loadContributorsConcurrent(MockGithubService, testRequestData) 1510 Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result) 1511 val totalTime = currentTime - startTime 1512 1513 Assert.assertEquals( 1514 "The calls run concurrently, so the total virtual time should be 2200 ms: " + 1515 "1000 for repos request plus max(1000, 1200, 800) = 1200 for concurrent contributors requests)", 1516 expectedConcurrentResults.timeFromStart, totalTime 1517 ) 1518} 1519``` 1520 1521First, check that the results are available exactly at the expected virtual time, and then check the results 1522themselves: 1523 1524```kotlin 1525fun testChannels() = runTest { 1526 val startTime = currentTime 1527 var index = 0 1528 loadContributorsChannels(MockGithubService, testRequestData) { users, _ -> 1529 val expected = concurrentProgressResults[index++] 1530 val time = currentTime - startTime 1531 Assert.assertEquals( 1532 "Expected intermediate results after ${expected.timeFromStart} ms:", 1533 expected.timeFromStart, time 1534 ) 1535 Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users) 1536 } 1537} 1538``` 1539 1540The first intermediate result for the last version with channels becomes available sooner than the progress version, and you 1541can see the difference in tests that use virtual time. 1542 1543> The tests for the remaining "suspend" and "progress" tasks are very similar – you can find them in the project's 1544> `solutions` branch. 1545> 1546{type="tip"} 1547 1548## What's next 1549 1550* Check out the [Asynchronous Programming with Kotlin](https://kotlinconf.com/workshops/) workshop at KotlinConf. 1551* Find out more about using [virtual time and the experimental testing package](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-test/). 1552