1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.channels
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.intrinsics.*
9 import kotlinx.coroutines.selects.*
10 import kotlin.coroutines.*
11 import kotlin.coroutines.intrinsics.*
12
/**
 * Scope for [actor][GlobalScope.actor] coroutine builder.
 *
 * The scope is both a [CoroutineScope] and a [ReceiveChannel] of the actor's messages.
 *
 * **Note: This API will become obsolete in future updates with introduction of complex actors.**
 * See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
 */
@ObsoleteCoroutinesApi
public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
    /**
     * A reference to the mailbox channel that this coroutine [receives][receive] messages from.
     * It is provided for convenience, so that the code in the coroutine can refer
     * to the channel as `channel` as opposed to `this`.
     * All the [ReceiveChannel] functions on this interface delegate to
     * the channel instance returned by this property.
     */
    public val channel: Channel<E>
}
30
31 /**
32 * Launches new coroutine that is receiving messages from its mailbox channel
33 * and returns a reference to its mailbox channel as a [SendChannel]. The resulting
34 * object can be used to [send][SendChannel.send] messages to this coroutine.
35 *
36 * The scope of the coroutine contains [ActorScope] interface, which implements
37 * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
38 * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
39 * when the coroutine completes.
40 *
41 * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
42 * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
43 * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
44 * with corresponding [context] element.
45 *
46 * By default, the coroutine is immediately scheduled for execution.
47 * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
48 * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
49 * it will be started implicitly on the first message
 * [sent][SendChannel.send] to this actor's mailbox channel.
51 *
52 * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
53 * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
54 *
55 * The kind of the resulting channel depends on the specified [capacity] parameter.
56 * See [Channel] interface documentation for details.
57 *
58 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
59 *
60 * ### Using actors
61 *
62 * A typical usage of the actor builder looks like this:
63 *
64 * ```
65 * val c = actor {
66 * // initialize actor's state
67 * for (msg in channel) {
68 * // process message here
69 * }
70 * }
71 * // send messages to the actor
72 * c.send(...)
73 * ...
74 * // stop the actor when it is no longer needed
75 * c.close()
76 * ```
77 *
78 * ### Stopping and cancelling actors
79 *
80 * When the inbox channel of the actor is [closed][SendChannel.close] it sends a special "close token" to the actor.
81 * The actor still processes all the messages that were already sent and then "`for (msg in channel)`" loop terminates
82 * and the actor completes.
83 *
84 * If the actor needs to be aborted without processing all the messages that were already sent to it, then
85 * it shall be created with a parent job:
86 *
87 * ```
88 * val job = Job()
89 * val c = actor(context = job) { ... }
90 * ...
91 * // abort the actor
92 * job.cancel()
93 * ```
94 *
95 * When actor's parent job is [cancelled][Job.cancel], then actor's job becomes cancelled. It means that
96 * "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor
97 * completes without processing remaining messages.
98 *
99 * **Note: This API will become obsolete in future updates with introduction of complex actors.**
100 * See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
101 *
102 * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
103 * @param capacity capacity of the channel's buffer (no buffer by default).
104 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
105 * @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion])
106 * @param block the coroutine code.
107 */
@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
    // Combine the scope's context with the caller-supplied elements.
    val actorContext = newCoroutineContext(context)
    // The mailbox the actor receives from; the returned SendChannel writes into it.
    val mailbox = Channel<E>(capacity)
    // Lazy start modes get the lazy coroutine variant; every other mode gets an
    // already-active ActorCoroutine.
    val actorCoroutine: ActorCoroutine<E> = if (start.isLazy) {
        LazyActorCoroutine(actorContext, mailbox, block)
    } else {
        ActorCoroutine(actorContext, mailbox, active = true)
    }
    // Register the optional completion handler before the coroutine is started.
    onCompletion?.let { actorCoroutine.invokeOnCompletion(handler = it) }
    // The coroutine is passed as the block's receiver, i.e. it serves as the ActorScope.
    actorCoroutine.start(start, actorCoroutine, block)
    return actorCoroutine
}
125
/**
 * Coroutine backing the [actor] builder: a [ChannelCoroutine] over the mailbox [channel]
 * that also serves as the [ActorScope] receiver of the actor's block.
 */
private open class ActorCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>,
    active: Boolean
) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {

    init {
        // The superclass is constructed with initParentJob = false; the parent job is attached here instead.
        initParentJob(parentContext[Job])
    }

    /**
     * Cancels the mailbox channel when this coroutine starts cancelling.
     * A `null` cause is forwarded as `null`, a [CancellationException] is forwarded as is,
     * and any other cause is wrapped into a new [CancellationException].
     */
    override fun onCancelling(cause: Throwable?) {
        val cancellation: CancellationException? = when (cause) {
            null -> null
            is CancellationException -> cause
            else -> CancellationException("$classSimpleName was cancelled", cause)
        }
        _channel.cancel(cancellation)
    }

    /**
     * Passes [exception] to [handleCoroutineException] with this coroutine's context
     * and reports it as handled by returning `true`.
     */
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
147
/**
 * [ActorCoroutine] variant that [actor] creates for lazy start modes.
 *
 * It is constructed inactive (`active = false`) and [block] is turned into a continuation up front.
 * Every send-side entry point ([send], [offer], [trySend], [close] and [onSend]) calls [start]
 * before delegating to the superclass implementation.
 */
private class LazyActorCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>,
    block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false) {

    // The actor's block bound to this coroutine as both its receiver and its completion;
    // it is created here and only launched from onStart().
    private val continuation = block.createCoroutineUnintercepted(this, this)

    /** Launches the actor's block by starting the stored continuation. */
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }

    /** Starts the actor if necessary, then sends [element] to the mailbox. */
    override suspend fun send(element: E) {
        start()
        return super.send(element)
    }

    @Suppress("DEPRECATION_ERROR")
    @Deprecated(
        level = DeprecationLevel.ERROR,
        message = "Deprecated in the favour of 'trySend' method",
        replaceWith = ReplaceWith("trySend(element).isSuccess")
    ) // See super()
    override fun offer(element: E): Boolean {
        start()
        return super.offer(element)
    }

    /** Starts the actor if necessary, then attempts to send [element] without suspending. */
    override fun trySend(element: E): ChannelResult<Unit> {
        start()
        return super.trySend(element)
    }

    /**
     * Closes the mailbox and then starts the actor.
     *
     * @return the result of the superclass `close` call.
     */
    override fun close(cause: Throwable?): Boolean {
        // close the channel _first_
        val closed = super.close(cause)
        // then start the coroutine (it will promptly fail if it was not started yet)
        start()
        return closed
    }

    /**
     * Select clause for sending to the mailbox. It reuses the superclass clause's result
     * processing, but registers through [onSendRegFunction] so that the actor is started first.
     */
    @Suppress("UNCHECKED_CAST")
    override val onSend: SelectClause2<E, SendChannel<E>> get() = SelectClause2Impl(
        clauseObject = this,
        regFunc = LazyActorCoroutine<*>::onSendRegFunction as RegistrationFunction,
        processResFunc = super.onSend.processResFunc
    )

    /** Registration function of [onSend]: starts the actor, then delegates to the superclass clause. */
    private fun onSendRegFunction(select: SelectInstance<*>, element: Any?) {
        // Go through start() rather than invoking the onStart() callback directly: start() performs
        // the job's state transition and only starts the coroutine if it was not started yet. This
        // matches send/offer/trySend/close above and keeps repeated select registrations (or a later
        // start()) from launching the block's continuation a second time.
        start()
        super.onSend.regFunc(this, select, element)
    }
}
201