1<!--- TEST_NAME SharedStateGuideTest --> 2 3[//]: # (title: Shared mutable state and concurrency) 4 5Coroutines can be executed parallelly using a multi-threaded dispatcher like the [Dispatchers.Default]. It presents 6all the usual parallelism problems. The main problem being synchronization of access to **shared mutable state**. 7Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world, 8but others are unique. 9 10## The problem 11 12Let us launch a hundred coroutines all doing the same action a thousand times. 13We'll also measure their completion time for further comparisons: 14 15```kotlin 16suspend fun massiveRun(action: suspend () -> Unit) { 17 val n = 100 // number of coroutines to launch 18 val k = 1000 // times an action is repeated by each coroutine 19 val time = measureTimeMillis { 20 coroutineScope { // scope for coroutines 21 repeat(n) { 22 launch { 23 repeat(k) { action() } 24 } 25 } 26 } 27 } 28 println("Completed ${n * k} actions in $time ms") 29} 30``` 31 32We start with a very simple action that increments a shared mutable variable using 33multi-threaded [Dispatchers.Default]. 34 35<!--- CLEAR --> 36 37```kotlin 38import kotlinx.coroutines.* 39import kotlin.system.* 40 41suspend fun massiveRun(action: suspend () -> Unit) { 42 val n = 100 // number of coroutines to launch 43 val k = 1000 // times an action is repeated by each coroutine 44 val time = measureTimeMillis { 45 coroutineScope { // scope for coroutines 46 repeat(n) { 47 launch { 48 repeat(k) { action() } 49 } 50 } 51 } 52 } 53 println("Completed ${n * k} actions in $time ms") 54} 55 56//sampleStart 57var counter = 0 58 59fun main() = runBlocking { 60 withContext(Dispatchers.Default) { 61 massiveRun { 62 counter++ 63 } 64 } 65 println("Counter = $counter") 66} 67//sampleEnd 68``` 69{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 70 71> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt). 72> 73{type="note"} 74 75<!--- TEST LINES_START 76Completed 100000 actions in 77Counter = 78--> 79 80What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines 81increment the `counter` concurrently from multiple threads without any synchronization. 82 83## Volatiles are of no help 84 85There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it: 86 87<!--- CLEAR --> 88 89```kotlin 90import kotlinx.coroutines.* 91import kotlin.system.* 92 93suspend fun massiveRun(action: suspend () -> Unit) { 94 val n = 100 // number of coroutines to launch 95 val k = 1000 // times an action is repeated by each coroutine 96 val time = measureTimeMillis { 97 coroutineScope { // scope for coroutines 98 repeat(n) { 99 launch { 100 repeat(k) { action() } 101 } 102 } 103 } 104 } 105 println("Completed ${n * k} actions in $time ms") 106} 107 108//sampleStart 109@Volatile // in Kotlin `volatile` is an annotation 110var counter = 0 111 112fun main() = runBlocking { 113 withContext(Dispatchers.Default) { 114 massiveRun { 115 counter++ 116 } 117 } 118 println("Counter = $counter") 119} 120//sampleEnd 121``` 122{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 123 124> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt). 125> 126{type="note"} 127 128<!--- TEST LINES_START 129Completed 100000 actions in 130Counter = 131--> 132 133This code works slower, but we still don't get "Counter = 100000" at the end, because volatile variables guarantee 134linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but 135do not provide atomicity of larger actions (increment in our case). 136 137## Thread-safe data structures 138 139The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, 140linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding 141operations that needs to be performed on a shared state. 142In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations: 143 144<!--- CLEAR --> 145 146```kotlin 147import kotlinx.coroutines.* 148import java.util.concurrent.atomic.* 149import kotlin.system.* 150 151suspend fun massiveRun(action: suspend () -> Unit) { 152 val n = 100 // number of coroutines to launch 153 val k = 1000 // times an action is repeated by each coroutine 154 val time = measureTimeMillis { 155 coroutineScope { // scope for coroutines 156 repeat(n) { 157 launch { 158 repeat(k) { action() } 159 } 160 } 161 } 162 } 163 println("Completed ${n * k} actions in $time ms") 164} 165 166//sampleStart 167val counter = AtomicInteger() 168 169fun main() = runBlocking { 170 withContext(Dispatchers.Default) { 171 massiveRun { 172 counter.incrementAndGet() 173 } 174 } 175 println("Counter = $counter") 176} 177//sampleEnd 178``` 179{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 180 181> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt). 182> 183{type="note"} 184 185<!--- TEST ARBITRARY_TIME 186Completed 100000 actions in xxx ms 187Counter = 100000 188--> 189 190This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other 191standard data structures and basic operations on them. However, it does not easily scale to complex 192state or to complex operations that do not have ready-to-use thread-safe implementations. 193 194## Thread confinement fine-grained 195 196_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared 197state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to 198the single event-dispatch/application thread. It is easy to apply with coroutines by using a 199single-threaded context. 200 201<!--- CLEAR --> 202 203```kotlin 204import kotlinx.coroutines.* 205import kotlin.system.* 206 207suspend fun massiveRun(action: suspend () -> Unit) { 208 val n = 100 // number of coroutines to launch 209 val k = 1000 // times an action is repeated by each coroutine 210 val time = measureTimeMillis { 211 coroutineScope { // scope for coroutines 212 repeat(n) { 213 launch { 214 repeat(k) { action() } 215 } 216 } 217 } 218 } 219 println("Completed ${n * k} actions in $time ms") 220} 221 222//sampleStart 223val counterContext = newSingleThreadContext("CounterContext") 224var counter = 0 225 226fun main() = runBlocking { 227 withContext(Dispatchers.Default) { 228 massiveRun { 229 // confine each increment to a single-threaded context 230 withContext(counterContext) { 231 counter++ 232 } 233 } 234 } 235 println("Counter = $counter") 236} 237//sampleEnd 238``` 239{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 240 241> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt). 242> 243{type="note"} 244 245<!--- TEST ARBITRARY_TIME 246Completed 100000 actions in xxx ms 247Counter = 100000 248--> 249 250This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches 251from multi-threaded [Dispatchers.Default] context to the single-threaded context using 252[withContext(counterContext)][withContext] block. 253 254## Thread confinement coarse-grained 255 256In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic 257are confined to the single thread. The following example does it like that, running each coroutine in 258the single-threaded context to start with. 259 260<!--- CLEAR --> 261 262```kotlin 263import kotlinx.coroutines.* 264import kotlin.system.* 265 266suspend fun massiveRun(action: suspend () -> Unit) { 267 val n = 100 // number of coroutines to launch 268 val k = 1000 // times an action is repeated by each coroutine 269 val time = measureTimeMillis { 270 coroutineScope { // scope for coroutines 271 repeat(n) { 272 launch { 273 repeat(k) { action() } 274 } 275 } 276 } 277 } 278 println("Completed ${n * k} actions in $time ms") 279} 280 281//sampleStart 282val counterContext = newSingleThreadContext("CounterContext") 283var counter = 0 284 285fun main() = runBlocking { 286 // confine everything to a single-threaded context 287 withContext(counterContext) { 288 massiveRun { 289 counter++ 290 } 291 } 292 println("Counter = $counter") 293} 294//sampleEnd 295``` 296{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 297 298> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt). 299> 300{type="note"} 301 302<!--- TEST ARBITRARY_TIME 303Completed 100000 actions in xxx ms 304Counter = 100000 305--> 306 307This now works much faster and produces correct result. 308 309## Mutual exclusion 310 311Mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_ 312that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that. 313Coroutine's alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to 314delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread. 315 316There is also [withLock] extension function that conveniently represents 317`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern: 318 319<!--- CLEAR --> 320 321```kotlin 322import kotlinx.coroutines.* 323import kotlinx.coroutines.sync.* 324import kotlin.system.* 325 326suspend fun massiveRun(action: suspend () -> Unit) { 327 val n = 100 // number of coroutines to launch 328 val k = 1000 // times an action is repeated by each coroutine 329 val time = measureTimeMillis { 330 coroutineScope { // scope for coroutines 331 repeat(n) { 332 launch { 333 repeat(k) { action() } 334 } 335 } 336 } 337 } 338 println("Completed ${n * k} actions in $time ms") 339} 340 341//sampleStart 342val mutex = Mutex() 343var counter = 0 344 345fun main() = runBlocking { 346 withContext(Dispatchers.Default) { 347 massiveRun { 348 // protect each increment with lock 349 mutex.withLock { 350 counter++ 351 } 352 } 353 } 354 println("Counter = $counter") 355} 356//sampleEnd 357``` 358{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 359 360> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt). 361> 362{type="note"} 363 364<!--- TEST ARBITRARY_TIME 365Completed 100000 actions in xxx ms 366Counter = 100000 367--> 368 369The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations 370where you absolutely must modify some shared state periodically, but there is no natural thread that this state 371is confined to. 372 373## Actors 374 375An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine, 376the state that is confined and encapsulated into this coroutine, 377and a channel to communicate with other coroutines. A simple actor can be written as a function, 378but an actor with a complex state is better suited for a class. 379 380There is an [actor] coroutine builder that conveniently combines actor's mailbox channel into its 381scope to receive messages from and combines the send channel into the resulting job object, so that a 382single reference to the actor can be carried around as its handle. 383 384The first step of using an actor is to define a class of messages that an actor is going to process. 385Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose. 386We define `CounterMsg` sealed class with `IncCounter` message to increment a counter and `GetCounter` message 387to get its value. The latter needs to send a response. A [CompletableDeferred] communication 388primitive, that represents a single value that will be known (communicated) in the future, 389is used here for that purpose. 390 391```kotlin 392// Message types for counterActor 393sealed class CounterMsg 394object IncCounter : CounterMsg() // one-way message to increment counter 395class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply 396``` 397 398Then we define a function that launches an actor using an [actor] coroutine builder: 399 400```kotlin 401// This function launches a new counter actor 402fun CoroutineScope.counterActor() = actor<CounterMsg> { 403 var counter = 0 // actor state 404 for (msg in channel) { // iterate over incoming messages 405 when (msg) { 406 is IncCounter -> counter++ 407 is GetCounter -> msg.response.complete(counter) 408 } 409 } 410} 411``` 412 413The main code is straightforward: 414 415<!--- CLEAR --> 416 417```kotlin 418import kotlinx.coroutines.* 419import kotlinx.coroutines.channels.* 420import kotlin.system.* 421 422suspend fun massiveRun(action: suspend () -> Unit) { 423 val n = 100 // number of coroutines to launch 424 val k = 1000 // times an action is repeated by each coroutine 425 val time = measureTimeMillis { 426 coroutineScope { // scope for coroutines 427 repeat(n) { 428 launch { 429 repeat(k) { action() } 430 } 431 } 432 } 433 } 434 println("Completed ${n * k} actions in $time ms") 435} 436 437// Message types for counterActor 438sealed class CounterMsg 439object IncCounter : CounterMsg() // one-way message to increment counter 440class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply 441 442// This function launches a new counter actor 443fun CoroutineScope.counterActor() = actor<CounterMsg> { 444 var counter = 0 // actor state 445 for (msg in channel) { // iterate over incoming messages 446 when (msg) { 447 is IncCounter -> counter++ 448 is GetCounter -> msg.response.complete(counter) 449 } 450 } 451} 452 453//sampleStart 454fun main() = runBlocking<Unit> { 455 val counter = counterActor() // create the actor 456 withContext(Dispatchers.Default) { 457 massiveRun { 458 counter.send(IncCounter) 459 } 460 } 461 // send a message to get a counter value from an actor 462 val response = CompletableDeferred<Int>() 463 counter.send(GetCounter(response)) 464 println("Counter = ${response.await()}") 465 counter.close() // shutdown the actor 466} 467//sampleEnd 468``` 469{kotlin-runnable="true" kotlin-min-compiler-version="1.3"} 470 471> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt). 472> 473{type="note"} 474 475<!--- TEST ARBITRARY_TIME 476Completed 100000 actions in xxx ms 477Counter = 100000 478--> 479 480It does not matter (for correctness) what context the actor itself is executed in. An actor is 481a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine 482works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state, 483but can only affect each other through messages (avoiding the need for any locks). 484 485Actor is more efficient than locking under load, because in this case it always has work to do and it does not 486have to switch to a different context at all. 487 488> Note that an [actor] coroutine builder is a dual of [produce] coroutine builder. An actor is associated 489> with the channel that it receives messages from, while a producer is associated with the channel that it 490> sends elements to. 491> 492{type="note"} 493 494<!--- MODULE kotlinx-coroutines-core --> 495<!--- INDEX kotlinx.coroutines --> 496 497[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html 498[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html 499[CompletableDeferred]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html 500 501<!--- INDEX kotlinx.coroutines.sync --> 502 503[Mutex]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html 504[Mutex.lock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html 505[Mutex.unlock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html 506[withLock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html 507 508<!--- INDEX kotlinx.coroutines.channels --> 509 510[actor]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html 511[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html 512 513<!--- END --> 514