<!--- TEST_NAME SharedStateGuideTest -->

**Table of contents**

<!--- TOC -->

* [Shared mutable state and concurrency](#shared-mutable-state-and-concurrency)
  * [The problem](#the-problem)
  * [Volatiles are of no help](#volatiles-are-of-no-help)
  * [Thread-safe data structures](#thread-safe-data-structures)
  * [Thread confinement fine-grained](#thread-confinement-fine-grained)
  * [Thread confinement coarse-grained](#thread-confinement-coarse-grained)
  * [Mutual exclusion](#mutual-exclusion)
  * [Actors](#actors)

<!--- END -->

## Shared mutable state and concurrency

Coroutines can be executed concurrently using a multi-threaded dispatcher like the [Dispatchers.Default]. This presents
all the usual concurrency problems. The main problem is synchronization of access to **shared mutable state**.
Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world,
but others are unique.

### The problem

Let us launch a hundred coroutines all doing the same action a thousand times.
We'll also measure their completion time for further comparisons:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}
```

</div>

We start with a very simple action that increments a shared mutable variable using
multi-threaded [Dispatchers.Default].

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt).

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

What does it print at the end? It is highly unlikely to ever print "Counter = 100000", because a hundred coroutines
increment the `counter` concurrently from multiple threads without any synchronization.

### Volatiles are of no help

There is a common misconception that making a variable `volatile` solves concurrency problems. Let us try it:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt).

<!--- TEST LINES_START
Completed 100000 actions in
Counter =
-->

This code runs slower, but we still don't get "Counter = 100000" at the end, because volatile variables guarantee
linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but
do not provide atomicity of larger actions (increment in our case, which is a read followed by a write).

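To make the lost update concrete, the following minimal sketch replays one unlucky interleaving by hand on a single thread. The function `lostUpdate` and the "A"/"B" naming are hypothetical and only illustrate how two increments can collapse into one when each is a separate read and write.

```kotlin
// Hypothetical illustration: two logical threads, A and B, each perform
// `counter++`, which is really "read the value, add one, write it back".
fun lostUpdate(): Int {
    var counter = 0
    val readByA = counter  // A reads 0
    val readByB = counter  // B reads 0 before A has written back
    counter = readByA + 1  // A writes 1
    counter = readByB + 1  // B also writes 1, overwriting A's update
    return counter
}

fun main() {
    println("Counter = ${lostUpdate()}") // prints "Counter = 1", not 2
}
```

Each individual read and write here is atomic, which is all `@Volatile` promises. The update is lost because nothing makes the read and the write of one increment happen as a single step.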
### Thread-safe data structures

The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized,
linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding
operations that need to be performed on a shared state.
In the case of a simple counter we can use the `AtomicInteger` class, which has an atomic `incrementAndGet` operation:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other
standard data structures and basic operations on them. However, it does not easily scale to complex
state or to complex operations that do not have ready-to-use thread-safe implementations.

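As a small illustration of where the boundary lies, here is a sketch of a slightly more complex operation that still fits into a single atomic variable: incrementing only while the value is below a cap. It uses `AtomicInteger.updateAndGet` from the JDK and plain threads instead of coroutines to stay dependency-free. The helpers `incrementUpTo` and `runCapped` and all parameter values are assumptions made up for this example.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: atomically increments `counter` only while it is below `cap`.
// updateAndGet retries the lambda until the check and the update apply as one step.
fun incrementUpTo(counter: AtomicInteger, cap: Int): Int =
    counter.updateAndGet { current -> if (current < cap) current + 1 else current }

// Starts `threads` workers that each attempt `attempts` capped increments.
fun runCapped(threads: Int, attempts: Int, cap: Int): Int {
    val counter = AtomicInteger()
    val workers = List(threads) {
        thread { repeat(attempts) { incrementUpTo(counter, cap) } }
    }
    workers.forEach { it.join() } // wait for all workers to finish
    return counter.get()
}

fun main() {
    // 8000 attempts in total, but the cap is never exceeded
    println("Counter = ${runCapped(threads = 8, attempts = 1000, cap = 5000)}") // prints "Counter = 5000"
}
```

Once an operation has to update several variables consistently, a single atomic variable is typically no longer enough, which is where the approaches in the following sections come in.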
### Thread confinement fine-grained

_Thread confinement_ is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This code works very slowly, because it does _fine-grained_ thread-confinement. Each individual increment switches
from the multi-threaded [Dispatchers.Default] context to the single-threaded context using the
[withContext(counterContext)][withContext] block.

### Thread confinement coarse-grained

In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic
are confined to a single thread. The following example does it like that, running each coroutine in
the single-threaded context to start with.

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

This now works much faster and produces the correct result.

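When the work itself should stay on a multi-threaded dispatcher, one possible middle ground is to keep intermediate results local to each coroutine and switch to the confined context only once per coroutine. The sketch below is an assumption-laden illustration, not part of the guide's examples: `batchedRun` is a hypothetical helper, and the single-threaded dispatcher is created with `Executors.newSingleThreadExecutor().asCoroutineDispatcher()` as a stand-in for `newSingleThreadContext`.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

var counter = 0 // shared state, only touched inside the confined dispatcher

// Hypothetical helper: n coroutines each do k local increments on Dispatchers.Default
// and then publish their subtotal with a single switch to the confined dispatcher.
suspend fun batchedRun(confined: CoroutineDispatcher, n: Int = 100, k: Int = 1000) {
    coroutineScope {
        repeat(n) {
            launch(Dispatchers.Default) {
                var local = 0                      // not shared, needs no synchronization
                repeat(k) { local++ }
                withContext(confined) { counter += local } // one context switch per coroutine
            }
        }
    }
}

fun main() = runBlocking {
    // `use` closes the dispatcher (and its thread) when the block completes
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { confined ->
        batchedRun(confined)
    }
    println("Counter = $counter") // prints "Counter = 100000"
}
```

This does 100 context switches instead of 100000, at the cost of the shared `counter` being updated only when a coroutine finishes its batch.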
### Mutual exclusion

The mutual exclusion solution to the problem is to protect all modifications of the shared state with a _critical section_
that is never executed concurrently. In a blocking world you'd typically use `synchronized` or `ReentrantLock` for that.
The coroutine alternative is called [Mutex]. It has [lock][Mutex.lock] and [unlock][Mutex.unlock] functions to
delimit a critical section. The key difference is that `Mutex.lock()` is a suspending function. It does not block a thread.

There is also the [withLock] extension function that conveniently represents the
`mutex.lock(); try { ... } finally { mutex.unlock() }` pattern:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations
where you absolutely must modify some shared state periodically, but there is no natural thread that this state
is confined to.

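A case where a lock is a natural fit is state made of several fields that must change together. The following sketch guards two fields with one [Mutex] so that readers never observe a half-applied update. The `Stats` class and its members are hypothetical names introduced only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

// Hypothetical example class: `count` and `sum` are only accessed under `mutex`.
class Stats {
    private val mutex = Mutex()
    private var count = 0
    private var sum = 0L

    // Both fields are updated inside one critical section
    suspend fun record(value: Long) {
        mutex.withLock {
            count++
            sum += value
        }
    }

    // Reads take the same lock, so the pair is always consistent
    suspend fun snapshot(): Pair<Int, Long> = mutex.withLock { count to sum }
}

fun main() = runBlocking {
    val stats = Stats()
    withContext(Dispatchers.Default) {
        repeat(100) {
            launch { repeat(1000) { stats.record(2) } }
        }
    }
    val (count, sum) = stats.snapshot()
    println("count = $count, sum = $sum") // prints "count = 100000, sum = 200000"
}
```

Keeping the mutex private to the class means callers cannot forget to take the lock. Note that `Mutex` is not reentrant, so a method that already holds the lock must not call another method that acquires it again.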
### Actors

An [actor](https://en.wikipedia.org/wiki/Actor_model) is an entity made up of a combination of a coroutine,
the state that is confined and encapsulated into this coroutine,
and a channel to communicate with other coroutines. A simple actor can be written as a function,
but an actor with a complex state is better suited for a class.

There is an [actor] coroutine builder that conveniently combines the actor's mailbox channel into its
scope to receive messages from and combines the send channel into the resulting job object, so that a
single reference to the actor can be carried around as its handle.

The first step of using an actor is to define a class of messages that the actor is going to process.
Kotlin's [sealed classes](https://kotlinlang.org/docs/reference/sealed-classes.html) are well suited for that purpose.
We define a `CounterMsg` sealed class with an `IncCounter` message to increment a counter and a `GetCounter` message
to get its value. The latter needs to send a response. A [CompletableDeferred] communication
primitive, which represents a single value that will be known (communicated) in the future,
is used here for that purpose.

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
```

</div>

Then we define a function that launches an actor using an [actor] coroutine builder:

<div class="sample" markdown="1" theme="idea" data-highlight-only>

```kotlin
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
```

</div>

The main code is straightforward:

<!--- CLEAR -->

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

//sampleStart
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd
```

</div>

> You can get the full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt).

<!--- TEST ARBITRARY_TIME
Completed 100000 actions in xxx ms
Counter = 100000
-->

It does not matter (for correctness) what context the actor itself is executed in. An actor is
a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine
works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state,
but can only affect each other through messages (avoiding the need for any locks).

An actor is more efficient than locking under load, because in this case it always has work to do and it does not
have to switch to a different context at all.

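Because the essential ingredients are just a coroutine, its private state, and a channel, the same confinement can be sketched without the [actor] builder, using a plain `Channel` and `launch`. This is an illustrative variant under stated assumptions: `Msg`, `Inc`, `Get`, `launchCounter` and `countWithChannel` are hypothetical names, not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical message types for this sketch
sealed class Msg
object Inc : Msg()                                       // one-way increment
class Get(val response: CompletableDeferred<Int>) : Msg() // request with reply

// The counter lives in exactly one coroutine; nobody else can touch it
fun CoroutineScope.launchCounter(mailbox: Channel<Msg>): Job = launch {
    var counter = 0
    for (msg in mailbox) { // ends when the mailbox is closed
        when (msg) {
            is Inc -> counter++
            is Get -> msg.response.complete(counter)
        }
    }
}

// Sends n * k increments from Dispatchers.Default and then asks for the total
suspend fun countWithChannel(n: Int, k: Int): Int = coroutineScope {
    val mailbox = Channel<Msg>()
    launchCounter(mailbox)
    withContext(Dispatchers.Default) {
        repeat(n) { launch { repeat(k) { mailbox.send(Inc) } } }
    }
    val response = CompletableDeferred<Int>()
    mailbox.send(Get(response))
    val result = response.await()
    mailbox.close() // lets the counter coroutine finish so the scope can complete
    result
}

fun main() = runBlocking {
    println("Counter = ${countWithChannel(100, 1000)}") // prints "Counter = 100000"
}
```

The senders run on multiple threads, yet `counter` needs no lock or atomic, because only the receiving coroutine ever reads or writes it.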
> Note that the [actor] coroutine builder is a dual of the [produce] coroutine builder. An actor is associated
  with the channel that it receives messages from, while a producer is associated with the channel that it
  sends elements to.

<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines -->
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html
<!--- INDEX kotlinx.coroutines.sync -->
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
[withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
<!--- INDEX kotlinx.coroutines.channels -->
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
<!--- END -->
