/*
 * Copyright 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
@file:JvmName("RxDataStore")

package androidx.datastore.rxjava3

import androidx.annotation.RestrictTo
import androidx.datastore.core.DataStore
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.functions.Function
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.job
import kotlinx.coroutines.rx3.asCompletable
import kotlinx.coroutines.rx3.asFlowable
import kotlinx.coroutines.rx3.asSingle
import kotlinx.coroutines.rx3.await

/**
 * An RxJava 3 facade over a [DataStore].
 *
 * Reads are exposed as a [Flowable], updates as a [Single], and the lifetime of the store is
 * controlled through the [Disposable] contract, which is backed by the Job of the
 * CoroutineScope the store was created with.
 */
public class RxDataStore<T : Any>
private constructor(
    /** The wrapped DataStore that every operation is forwarded to. */
    private val delegateDs: DataStore<T>,
    /**
     * The CoroutineScope that the DataStore is created with. It must contain a Job, because
     * disposal is implemented by cancelling that Job.
     */
    private val scope: CoroutineScope
) : Disposable {

    /**
     * Disposes of the DataStore by cancelling the Job of its scope.
     *
     * Cancellation is only requested here; subscribe to the Completable returned by
     * [shutdownComplete] to learn when the DataStore has actually shut down.
     */
    override fun dispose() {
        scope.coroutineContext.job.cancel()
    }

    /** Returns `true` once the Job of the DataStore's scope is no longer active. */
    override fun isDisposed(): Boolean {
        val scopeJob = scope.coroutineContext.job
        return !scopeJob.isActive
    }

    /**
     * Returns a Completable that completes when the DataStore's Job has completed. It is not
     * safe to create a new DataStore with the same file name until this has completed.
     */
    public fun shutdownComplete(): Completable {
        val scopeJob = scope.coroutineContext.job
        // The Job element is removed from the context that is handed to the Rx bridge.
        val contextWithoutJob = scope.coroutineContext.minusKey(Job)
        return scopeJob.asCompletable(contextWithoutJob)
    }

    /**
     * Gets a [Flowable] of the data held by the DataStore. See [DataStore.data] for more
     * information.
     *
     * Provides efficient, cached (when possible) access to the latest durably persisted state.
     * The Flowable will always either emit a value or signal an exception encountered when
     * attempting to read from disk. If an exception is encountered, subscribing again will
     * attempt to read the data again.
     *
     * Do not layer a cache on top of this API: it will be impossible to guarantee consistency.
     * Instead, take the first emitted item to access a single snapshot.
     *
     * The Flowable terminates with an IOException when an exception is encountered while
     * reading data.
     *
     * @return a Flowable representing the current state of the data
     */
    @ExperimentalCoroutinesApi
    public fun data(): Flowable<T> = delegateDs.data.asFlowable(scope.coroutineContext)

    /**
     * See [DataStore.updateData].
     *
     * Updates the data transactionally in an atomic read-modify-write operation. All operations
     * are serialized, and the transform itself is asynchronous, so it can perform heavy work
     * such as RPCs.
     *
     * The returned Single completes when the data has been persisted durably to disk (after
     * which [data] will reflect the update). If the transform or the write to disk fails, the
     * transaction is aborted and the returned Single terminates with that error.
     *
     * The transform will be run on the scheduler that DataStore was constructed with.
     *
     * @param transform maps the current value to a Single producing the value to persist
     * @return a Single emitting the snapshot returned by the transform
     * @throws Exception when thrown by the transform function
     */
    @ExperimentalCoroutinesApi
    public fun updateDataAsync(transform: Function<T, Single<T>>): Single<T> {
        // A fresh SupervisorJob is supplied as the Job for this coroutine instead of the one
        // owned by the scope. NOTE(review): presumably so that a failing transform does not
        // cancel the DataStore's scope; confirm before changing.
        val pendingUpdate =
            scope.async(SupervisorJob()) {
                delegateDs.updateData { current -> transform.apply(current).await() }
            }
        return pendingUpdate.asSingle(scope.coroutineContext.minusKey(Job))
    }

    companion object {
        /**
         * Wraps [delegateDs] in an [RxDataStore] bound to [scope].
         *
         * Restricted to the library group; intended for the companion datastore-preferences
         * RxJava artifact only.
         */
        @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
        public fun <T : Any> create(
            delegateDs: DataStore<T>,
            scope: CoroutineScope
        ): RxDataStore<T> = RxDataStore<T>(delegateDs, scope)
    }
}