1 /*
2 * Copyright 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 // Parcelize object is testing internal implementation of datastore-core library
18 @file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
19
20 package androidx.datastore.testapp.multiprocess
21
22 import androidx.datastore.testapp.multiprocess.ipcActions.ReadTextAction
23 import androidx.datastore.testapp.multiprocess.ipcActions.SetTextAction
24 import androidx.datastore.testapp.multiprocess.ipcActions.StorageVariant
25 import androidx.datastore.testapp.multiprocess.ipcActions.createMultiProcessTestDatastore
26 import androidx.datastore.testapp.multiprocess.ipcActions.datastore
27 import androidx.datastore.testapp.twoWayIpc.CompositeServiceSubjectModel
28 import androidx.datastore.testapp.twoWayIpc.InterProcessCompletable
29 import androidx.datastore.testapp.twoWayIpc.IpcAction
30 import androidx.datastore.testapp.twoWayIpc.IpcUnit
31 import androidx.datastore.testapp.twoWayIpc.SubjectReadWriteProperty
32 import androidx.datastore.testapp.twoWayIpc.TwoWayIpcSubject
33 import androidx.datastore.testing.TestMessageProto.FooProto
34 import androidx.kruth.assertThat
35 import kotlin.time.Duration.Companion.seconds
36 import kotlinx.coroutines.TimeoutCancellationException
37 import kotlinx.coroutines.flow.SharingCommand
38 import kotlinx.coroutines.flow.StateFlow
39 import kotlinx.coroutines.flow.collect
40 import kotlinx.coroutines.flow.flow
41 import kotlinx.coroutines.flow.stateIn
42 import kotlinx.coroutines.flow.takeWhile
43 import kotlinx.coroutines.withTimeout
44 import kotlinx.parcelize.Parcelize
45 import org.junit.Rule
46 import org.junit.Test
47 import org.junit.rules.TemporaryFolder
48 import org.junit.runner.RunWith
49 import org.junit.runners.Parameterized
50 import org.junit.runners.Parameterized.Parameters
51
/**
 * Runs two DataStore files side by side, each paired with its own remote subject, and checks that
 * writes and observers on one of them keep working independently of the other.
 *
 * The scenario is repeated for every combination of the constructor parameters.
 */
