1 /*
2  * Copyright 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.datastore.core
18 
19 import kotlinx.coroutines.CoroutineScope
20 import kotlinx.coroutines.Job
21 import kotlinx.coroutines.channels.Channel
22 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
23 import kotlinx.coroutines.channels.ClosedSendChannelException
24 import kotlinx.coroutines.channels.onClosed
25 import kotlinx.coroutines.ensureActive
26 import kotlinx.coroutines.launch
27 
/**
 * A minimal actor built on an unbounded [Channel].
 *
 * Messages handed to [offer] are queued and then passed, one per call, to [consumeMessage] from a
 * coroutine launched in [scope]. A consumer coroutine only exists while there are messages left to
 * process: the sender that moves the pending-message count from 0 to 1 launches it, and it exits
 * once the count drops back to 0.
 *
 * When the [Job] of [scope] completes, the queue is closed and every message still sitting in it
 * is handed to `onUndeliveredElement` instead.
 */
internal class SimpleActor<T>(
    /** The scope in which to consume messages. */
    private val scope: CoroutineScope,
    /**
     * Function that will be called when the scope's [Job] completes, with the completion cause (if
     * any). Should *not* throw exceptions.
     */
    onComplete: (Throwable?) -> Unit,
    /**
     * Function that will be called for each element still queued when the scope's [Job] completes,
     * together with the completion cause (if any). Should *not* throw exceptions.
     */
    onUndeliveredElement: (T, Throwable?) -> Unit,
    /**
     * Function that will be called once for each message.
     *
     * Must *not* throw an exception (other than CancellationException if scope is cancelled).
     */
    private val consumeMessage: suspend (T) -> Unit
) {
    private val messageQueue = Channel<T>(capacity = UNLIMITED)

    /**
     * Count of the number of remaining messages to process. When the messageQueue is closed, this
     * is no longer used.
     */
    private val remainingMessages = AtomicInt(0)

    init {
        // If the scope doesn't have a job, it won't be cancelled, so we don't need to register a
        // callback.
        scope.coroutineContext[Job]?.invokeOnCompletion { ex ->
            onComplete(ex)

            // TODO(rohitsat): replace this with Channel(onUndeliveredElement) when it
            // is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/2435

            // Closing first makes every later offer() fail, so the drain below is final.
            messageQueue.close(ex)

            // Drain whatever is still buffered. Branch on isSuccess rather than on a null value:
            // T may be a nullable type, and a queued null message must not be mistaken for
            // "queue exhausted", which would silently drop it and everything queued after it.
            while (true) {
                val received = messageQueue.tryReceive()
                if (!received.isSuccess) break
                onUndeliveredElement(received.getOrThrow(), ex)
            }
        }
    }

    /**
     * Sends a message to a message queue to be processed by [consumeMessage] in [scope].
     *
     * If [offer] completes successfully, the msg *will* be processed either by consumeMessage or
     * onUndeliveredElement. If [offer] throws an exception, the message may or may not be
     * processed.
     *
     * @param msg the message to enqueue.
     * @throws ClosedSendChannelException if the queue was closed without a cause; if it was closed
     *   with a cause, that cause is thrown instead.
     */
    fun offer(msg: T) {
        /*
         * Possible states:
         * 1) remainingMessages = 0: all messages have been consumed, so there is no active
         *    consumer.
         * 2) remainingMessages > 0, no active consumer: one of the senders is responsible for
         *    triggering the consumer.
         * 3) remainingMessages > 0, active consumer: the consumer will continue to consume until
         *    remainingMessages is 0.
         * 4) messageQueue is closed, there are remaining messages to consume: attempts to offer
         *    messages will fail, onComplete() will consume remaining messages with onUndelivered.
         *    The consumer has already completed since close() is called by onComplete().
         * 5) messageQueue is closed, there are no remaining messages to consume: attempts to
         *    offer messages will fail.
         */

        // should never return false bc the channel capacity is unlimited
        check(
            messageQueue
                .trySend(msg)
                .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
                .isSuccess
        )

        // If the number of remaining messages was 0, there is no active consumer, since it quits
        // consuming once remaining messages hits 0. We must kick off a new consumer.
        // The message is enqueued *before* the counter is bumped, so a consumer that observes a
        // non-zero count always finds a message waiting for it.
        if (remainingMessages.getAndIncrement() == 0) {
            scope.launch {
                // We shouldn't have started a new consumer unless there are remaining messages...
                check(remainingMessages.get() > 0)

                do {
                    // We don't want to try to consume a new message unless we are still active.
                    // If ensureActive throws, the scope is no longer active, so it doesn't
                    // matter that we have remaining messages.
                    scope.ensureActive()

                    consumeMessage(messageQueue.receive())
                } while (remainingMessages.decrementAndGet() != 0)
            }
        }
    }
}
120