<!--- TEST_NAME SharedStateGuideTest -->

[//]: # (title: Shared mutable state and concurrency)

Coroutines can be executed in parallel using a multi-threaded dispatcher such as [Dispatchers.Default]. This presents
all the usual parallelism problems, the main one being synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

## The problem

Let us launch a hundred coroutines all doing the same action a thousand times.
We'll also measure their completion time for further comparisons:

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

We start with a very simple action that increments a shared mutable variable using
multi-threaded [Dispatchers.Default].

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt).
>
{type="note"}

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

## Volatiles are of no help

There is a common misconception that making a variable `volatile` solves concurrency problems. Let us try it:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt).
>
{type="note"}

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

This code runs slower, but we are still unlikely to get "Counter = 100000" at the end, because volatile variables
guarantee linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (an increment, in our case, is a read followed by a separate write).

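To make the failure mode concrete, here is a minimal single-threaded sketch that replays one bad interleaving of two increments by hand. The names `SharedCell` and `lostUpdateDemo` are hypothetical and exist only for this illustration. Each individual read and write below is atomic, which is all that `volatile` would guarantee, yet one update is still lost:

```kotlin
// Hypothetical holder for the shared value; not part of any library.
class SharedCell(var value: Int = 0)

// Replays, step by step, what two threads A and B may do when both run `value++`.
fun lostUpdateDemo(): Int {
    val cell = SharedCell()
    // `cell.value++` is really three steps: read, add one, write back.
    val readByA = cell.value   // A reads 0
    val readByB = cell.value   // B reads 0 before A has written
    cell.value = readByA + 1   // A writes 1
    cell.value = readByB + 1   // B also writes 1, overwriting A's update
    return cell.value
}

fun main() {
    println("Two increments produced ${lostUpdateDemo()}") // prints "Two increments produced 1"
}
```

In the real multi-threaded run the interleaving is chosen by the scheduler rather than written out, which is why the final count varies from run to run.
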
## Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

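As a rough illustration of that last point, consider an operation only slightly more complex than an increment: raising a shared value to at least some candidate. The sketch below hand-writes it as a compare-and-set retry loop on `AtomicInteger`. The helper names `raiseTo` and `highestSeen` are hypothetical, and plain threads are used instead of coroutines only to keep the example free of dependencies:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: atomically raise `target` to at least `candidate`.
fun raiseTo(target: AtomicInteger, candidate: Int) {
    while (true) {
        val current = target.get()
        if (candidate <= current) return                      // already high enough
        if (target.compareAndSet(current, candidate)) return  // our write won
        // Another thread changed the value in between; re-read and retry.
    }
}

// Eight threads each propose 1000 candidates; the largest one is 8 * 1000.
fun highestSeen(): Int {
    val highest = AtomicInteger(0)
    val workers = (1..8).map { id ->
        thread { for (i in 1..1000) raiseTo(highest, id * i) }
    }
    workers.forEach { it.join() }
    return highest.get()
}

fun main() {
    println("Highest = ${highestSeen()}") // prints "Highest = 8000"
}
```

Every new compound operation needs its own carefully reasoned loop of this kind, and state that spans several variables cannot be covered by a single atomic at all, which is where the approaches in the following sections come in.
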
## Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context.

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This code works very slowly, because it does _fine-grained_ thread confinement. Each individual increment switches
from the multi-threaded [Dispatchers.Default] context to the single-threaded context using a
[withContext(counterContext)][withContext] block.

## Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to a single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This now works much faster and produces the correct result.

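A dispatcher created by `newSingleThreadContext` owns a dedicated thread, and that thread is released only when the dispatcher is closed. The following is a minimal sketch of the coarse-grained approach that also closes the dispatcher, using the standard `use` function. The function name `countConfined` is hypothetical, and the `@OptIn` line assumes a recent kotlinx.coroutines version in which `newSingleThreadContext` is marked as a delicate API:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs the same 100 x 1000 increments, confined to one thread.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun countConfined(): Int = runBlocking {
    var counter = 0
    // `use` closes the dispatcher, and with it the thread, when the block finishes
    newSingleThreadContext("CounterContext").use { counterContext ->
        withContext(counterContext) {
            coroutineScope {
                repeat(100) {
                    launch { repeat(1000) { counter++ } } // inherits the single-threaded context
                }
            }
        }
    }
    counter
}

fun main() {
    println("Counter = ${countConfined()}") // prints "Counter = 100000"
}
```

Keeping the counter as a local variable here is a choice made for the sketch; the point is only that every increment happens inside the single-threaded context.
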
## Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread.

There is also the [withLock] extension function, which conveniently represents the
`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

The locking in this example is fine-grained, so it pays a price in speed. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.

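For comparison, here is a minimal sketch of the same counter written with explicit `lock` and `unlock` calls, which is the pattern that `withLock` wraps. The function name `countWithExplicitLock` is hypothetical:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

// Hypothetical helper: 100 coroutines, 1000 increments each, guarded by hand.
fun countWithExplicitLock(): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    withContext(Dispatchers.Default) {
        repeat(100) {
            launch {
                repeat(1000) {
                    mutex.lock()       // suspends, rather than blocks, while another coroutine holds the mutex
                    try {
                        counter++
                    } finally {
                        mutex.unlock() // always release, even if the body throws
                    }
                }
            }
        }
    }
    counter
}

fun main() {
    println("Counter = ${countWithExplicitLock()}") // prints "Counter = 100000"
}
```

Note that `Mutex` is non-reentrant: a coroutine that already holds the mutex and calls `lock()` again suspends on itself, so critical sections should not be nested on the same mutex.
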
## Actors

An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine,
the state that is confined and encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope, to receive messages from, and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

The first step of using an actor is to define a class of messages that the actor is going to process.
Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose.
We define a `CounterMsg` sealed class with an `IncCounter` message to increment a counter and a `GetCounter` message
to get its value. The latter needs to send a response. A [CompletableDeferred] communication
primitive, which represents a single value that will be known (communicated) in the future,
is used here for that purpose.

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
```

Then we define a function that launches an actor using the [actor] coroutine builder:

```kotlin
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
```

The main code is straightforward:

<!--- CLEAR -->

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

//sampleStart
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd
```
{kotlin-runnable="true" kotlin-min-compiler-version="1.3"}

> You can get the full code [here](../../kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt).
>
{type="note"}

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state,
but can only affect each other through messages (avoiding the need for any locks).

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

> Note that the [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
> with the channel that it receives messages from, while a producer is associated with the channel that it
> sends elements to.
>
{type="note"}

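The earlier remark that an actor with complex state is better suited for a class can be sketched as follows. This is only an illustration, built from a plain `Channel` and `launch` rather than the actor builder; the names `CounterActor`, `Inc`, `Get` and `countWithActorClass` are hypothetical:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical class-based actor: the message types and the mailbox stay private.
class CounterActor(scope: CoroutineScope) {
    private sealed class Msg
    private object Inc : Msg()
    private class Get(val response: CompletableDeferred<Int>) : Msg()

    private val mailbox = Channel<Msg>()

    init {
        // The state lives in the single coroutine that drains the mailbox
        scope.launch {
            var counter = 0
            for (msg in mailbox) {
                when (msg) {
                    is Inc -> counter++
                    is Get -> msg.response.complete(counter)
                }
            }
        }
    }

    suspend fun increment() = mailbox.send(Inc)

    suspend fun get(): Int {
        val response = CompletableDeferred<Int>()
        mailbox.send(Get(response))
        return response.await()
    }

    fun close() { mailbox.close() } // ends the loop, letting the coroutine complete
}

fun countWithActorClass(): Int = runBlocking {
    val actor = CounterActor(this)
    withContext(Dispatchers.Default) {
        repeat(100) { launch { repeat(1000) { actor.increment() } } }
    }
    val result = actor.get()
    actor.close()
    result
}

fun main() {
    println("Counter = ${countWithActorClass()}") // prints "Counter = 100000"
}
```

Callers see only `increment`, `get` and `close`, so the message protocol can change without touching them.
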
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->

[Dispatchers.Default]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[withContext]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CompletableDeferred]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html

<!--- INDEX kotlinx.coroutines.sync -->

[Mutex]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Mutex.lock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
[withLock]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html

<!--- INDEX kotlinx.coroutines.channels -->

[actor]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
[produce]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html

<!--- END -->