• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.intrinsics.*
9 import kotlinx.coroutines.selects.*
10 import kotlin.coroutines.*
11 import kotlin.coroutines.intrinsics.*
12 
13 /**
14  * Scope for [actor][GlobalScope.actor] coroutine builder.
15  *
16  * **Note: This API will become obsolete in future updates with introduction of complex actors.**
17  *           See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
18  */
19 @ObsoleteCoroutinesApi
20 public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
21     /**
22      * A reference to the mailbox channel that this coroutine [receives][receive] messages from.
23      * It is provided for convenience, so that the code in the coroutine can refer
24      * to the channel as `channel` as apposed to `this`.
25      * All the [ReceiveChannel] functions on this interface delegate to
26      * the channel instance returned by this function.
27      */
28     public val channel: Channel<E>
29 }
30 
31 /**
32  * Launches new coroutine that is receiving messages from its mailbox channel
33  * and returns a reference to its mailbox channel as a [SendChannel]. The resulting
34  * object can be used to [send][SendChannel.send] messages to this coroutine.
35  *
36  * The scope of the coroutine contains [ActorScope] interface, which implements
37  * both [CoroutineScope] and [ReceiveChannel], so that coroutine can invoke
38  * [receive][ReceiveChannel.receive] directly. The channel is [closed][SendChannel.close]
39  * when the coroutine completes.
40  *
41  * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
42  * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
43  * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
44  * with corresponding [context] element.
45  *
46  * By default, the coroutine is immediately scheduled for execution.
47  * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
48  * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
49  * it will be started implicitly on the first message
50  * [sent][SendChannel.send] to this actors's mailbox channel.
51  *
52  * Uncaught exceptions in this coroutine close the channel with this exception as a cause and
53  * the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
54  *
55  * The kind of the resulting channel depends on the specified [capacity] parameter.
56  * See [Channel] interface documentation for details.
57  *
58  * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
59  *
60  * ### Using actors
61  *
62  * A typical usage of the actor builder looks like this:
63  *
64  * ```
65  * val c = actor {
66  *     // initialize actor's state
67  *     for (msg in channel) {
68  *         // process message here
69  *     }
70  * }
71  * // send messages to the actor
72  * c.send(...)
73  * ...
74  * // stop the actor when it is no longer needed
75  * c.close()
76  * ```
77  *
78  * ### Stopping and cancelling actors
79  *
80  * When the inbox channel of the actor is [closed][SendChannel.close] it sends a special "close token" to the actor.
81  * The actor still processes all the messages that were already sent and then "`for (msg in channel)`" loop terminates
82  * and the actor completes.
83  *
84  * If the actor needs to be aborted without processing all the messages that were already sent to it, then
85  * it shall be created with a parent job:
86  *
87  * ```
88  * val job = Job()
89  * val c = actor(context = job) {  ... }
90  * ...
91  * // abort the actor
92  * job.cancel()
93  * ```
94  *
95  * When actor's parent job is [cancelled][Job.cancel], then actor's job becomes cancelled. It means that
96  * "`for (msg in channel)`" and other cancellable suspending functions throw [CancellationException] and actor
97  * completes without processing remaining messages.
98  *
99  * **Note: This API will become obsolete in future updates with introduction of complex actors.**
100  *           See [issue #87](https://github.com/Kotlin/kotlinx.coroutines/issues/87).
101  *
102  * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
103  * @param capacity capacity of the channel's buffer (no buffer by default).
104  * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
105  * @param onCompletion optional completion handler for the actor coroutine (see [Job.invokeOnCompletion])
106  * @param block the coroutine code.
107  */
108 @ObsoleteCoroutinesApi
actornull109 public fun <E> CoroutineScope.actor(
110     context: CoroutineContext = EmptyCoroutineContext,
111     capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
112     start: CoroutineStart = CoroutineStart.DEFAULT,
113     onCompletion: CompletionHandler? = null,
114     block: suspend ActorScope<E>.() -> Unit
115 ): SendChannel<E> {
116     val newContext = newCoroutineContext(context)
117     val channel = Channel<E>(capacity)
118     val coroutine = if (start.isLazy)
119         LazyActorCoroutine(newContext, channel, block) else
120         ActorCoroutine(newContext, channel, active = true)
121     if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
122     coroutine.start(start, coroutine, block)
123     return coroutine
124 }
125 
126 private open class ActorCoroutine<E>(
127     parentContext: CoroutineContext,
128     channel: Channel<E>,
129     active: Boolean
130 ) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {
131 
132     init {
133         initParentJob(parentContext[Job])
134     }
135 
onCancellingnull136     override fun onCancelling(cause: Throwable?) {
137         _channel.cancel(cause?.let {
138             it as? CancellationException ?: CancellationException("$classSimpleName was cancelled", it)
139         })
140     }
141 
handleJobExceptionnull142     override fun handleJobException(exception: Throwable): Boolean {
143         handleCoroutineException(context, exception)
144         return true
145     }
146 }
147 
148 private class LazyActorCoroutine<E>(
149     parentContext: CoroutineContext,
150     channel: Channel<E>,
151     block: suspend ActorScope<E>.() -> Unit
152 ) : ActorCoroutine<E>(parentContext, channel, active = false) {
153 
154     private var continuation = block.createCoroutineUnintercepted(this, this)
155 
onStartnull156     override fun onStart() {
157         continuation.startCoroutineCancellable(this)
158     }
159 
sendnull160     override suspend fun send(element: E) {
161         start()
162         return super.send(element)
163     }
164 
165     @Suppress("DEPRECATION_ERROR")
166     @Deprecated(
167         level = DeprecationLevel.ERROR,
168         message = "Deprecated in the favour of 'trySend' method",
169         replaceWith = ReplaceWith("trySend(element).isSuccess")
170     ) // See super()
offernull171     override fun offer(element: E): Boolean {
172         start()
173         return super.offer(element)
174     }
175 
trySendnull176     override fun trySend(element: E): ChannelResult<Unit> {
177         start()
178         return super.trySend(element)
179     }
180 
closenull181     override fun close(cause: Throwable?): Boolean {
182         // close the channel _first_
183         val closed = super.close(cause)
184         // then start the coroutine (it will promptly fail if it was not started yet)
185         start()
186         return closed
187     }
188 
189     @Suppress("UNCHECKED_CAST")
190     override val onSend: SelectClause2<E, SendChannel<E>> get() = SelectClause2Impl(
191         clauseObject = this,
192         regFunc = LazyActorCoroutine<*>::onSendRegFunction as RegistrationFunction,
193         processResFunc = super.onSend.processResFunc
194     )
195 
onSendRegFunctionnull196     private fun onSendRegFunction(select: SelectInstance<*>, element: Any?) {
197         onStart()
198         super.onSend.regFunc(this, select, element)
199     }
200 }
201