@RunWith(Parameterized::class)
internal class MultipleDataStoresInMultipleProcessesTest(
    private val storageVariant: StorageVariant,
    /**
     * When true, each remote subject is created on its own connection (one remote process per
     * subject). When false, both subjects are created on a single shared connection.
     */
    private val useMultipleRemoteProcesses: Boolean,
    /**
     * When true, both DataStore files are placed in the same directory. When false, each file
     * gets its own parent directory.
     */
    private val useTheSameParentFolder: Boolean,
) {

    companion object {
        /** Every combination of storage variant, process layout and folder layout. */
        @Suppress("TYPE_INTERSECTION_AS_REIFIED_WARNING", "unused") // test parameters
        @get:JvmStatic
        @get:Parameters(name = "storage_{0}_multipleProcesses={1}_sameParentFolder={2}")
        val params = buildList {
            val toggles = listOf(true, false)
            for (variant in StorageVariant.values()) {
                for (multipleProcesses in toggles) {
                    for (sameParentFolder in toggles) {
                        add(arrayOf(variant, multipleProcesses, sameParentFolder))
                    }
                }
            }
        }
    }

    @get:Rule val multiProcessRule = MultiProcessTestRule()

    @get:Rule val tmpFolder = TemporaryFolder()

    @Test
    fun test() =
        multiProcessRule.runTest {
            // Set up the two remote subjects. The host (this) process acts as the party that
            // observes both files at once.
            val firstConnection = multiProcessRule.createConnection()
            val subject1 = firstConnection.createSubject(this)
            val secondConnection =
                if (useMultipleRemoteProcesses) {
                    // a dedicated remote process for the second subject
                    multiProcessRule.createConnection()
                } else {
                    // both subjects live in the same remote process
                    firstConnection
                }
            val subject2 = secondConnection.createSubject(this)

            // Pick the backing files, either side by side or in separate directories.
            val parent1 = tmpFolder.newFolder()
            val parent2 = if (useTheSameParentFolder) parent1 else tmpFolder.newFolder()
            val file1 = parent1.resolve("ds1.pb")
            val file2 = parent2.resolve("ds2.pb")

            val hostDatastore1 =
                createMultiProcessTestDatastore(
                    filePath = file1.canonicalPath,
                    storageVariant = storageVariant,
                    hostDatastoreScope = multiProcessRule.datastoreScope,
                    subjects = arrayOf(subject1),
                )
            val hostDatastore2 =
                createMultiProcessTestDatastore(
                    filePath = file2.canonicalPath,
                    storageVariant = storageVariant,
                    hostDatastoreScope = multiProcessRule.datastoreScope,
                    subjects = arrayOf(subject2),
                )
            val hostState1 = hostDatastore1.data.stateIn(multiProcessRule.datastoreScope)
            val hostState2 = hostDatastore2.data.stateIn(multiProcessRule.datastoreScope)
            hostState1.awaitValue("")
            hostState2.awaitValue("")

            // Basic check: the host process sees a value after it was updated in the remote
            // process.
            subject1.invokeInRemoteProcess(SetTextAction("ds1-version-1"))
            subject2.invokeInRemoteProcess(SetTextAction("ds2-version-1"))
            hostState1.awaitValue("ds1-version-1")
            hostState2.awaitValue("ds2-version-1")

            // Start an observer on the remote side of subject1.
            val observer1 = ObserveFileAction()
            subject1.invokeInRemoteProcess(observer1)
            subject1.assertRemoteObservedValue("ds1-version-1")

            // Start an observer on the remote side of subject2.
            val observer2 = ObserveFileAction()
            subject2.invokeInRemoteProcess(observer2)
            subject2.assertRemoteObservedValue("ds2-version-1")

            // With both remote observers active, write from the host process and make sure the
            // host and the remote observers all receive the new values.
            hostDatastore1.updateData { current ->
                current.toBuilder().setText("ds1-version-2").build()
            }
            hostDatastore2.updateData { current ->
                current.toBuilder().setText("ds2-version-2").build()
            }
            hostState1.awaitValue("ds1-version-2")
            subject1.assertRemoteObservedValue("ds1-version-2")
            hostState2.awaitValue("ds2-version-2")
            subject2.assertRemoteObservedValue("ds2-version-2")

            // Stop the observer of subject1; it must not receive later updates.
            observer1.stopObserving.complete(subject1, IpcUnit)
            observer1.stoppedObserving.await(subject1)
            subject1.invokeInRemoteProcess(SetTextAction(value = "ds1-version-3"))
            hostState1.awaitValue("ds1-version-3")
            // The stopped observer still holds the last value it saw.
            val staleObservedText =
                subject1.invokeInRemoteProcess(ReadRemoteObservedValue()).value
            assertThat(staleObservedText).isEqualTo("ds1-version-2")
            // A fresh read in the same remote subject returns the new value.
            val freshReadText = subject1.invokeInRemoteProcess(ReadTextAction()).value
            assertThat(freshReadText).isEqualTo("ds1-version-3")

            // The observer of subject2 must be unaffected by stopping the one of subject1.
            subject2.invokeInRemoteProcess(SetTextAction("ds2-version-3"))
            hostState2.awaitValue("ds2-version-3")
            subject2.assertRemoteObservedValue("ds2-version-3")
            hostDatastore2.updateData { current ->
                current.toBuilder().setText("ds2-version-4").build()
            }
            subject2.assertRemoteObservedValue("ds2-version-4")
        }
}
179
/** Key under which the test stores the [StateFlow] that mirrors the DataStore value. */
private val REMOTE_OBSERVER_KEY: CompositeServiceSubjectModel.Key<StateFlow<FooProto>> =
    CompositeServiceSubjectModel.Key()
182
/**
 * The [StateFlow] assigned by [ObserveFileAction], kept on the subject under
 * [REMOTE_OBSERVER_KEY].
 */
