/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.datastore.core

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.onClosed
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch

/**
 * A minimal actor: messages handed to [offer] are buffered in an unbounded channel and passed,
 * one per call, to `consumeMessage` by a consumer coroutine launched in `scope`.
 *
 * A consumer coroutine is only launched when the count of outstanding messages goes from 0 to 1,
 * and it keeps consuming until that count drops back to 0. When the scope's [Job] completes, the
 * queue is closed and every message still buffered is handed to `onUndeliveredElement`.
 */
internal class SimpleActor<T>(
    /** The scope in which to consume messages. */
    private val scope: CoroutineScope,
    /** Function that will be called when scope is cancelled. Should *not* throw exceptions. */
    onComplete: (Throwable?) -> Unit,
    /**
     * Function that will be called for each element when the scope is cancelled. Should *not* throw
     * exceptions.
     */
    onUndeliveredElement: (T, Throwable?) -> Unit,
    /**
     * Function that will be called once for each message.
     *
     * Must *not* throw an exception (other than CancellationException if scope is cancelled).
     */
    private val consumeMessage: suspend (T) -> Unit
) {
    private val messageQueue = Channel<T>(capacity = UNLIMITED)

    /**
     * Count of the number of remaining messages to process. When the messageQueue is closed, this
     * is no longer used.
     */
    private val remainingMessages = AtomicInt(0)

    init {
        // If the scope doesn't have a job, it won't be cancelled, so we don't need to register a
        // callback.
        scope.coroutineContext[Job]?.invokeOnCompletion { ex ->
            onComplete(ex)

            // TODO(rohitsat): replace this with Channel(onUndeliveredElement) when it
            // is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/2435

            // Close first so that no new message can be enqueued while we drain.
            messageQueue.close(ex)

            // Drain whatever is still buffered. We branch on isSuccess instead of comparing
            // getOrNull() against null: T may be a nullable type, and a queued `null` message
            // must not be mistaken for an empty queue (that would end the drain early and drop
            // every message queued after it).
            while (true) {
                val result = messageQueue.tryReceive()
                if (!result.isSuccess) break
                onUndeliveredElement(result.getOrThrow(), ex)
            }
        }
    }

    /**
     * Sends a message to a message queue to be processed by [consumeMessage] in [scope].
     *
     * If [offer] completes successfully, the msg *will* be processed either by consumeMessage or
     * onUndeliveredElement. If [offer] throws an exception, the message may or may not be
     * processed.
     *
     * @param msg the message to enqueue.
     * @throws Throwable the cause the queue was closed with, or [ClosedSendChannelException] if
     *   it was closed without a cause.
     * @throws IllegalStateException if the send fails for any reason other than the queue being
     *   closed.
     */
    fun offer(msg: T) {
        /*
         * Possible states:
         * 1) remainingMessages = 0
         *    All messages have been consumed, so there is no active consumer.
         * 2) remainingMessages > 0, no active consumer
         *    One of the senders is responsible for triggering the consumer.
         * 3) remainingMessages > 0, active consumer
         *    Consumer will continue to consume until remainingMessages is 0.
         * 4) messageQueue is closed, there are remaining messages to consume
         *    Attempts to offer messages will fail, onComplete() will consume remaining messages
         *    with onUndelivered. The Consumer has already completed since close() is called by
         *    onComplete().
         * 5) messageQueue is closed, there are no remaining messages to consume
         *    Attempts to offer messages will fail.
         */

        // should never return false bc the channel capacity is unlimited
        check(
            messageQueue
                .trySend(msg)
                .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
                .isSuccess
        )

        // If the number of remaining messages was 0, there is no active consumer, since it quits
        // consuming once remaining messages hits 0. We must kick off a new consumer.
        if (remainingMessages.getAndIncrement() == 0) {
            scope.launch {
                // We shouldn't have started a new consumer unless there are remaining messages...
                check(remainingMessages.get() > 0)

                do {
                    // We don't want to try to consume a new message unless we are still active.
                    // If ensureActive throws, the scope is no longer active, so it doesn't
                    // matter that we have remaining messages.
                    scope.ensureActive()

                    consumeMessage(messageQueue.receive())
                } while (remainingMessages.decrementAndGet() != 0)
            }
        }
    }
}