<!--- TEST_NAME SharedStateGuideTest -->

**Table of contents**

<!--- TOC -->

* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)

<!--- END -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like the [Dispatchers.Default]. This presents
all the usual concurrency problems. The main problem is synchronizing access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a hundred coroutines all doing the same action a thousand times.
We'll also measure their completion time for further comparisons:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

</div>

We start with a very simple action that increments a shared mutable variable using the
multi-threaded [Dispatchers.Default].

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt).

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves the concurrency problem.
Let us try it:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt).

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 100000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case).

### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This code works very slowly, because it does _fine-grained_ thread confinement. Each individual increment switches
from the multi-threaded [Dispatchers.Default] context to the single-threaded context using a
[withContext(counterContext)][withContext] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to the single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This now works much faster and produces the correct result.

### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread.

There is also the [withLock] extension function that conveniently represents the
`mutex.lock(); try { ...
} finally { mutex.unlock() }` pattern:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.

### Actors

An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine,
the state that is confined and encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.
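
To make the class-based form concrete, here is a minimal sketch that assembles an actor by hand from a plain `Channel` and `launch`. It is an illustration under stated assumptions, not the guide's own example: `TallyActor`, `TallyMsg`, `Add` and `Read` are hypothetical names invented for this sketch, and a `CompletableDeferred` is assumed as the way to carry a reply back to the caller.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical message types, used only by this sketch
sealed class TallyMsg
object Add : TallyMsg()                                       // one-way message
class Read(val reply: CompletableDeferred<Int>) : TallyMsg()  // request with reply

// Hypothetical class-based actor: the state is private and is touched
// only by the single coroutine launched in the init block
class TallyActor(scope: CoroutineScope) {
    private val mailbox = Channel<TallyMsg>() // channel other coroutines send to
    private var total = 0                     // confined to the actor coroutine

    init {
        scope.launch {
            for (msg in mailbox) { // process messages one at a time, in order
                when (msg) {
                    is Add -> total++
                    is Read -> msg.reply.complete(total)
                }
            }
        }
    }

    suspend fun add() = mailbox.send(Add)

    suspend fun read(): Int {
        val reply = CompletableDeferred<Int>()
        mailbox.send(Read(reply))
        return reply.await()
    }

    fun close() { mailbox.close() } // ends the actor's message loop
}

fun main() = runBlocking<Unit> {
    val tally = TallyActor(this)
    coroutineScope {
        repeat(10) {
            launch(Dispatchers.Default) { repeat(100) { tally.add() } }
        }
    }
    println("Total = ${tally.read()}")
    tally.close()
}
```

With ten senders issuing a hundred `Add` messages each, this sketch should print `Total = 1000`. Callers never see the channel or the state directly; they only call `add()` and `read()`, which is the kind of encapsulation a class makes convenient once the state grows beyond a single counter.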

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope (to receive messages from) and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

The first step of using an actor is to define a class of messages that an actor is going to process.
Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose.
We define a `CounterMsg` sealed class with an `IncCounter` message to increment a counter and a `GetCounter` message
to get its value. The latter needs to send a response. A [CompletableDeferred] communication
primitive, which represents a single value that will be known (communicated) in the future,
is used here for that purpose.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
```

</div>

Then we define a function that launches an actor using an [actor] coroutine builder:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
```

</div>

The main code is straightforward:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

//sampleStart
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state.
Indeed, actors may modify their own private state,
but can only affect each other through messages (avoiding the need for any locks).

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that an [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html
<!--- INDEX kotlinx.coroutines.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
[withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
<!--- INDEX kotlinx.coroutines.channels -->
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
<!--- END -->