• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 @file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE")
5 
6 package kotlinx.coroutines.internal
7 
8 import kotlinx.atomicfu.atomic
9 import kotlinx.coroutines.*
10 import kotlin.jvm.*
11 import kotlin.native.concurrent.*
12 
/**
 * The most abstract operation that can be in progress. A thread that observes an instance of this
 * class in a field of one of its objects shall invoke [perform] to help the operation along.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
public abstract class OpDescriptor {
    /**
     * Performs this operation.
     *
     * @param affected the object the operation is applied to.
     * @return `null` if the operation was performed successfully, or some other
     * object that indicates the failure reason.
     */
    abstract fun perform(affected: Any?): Any?

    /**
     * The atomic operation that this descriptor is a part of, or `null`
     * if it is not a part of any [AtomicOp].
     */
    abstract val atomicOp: AtomicOp<*>?

    // debug representation
    override fun toString(): String = "$classSimpleName@$hexAddress"

    /**
     * Compares the [AtomicOp.opSequence] of the atomic operations that this descriptor and [that] belong to.
     *
     * @return `true` only when both descriptors are part of an [AtomicOp] and this one's sequence number
     * is strictly lower; `false` if either descriptor has no [atomicOp].
     */
    fun isEarlierThan(that: OpDescriptor): Boolean {
        val own = atomicOp ?: return false
        // Sequence numbers are read only after both operations are known to be present.
        return that.atomicOp?.let { other -> own.opSequence < other.opSequence } ?: false
    }
}
40 
/**
 * Marker held as the consensus of an [AtomicOp] for as long as no decision has been recorded.
 */
@JvmField
@SharedImmutable
internal val NO_DECISION: Any = Symbol("NO_DECISION")
44 
/**
 * Descriptor for a multi-word atomic operation.
 * Based on the paper
 * ["A Practical Multi-Word Compare-and-Swap Operation"](https://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf)
 * by Timothy L. Harris, Keir Fraser and Ian A. Pratt.
 *
 * Note: the parts of an atomic operation must be globally ordered. Otherwise, this implementation will produce
 * `StackOverflowError`.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
@InternalCoroutinesApi
public abstract class AtomicOp<in T> : OpDescriptor() {
    // Holds NO_DECISION until the first successful decide(), then the decided value.
    private val _consensus = atomic<Any?>(NO_DECISION)

    /** The current consensus value; [NO_DECISION] while there is no decision yet. */
    val consensus: Any? get() = _consensus.value

    /** `true` once a decision other than [NO_DECISION] has been recorded. */
    val isDecided: Boolean get() = _consensus.value !== NO_DECISION

    /**
     * Sequence number of this multi-word operation, used for deadlock resolution.
     * An operation with a lower number aborts itself (using the [RETRY_ATOMIC] error symbol) if it encounters
     * the need to help an operation with a higher sequence number, and then restarts
     * (using a higher `opSequence` to ensure progress).
     * Simple operations that cannot get into a deadlock always return zero here.
     *
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/504
     */
    open val opSequence: Long get() = 0L

    /** An [AtomicOp] is its own enclosing atomic operation. */
    override val atomicOp: AtomicOp<*> get() = this

    /**
     * Tries to record [decision] as the consensus of this operation.
     *
     * @param decision the proposed outcome; must not be [NO_DECISION].
     * @return the consensus in effect: the already recorded value if a decision existed,
     * [decision] if this call recorded it, or the value read back after losing the compare-and-set.
     */
    fun decide(decision: Any?): Any? {
        assert { decision !== NO_DECISION }
        val observed = _consensus.value
        return when {
            // somebody has already decided -- their decision stands
            observed !== NO_DECISION -> observed
            // we have recorded our decision
            _consensus.compareAndSet(NO_DECISION, decision) -> decision
            // lost the race -- report whatever was recorded instead
            else -> _consensus.value
        }
    }

    /**
     * Prepares the operation on [affected].
     *
     * @return `null` if Ok, or the failure reason.
     */
    abstract fun prepare(affected: T): Any?

    /**
     * Completes the operation on [affected].
     *
     * @param failure not `null` if the operation failed to prepare.
     */
    abstract fun complete(affected: T, failure: Any?)

    /**
     * Makes a decision on the status of this operation (running [prepare] if there is none yet)
     * and then calls [complete] with that decision.
     *
     * @return the decision; `null` on success.
     */
    @Suppress("UNCHECKED_CAST")
    final override fun perform(affected: Any?): Any? {
        // make decision on status
        val seen = _consensus.value
        val outcome = if (seen === NO_DECISION) decide(prepare(affected as T)) else seen
        // complete operation
        complete(affected as T, outcome)
        return outcome
    }
}
103 
/**
 * One part of a multi-step atomic operation [AtomicOp].
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
public abstract class AtomicDesc {
    /** The reference to the parent [AtomicOp]; initialized when the [AtomicOp] is created. */
    lateinit var atomicOp: AtomicOp<*>

    /**
     * Prepares this part of the operation [op].
     *
     * @return `null` if prepared successfully.
     */
    abstract fun prepare(op: AtomicOp<*>): Any?

    /**
     * Completes this part of the operation [op].
     *
     * @param failure `null` if the operation succeeded.
     */
    abstract fun complete(op: AtomicOp<*>, failure: Any?)
}
114 
/**
 * Error symbol returned by [AtomicOp] implementations when they detect a potential deadlock
 * by comparing [AtomicOp.opSequence] numbers.
 */
@JvmField @SharedImmutable
internal val RETRY_ATOMIC: Any = Symbol("RETRY_ATOMIC")
122