1 /*
2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 @file:JvmName("RxDataStore")
17 
18 package androidx.datastore.rxjava3
19 
20 import androidx.annotation.RestrictTo
21 import androidx.datastore.core.DataStore
22 import io.reactivex.rxjava3.core.Completable
23 import io.reactivex.rxjava3.core.Flowable
24 import io.reactivex.rxjava3.core.Single
25 import io.reactivex.rxjava3.disposables.Disposable
26 import io.reactivex.rxjava3.functions.Function
27 import kotlinx.coroutines.CoroutineScope
28 import kotlinx.coroutines.ExperimentalCoroutinesApi
29 import kotlinx.coroutines.Job
30 import kotlinx.coroutines.SupervisorJob
31 import kotlinx.coroutines.async
32 import kotlinx.coroutines.job
33 import kotlinx.coroutines.rx3.asCompletable
34 import kotlinx.coroutines.rx3.asFlowable
35 import kotlinx.coroutines.rx3.asSingle
36 import kotlinx.coroutines.rx3.await
37 
38 /** A DataStore that supports RxJava operations on DataStore. */
/**
 * An RxJava 3 wrapper around a [DataStore].
 *
 * Reads are exposed as a [Flowable] via [data] and transactional updates as a [Single] via
 * [updateDataAsync]. The wrapper is also a [Disposable]: disposing it cancels the Job of the
 * [CoroutineScope] it was created with.
 */
public class RxDataStore<T : Any>
private constructor(
    /** The delegate DataStore that all reads and updates are forwarded to. */
    private val delegateDs: DataStore<T>,
    /**
     * The CoroutineScope that the DataStore is created with. Must contain a Job to allow for
     * cancellation; [dispose], [isDisposed] and [shutdownComplete] all look that Job up.
     */
    private val scope: CoroutineScope
) : Disposable {

    companion object {
        /** Visible for the datastore-preferences-rxjava3 artifact only. */
        @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
        public fun <T : Any> create(
            delegateDs: DataStore<T>,
            scope: CoroutineScope
        ): RxDataStore<T> = RxDataStore(delegateDs, scope)
    }

    /**
     * Dispose of the DataStore by cancelling the Job of its scope. Wait for the Completable
     * returned by [shutdownComplete] to confirm that the DataStore has been shut down.
     */
    override fun dispose() {
        scope.coroutineContext.job.cancel()
    }

    /** Returns whether this DataStore is closed, i.e. the Job of its scope is no longer active. */
    override fun isDisposed(): Boolean = !scope.coroutineContext.job.isActive

    /**
     * Returns a Completable that completes when the Job of the DataStore's scope has completed.
     * It is not safe to create a new DataStore with the same file name until this has completed.
     */
    public fun shutdownComplete(): Completable =
        // The Job is stripped from the context handed to the Rx adapter: the adapter creates its
        // own coroutine and must not be given a context that already carries a Job.
        scope.coroutineContext.job.asCompletable(scope.coroutineContext.minusKey(Job))

    /**
     * Gets a [Flowable] of the data from DataStore. See [DataStore.data] for more information.
     *
     * Provides efficient, cached (when possible) access to the latest durably persisted state. The
     * Flowable will always either emit a value or signal an exception encountered when attempting
     * to read from disk. If an exception is encountered, subscribing again will attempt to read
     * the data again.
     *
     * Do not layer a cache on top of this API: it will be impossible to guarantee consistency.
     * Instead, take the first element of this Flowable to access a single snapshot.
     *
     * The Flowable terminates with an error (for example an IOException) when an exception is
     * encountered while reading data.
     *
     * @return a Flowable representing the current state of the data
     */
    @ExperimentalCoroutinesApi
    public fun data(): Flowable<T> {
        return delegateDs.data.asFlowable(scope.coroutineContext)
    }

    /**
     * See [DataStore.updateData]
     *
     * Updates the data transactionally in an atomic read-modify-write operation. All operations are
     * serialized, and the transform itself is asynchronous so it can perform heavy work such as
     * RPCs.
     *
     * The Single completes when the data has been persisted durably to disk (after which [data]
     * will reflect the update). If the transform or write to disk fails, the transaction is aborted
     * and the returned Single is terminated with the error.
     *
     * The transform will be run on the scheduler that DataStore was constructed with.
     *
     * @param transform maps the current value to a [Single] that produces the new value
     * @return a Single emitting the snapshot returned by the transform
     * @throws Exception when thrown by the transform function; it is delivered through the
     *   returned Single
     */
    @ExperimentalCoroutinesApi
    public fun updateDataAsync(transform: Function<T, Single<T>>): Single<T> {
        return scope
            // A fresh SupervisorJob replaces the scope's Job for this coroutine, so a failing
            // transform is reported through the returned Single rather than through the
            // scope's Job.
            .async(SupervisorJob()) { delegateDs.updateData { transform.apply(it).await() } }
            // The Rx adapter must not receive a context that already contains a Job.
            .asSingle(scope.coroutineContext.minusKey(Job))
    }
}
121