1 /*
2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 // Parcelize object is testing internal implementation of datastore-core library
18 @file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
19 
20 package androidx.datastore.testapp.multiprocess
21 
22 import androidx.datastore.testapp.multiprocess.ipcActions.ReadTextAction
23 import androidx.datastore.testapp.multiprocess.ipcActions.SetTextAction
24 import androidx.datastore.testapp.multiprocess.ipcActions.StorageVariant
25 import androidx.datastore.testapp.multiprocess.ipcActions.createMultiProcessTestDatastore
26 import androidx.datastore.testapp.multiprocess.ipcActions.datastore
27 import androidx.datastore.testapp.twoWayIpc.CompositeServiceSubjectModel
28 import androidx.datastore.testapp.twoWayIpc.InterProcessCompletable
29 import androidx.datastore.testapp.twoWayIpc.IpcAction
30 import androidx.datastore.testapp.twoWayIpc.IpcUnit
31 import androidx.datastore.testapp.twoWayIpc.SubjectReadWriteProperty
32 import androidx.datastore.testapp.twoWayIpc.TwoWayIpcSubject
33 import androidx.datastore.testing.TestMessageProto.FooProto
34 import androidx.kruth.assertThat
35 import kotlin.time.Duration.Companion.seconds
36 import kotlinx.coroutines.TimeoutCancellationException
37 import kotlinx.coroutines.flow.SharingCommand
38 import kotlinx.coroutines.flow.StateFlow
39 import kotlinx.coroutines.flow.collect
40 import kotlinx.coroutines.flow.flow
41 import kotlinx.coroutines.flow.stateIn
42 import kotlinx.coroutines.flow.takeWhile
43 import kotlinx.coroutines.withTimeout
44 import kotlinx.parcelize.Parcelize
45 import org.junit.Rule
46 import org.junit.Test
47 import org.junit.rules.TemporaryFolder
48 import org.junit.runner.RunWith
49 import org.junit.runners.Parameterized
50 import org.junit.runners.Parameterized.Parameters
51 
52 @RunWith(Parameterized::class)
53 internal class MultipleDataStoresInMultipleProcessesTest(
54     private val storageVariant: StorageVariant,
55     /**
56      * if set to true, we'll run remote subjects in 2 different processes. if set to false, we'll
57      * run them on the same remote process.
58      */
59     private val useMultipleRemoteProcesses: Boolean,
60     /**
61      * If true, both datastores will be created in the same folder. If false, their parent folders
62      * will be different.
63      */
64     private val useTheSameParentFolder: Boolean,
65 ) {
66 
67     companion object {
68         @Suppress("TYPE_INTERSECTION_AS_REIFIED_WARNING", "unused") // test parameters
69         @get:JvmStatic
70         @get:Parameters(name = "storage_{0}_multipleProcesses={1}_sameParentFolder={2}")
<lambda>null71         val params = buildList {
72             for (storageVariant in StorageVariant.values()) {
73                 for (useMultipleProcesses in arrayOf(true, false)) {
74                     for (useTheSameParentFolder in arrayOf(true, false)) {
75                         add(arrayOf(storageVariant, useMultipleProcesses, useTheSameParentFolder))
76                     }
77                 }
78             }
79         }
80     }
81 
82     @get:Rule val multiProcessRule = MultiProcessTestRule()
83 
84     @get:Rule val tmpFolder = TemporaryFolder()
85 
86     @Test
testnull87     fun test() =
88         multiProcessRule.runTest {
89             // create subjects in 2 different processes.
90             // our main process serves as the subject that has the case of observing 2 different
91             // files
92             // in the same folder.
93             val (subject1, subject2) =
94                 if (useMultipleRemoteProcesses) {
95                     // create a process per remote subject
96                     multiProcessRule.createConnection().createSubject(this) to
97                         multiProcessRule.createConnection().createSubject(this)
98                 } else {
99                     // reuse the same remote process for both remote subjects
100                     val connection = multiProcessRule.createConnection()
101                     connection.createSubject(this) to connection.createSubject(this)
102                 }
103             val (file1, file2) =
104                 if (useTheSameParentFolder) {
105                     val parent = tmpFolder.newFolder()
106                     parent.resolve("ds1.pb") to parent.resolve("ds2.pb")
107                 } else {
108                     val parent1 = tmpFolder.newFolder()
109                     val parent2 = tmpFolder.newFolder()
110                     parent1.resolve("ds1.pb") to parent2.resolve("ds2.pb")
111                 }
112 
113             val datastore1 =
114                 createMultiProcessTestDatastore(
115                     filePath = file1.canonicalPath,
116                     storageVariant = storageVariant,
117                     hostDatastoreScope = multiProcessRule.datastoreScope,
118                     subjects = arrayOf(subject1)
119                 )
120             val datastore2 =
121                 createMultiProcessTestDatastore(
122                     filePath = file2.canonicalPath,
123                     storageVariant = storageVariant,
124                     hostDatastoreScope = multiProcessRule.datastoreScope,
125                     subjects = arrayOf(subject2)
126                 )
127             val ds1Value = datastore1.data.stateIn(multiProcessRule.datastoreScope)
128             val ds2Value = datastore2.data.stateIn(multiProcessRule.datastoreScope)
129             ds1Value.awaitValue("")
130             ds2Value.awaitValue("")
131             // simple assertions of host process reading the value after an after in the remote
132             // process
133             subject1.invokeInRemoteProcess(SetTextAction("ds1-version-1"))
134             subject2.invokeInRemoteProcess(SetTextAction("ds2-version-1"))
135             ds1Value.awaitValue("ds1-version-1")
136             ds2Value.awaitValue("ds2-version-1")
137 
138             // create an observer in subject1
139             val subject1Observer = ObserveFileAction()
140             subject1.invokeInRemoteProcess(subject1Observer)
141             subject1.assertRemoteObservedValue("ds1-version-1")
142 
143             // create an observer in subject2
144             val subject2Observer = ObserveFileAction()
145             subject2.invokeInRemoteProcess(subject2Observer)
146             subject2.assertRemoteObservedValue("ds2-version-1")
147 
148             // while the observers are active in the subjects, update the value in main process and
149             // ensure they get the new value
150             datastore1.updateData { it.toBuilder().setText("ds1-version-2").build() }
151             datastore2.updateData { it.toBuilder().setText("ds2-version-2").build() }
152             // everyone gets the value
153             ds1Value.awaitValue("ds1-version-2")
154             subject1.assertRemoteObservedValue("ds1-version-2")
155             ds2Value.awaitValue("ds2-version-2")
156             subject2.assertRemoteObservedValue("ds2-version-2")
157 
158             // stop subject 1, it should not get the update
159             subject1Observer.stopObserving.complete(subject1, IpcUnit)
160             subject1Observer.stoppedObserving.await(subject1)
161             subject1.invokeInRemoteProcess(SetTextAction(value = "ds1-version-3"))
162             ds1Value.awaitValue("ds1-version-3")
163             // observation is stopped so the observed value should stay the same
164             assertThat(subject1.invokeInRemoteProcess(ReadRemoteObservedValue()).value)
165                 .isEqualTo("ds1-version-2")
166             // a new observer in subject1 process would see the new value
167             assertThat(subject1.invokeInRemoteProcess(ReadTextAction()).value)
168                 .isEqualTo("ds1-version-3")
169             // make sure the observer for the other process is still working well even after we
170             // stopped
171             // the observer in process 1
172             subject2.invokeInRemoteProcess(SetTextAction("ds2-version-3"))
173             ds2Value.awaitValue("ds2-version-3")
174             subject2.assertRemoteObservedValue("ds2-version-3")
175             datastore2.updateData { it.toBuilder().setText("ds2-version-4").build() }
176             subject2.assertRemoteObservedValue("ds2-version-4")
177         }
178 }
179 
180 /** key used in test to keep a StateFlow of the datastore value */
181 private val REMOTE_OBSERVER_KEY = CompositeServiceSubjectModel.Key<StateFlow<FooProto>>()
182 
183 /** The StateFlow value for test that is created by the [ObserveFileAction]. */
184 private var TwoWayIpcSubject.remoteProcessStateFlow by SubjectReadWriteProperty(REMOTE_OBSERVER_KEY)
185 
186 /**
187  * An IPC action that will create a StateFlow of the DataStore value and keep it active until
188  * [stopObserving] is completed. The value of that StateFlow ([remoteProcessStateFlow]) can be
189  * asserted via [AssertRemoteObservedValue] or read via [ReadRemoteObservedValue].
190  *
191  * @see AssertRemoteObservedValue
192  * @see ReadRemoteObservedValue
193  */
194 @Parcelize
195 internal class ObserveFileAction(
196     /** When completed, we'll stop the StateFlow */
197     val stopObserving: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
198     /** We'll complete this action when the StateFlow is stopped. */
199     val stoppedObserving: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
200 ) : IpcAction<IpcUnit>() {
invokeInRemoteProcessnull201     override suspend fun invokeInRemoteProcess(subject: TwoWayIpcSubject): IpcUnit {
202         subject.remoteProcessStateFlow =
203             subject.datastore.data.stateIn(
204                 subject.datastoreScope,
205                 started = {
206                     flow {
207                         // immediately start observing
208                         emit(SharingCommand.START)
209                         // wait until stop observing is called
210                         stopObserving.await(subject)
211                         // stop observing
212                         emit(SharingCommand.STOP)
213                         stoppedObserving.complete(subject, IpcUnit)
214                     }
215                 },
216                 initialValue = FooProto.getDefaultInstance()
217             )
218         return IpcUnit
219     }
220 }
221 
222 /** Asserts the value of the [remoteProcessStateFlow] by waiting it to dispatch [expectedValue]. */
223 @Parcelize
224 internal class AssertRemoteObservedValue(
225     private val expectedValue: String,
226 ) : IpcAction<IpcUnit>() {
invokeInRemoteProcessnull227     override suspend fun invokeInRemoteProcess(subject: TwoWayIpcSubject): IpcUnit {
228         subject.remoteProcessStateFlow.awaitValue(expectedValue)
229         return IpcUnit
230     }
231 }
232 
233 /** Reads the current value of [remoteProcessStateFlow]. */
234 @Parcelize
235 internal class ReadRemoteObservedValue : IpcAction<ReadTextAction.TextValue>() {
invokeInRemoteProcessnull236     override suspend fun invokeInRemoteProcess(
237         subject: TwoWayIpcSubject
238     ): ReadTextAction.TextValue {
239         return ReadTextAction.TextValue(subject.remoteProcessStateFlow.value.text)
240     }
241 }
242 
243 /**
244  * Collects [this] until the [StateFlow.value] is equal to [value].
245  *
246  * @see assertRemoteObservedValue
247  */
awaitValuenull248 private suspend fun StateFlow<FooProto>.awaitValue(value: String) {
249     try {
250         // 5 seconds is what we use for IPC action timeouts, hence we pick a lower number
251         // here to get this timeout before the IPC
252         withTimeout(4.seconds) { this@awaitValue.takeWhile { it.text != value }.collect() }
253     } catch (timeout: TimeoutCancellationException) {
254         throw AssertionError(
255             """
256                 expected "$value" didn't arrive, current value: "${this@awaitValue.value.text}"
257             """
258                 .trimIndent()
259         )
260     }
261 }
262 
263 /**
264  * Asserts the value of [remoteProcessStateFlow] to be equal to [expectedValue]
265  *
266  * @see awaitValue
267  */
assertRemoteObservedValuenull268 private suspend fun TwoWayIpcSubject.assertRemoteObservedValue(expectedValue: String) {
269     invokeInRemoteProcess(AssertRemoteObservedValue(expectedValue = expectedValue))
270 }
271