/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE")

package kotlinx.coroutines.internal

import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

/**
 * The most abstract operation that can be in progress. Other threads that observe an instance of this
 * class in the fields of their objects shall invoke [perform] to help.
 *
 * @suppress **This is unstable API and it is subject to change.**
 */
public abstract class OpDescriptor {
    /**
     * Returns `null` if the operation was performed successfully, or some other
     * object that indicates the failure reason.
     */
    abstract fun perform(affected: Any?): Any?

    /**
     * Reference to the atomic operation that this descriptor is a part of, or `null`
     * if it is not a part of any [AtomicOp].
     */
    abstract val atomicOp: AtomicOp<*>?

    // Debug-only representation.
    override fun toString(): String = "${classSimpleName}@${hexAddress}"

    /**
     * Returns `true` only when both this descriptor and [that] belong to an [AtomicOp] and the
     * operation of this descriptor has a strictly smaller [AtomicOp.opSequence].
     * Returns `false` as soon as either descriptor turns out to have no [atomicOp].
     */
    fun isEarlierThan(that: OpDescriptor): Boolean {
        // `that.atomicOp` is read only after our own operation is known to exist.
        val ownOp = atomicOp
        if (ownOp == null) return false
        val otherOp = that.atomicOp
        if (otherOp == null) return false
        return ownOp.opSequence < otherOp.opSequence
    }
}

// Initial consensus value of every AtomicOp: it means that no decision has been made yet.
@SharedImmutable
@JvmField
internal val NO_DECISION: Any = Symbol("NO_DECISION")

/**
 * Descriptor for multi-word atomic operation.
 * Based on paper
 * ["A Practical Multi-Word Compare-and-Swap Operation"](https://www.cl.cam.ac.uk/research/srg/netos/papers/2002-casn.pdf)
 * by Timothy L. Harris, Keir Fraser and Ian A. Pratt.
 *
 * Note: parts of atomic operation must be globally ordered. Otherwise, this implementation will produce
 * `StackOverflowError`.
53 * 54 * @suppress **This is unstable API and it is subject to change.** 55 */ 56 @InternalCoroutinesApi 57 public abstract class AtomicOp<in T> : OpDescriptor() { 58 private val _consensus = atomic<Any?>(NO_DECISION) 59 60 // Returns NO_DECISION when there is not decision yet 61 val consensus: Any? get() = _consensus.value 62 63 val isDecided: Boolean get() = _consensus.value !== NO_DECISION 64 65 /** 66 * Sequence number of this multi-word operation for deadlock resolution. 67 * An operation with lower number aborts itself with (using [RETRY_ATOMIC] error symbol) if it encounters 68 * the need to help the operation with higher sequence number and then restarts 69 * (using higher `opSequence` to ensure progress). 70 * Simple operations that cannot get into the deadlock always return zero here. 71 * 72 * See https://github.com/Kotlin/kotlinx.coroutines/issues/504 73 */ 74 open val opSequence: Long get() = 0L 75 76 override val atomicOp: AtomicOp<*> get() = this 77 decidenull78 fun decide(decision: Any?): Any? { 79 assert { decision !== NO_DECISION } 80 val current = _consensus.value 81 if (current !== NO_DECISION) return current 82 if (_consensus.compareAndSet(NO_DECISION, decision)) return decision 83 return _consensus.value 84 } 85 preparenull86 abstract fun prepare(affected: T): Any? // `null` if Ok, or failure reason 87 88 abstract fun complete(affected: T, failure: Any?) // failure != null if failed to prepare op 89 90 // returns `null` on success 91 @Suppress("UNCHECKED_CAST") 92 final override fun perform(affected: Any?): Any? { 93 // make decision on status 94 var decision = this._consensus.value 95 if (decision === NO_DECISION) { 96 decision = decide(prepare(affected as T)) 97 } 98 // complete operation 99 complete(affected as T, decision) 100 return decision 101 } 102 } 103 104 /** 105 * A part of multi-step atomic operation [AtomicOp]. 
106 * 107 * @suppress **This is unstable API and it is subject to change.** 108 */ 109 public abstract class AtomicDesc { 110 lateinit var atomicOp: AtomicOp<*> // the reference to parent atomicOp, init when AtomicOp is created preparenull111 abstract fun prepare(op: AtomicOp<*>): Any? // returns `null` if prepared successfully 112 abstract fun complete(op: AtomicOp<*>, failure: Any?) // decision == null if success 113 } 114 115 /** 116 * It is returned as an error by [AtomicOp] implementations when they detect potential deadlock 117 * using [AtomicOp.opSequence] numbers. 118 */ 119 @JvmField 120 @SharedImmutable 121 internal val RETRY_ATOMIC: Any = Symbol("RETRY_ATOMIC") 122