private var TwoWayIpcSubject.remoteProcessStateFlow: StateFlow<FooProto> by
    SubjectReadWriteProperty(REMOTE_OBSERVER_KEY)
185
/**
 * IPC action that starts sharing the subject's DataStore data as a [StateFlow] and stores it in
 * [remoteProcessStateFlow]. Sharing starts immediately and continues until [stopObserving] is
 * completed; after the stop command is emitted, [stoppedObserving] is completed in turn.
 *
 * The stored flow can be checked with [AssertRemoteObservedValue] or read with
 * [ReadRemoteObservedValue].
 *
 * @see AssertRemoteObservedValue
 * @see ReadRemoteObservedValue
 */
@Parcelize
internal class ObserveFileAction(
    /** Completing this requests that sharing be stopped. */
    val stopObserving: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
    /** Completed by this action once the stop command has been emitted. */
    val stoppedObserving: InterProcessCompletable<IpcUnit> = InterProcessCompletable(),
) : IpcAction<IpcUnit>() {
    override suspend fun invokeInRemoteProcess(subject: TwoWayIpcSubject): IpcUnit {
        val observedState: StateFlow<FooProto> =
            subject.datastore.data.stateIn(
                scope = subject.datastoreScope,
                started = {
                    // Custom sharing policy; the order of these four steps matters.
                    flow {
                        // begin sharing right away
                        emit(SharingCommand.START)
                        // suspend until the test asks us to stop
                        stopObserving.await(subject)
                        // end sharing, then report that we are done
                        emit(SharingCommand.STOP)
                        stoppedObserving.complete(subject, IpcUnit)
                    }
                },
                initialValue = FooProto.getDefaultInstance(),
            )
        subject.remoteProcessStateFlow = observedState
        return IpcUnit
    }
}
221
/**
 * IPC action that waits, via [awaitValue], until [remoteProcessStateFlow] holds a value whose text
 * is [expectedValue].
 */
@Parcelize
internal class AssertRemoteObservedValue(
    private val expectedValue: String,
) : IpcAction<IpcUnit>() {
    override suspend fun invokeInRemoteProcess(subject: TwoWayIpcSubject): IpcUnit {
        val observedState = subject.remoteProcessStateFlow
        observedState.awaitValue(expectedValue)
        return IpcUnit
    }
}
232
/**
 * IPC action that returns the text currently held by [remoteProcessStateFlow], without waiting for
 * any new emission.
 */
@Parcelize
internal class ReadRemoteObservedValue : IpcAction<ReadTextAction.TextValue>() {
    override suspend fun invokeInRemoteProcess(
        subject: TwoWayIpcSubject
    ): ReadTextAction.TextValue =
        ReadTextAction.TextValue(subject.remoteProcessStateFlow.value.text)
}
242
/**
 * Collects [this] until it delivers a [FooProto] whose `text` is equal to [value].
 *
 * @param value the text to wait for.
 * @throws AssertionError if the text does not arrive within 4 seconds. The message contains the
 *   expected text and the text currently held by the flow, and the originating
 *   [TimeoutCancellationException] is attached as the cause.
 * @see assertRemoteObservedValue
 */
private suspend fun StateFlow<FooProto>.awaitValue(value: String) {
    try {
        // 5 seconds is what we use for IPC action timeouts, hence we pick a lower number
        // here so that this (more descriptive) failure is reported before the IPC one.
        withTimeout(4.seconds) { this@awaitValue.takeWhile { it.text != value }.collect() }
    } catch (timeout: TimeoutCancellationException) {
        // Keep the timeout as the cause so its stack trace is not lost in the test report.
        throw AssertionError(
            "expected \"$value\" didn't arrive, current value: \"${this@awaitValue.value.text}\"",
            timeout,
        )
    }
}
262
/**
 * Runs an [AssertRemoteObservedValue] action for [expectedValue] through this subject's
 * `invokeInRemoteProcess`.
 *
 * @see awaitValue
 */
private suspend fun TwoWayIpcSubject.assertRemoteObservedValue(expectedValue: String) {
    val assertion = AssertRemoteObservedValue(expectedValue)
    invokeInRemoteProcess(assertion)
}
271