/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 package androidx.datastore.core
18
19 import android.os.StrictMode
20 import androidx.datastore.TestFile
21 import androidx.datastore.TestIO
22 import androidx.datastore.TestingSerializerConfig
23 import androidx.datastore.core.handlers.NoOpCorruptionHandler
24 import androidx.test.filters.FlakyTest
25 import androidx.test.filters.LargeTest
26 import androidx.testutils.assertThrows
27 import com.google.common.truth.Truth.assertThat
28 import java.io.File
29 import java.io.InputStream
30 import java.io.OutputStream
31 import java.util.concurrent.Executors
32 import java.util.concurrent.atomic.AtomicBoolean
33 import java.util.concurrent.atomic.AtomicInteger
34 import kotlin.coroutines.AbstractCoroutineContextElement
35 import kotlin.coroutines.CoroutineContext
36 import kotlinx.coroutines.CancellationException
37 import kotlinx.coroutines.CompletableDeferred
38 import kotlinx.coroutines.CoroutineScope
39 import kotlinx.coroutines.DelicateCoroutinesApi
40 import kotlinx.coroutines.Dispatchers
41 import kotlinx.coroutines.ExperimentalCoroutinesApi
42 import kotlinx.coroutines.Job
43 import kotlinx.coroutines.asCoroutineDispatcher
44 import kotlinx.coroutines.async
45 import kotlinx.coroutines.awaitCancellation
46 import kotlinx.coroutines.cancel
47 import kotlinx.coroutines.cancelAndJoin
48 import kotlinx.coroutines.flow.first
49 import kotlinx.coroutines.flow.take
50 import kotlinx.coroutines.flow.toList
51 import kotlinx.coroutines.job
52 import kotlinx.coroutines.launch
53 import kotlinx.coroutines.newSingleThreadContext
54 import kotlinx.coroutines.runBlocking
55 import kotlinx.coroutines.test.TestScope
56 import kotlinx.coroutines.test.UnconfinedTestDispatcher
57 import kotlinx.coroutines.test.runCurrent
58 import kotlinx.coroutines.test.runTest
59 import kotlinx.coroutines.withContext
60 import kotlinx.coroutines.withTimeout
61 import org.junit.Before
62 import org.junit.Test
63 import org.junit.runner.RunWith
64 import org.junit.runners.JUnit4
65
/**
 * A test class duplicated from "SingleProcessDataStoreTest" that only exercises the
 * single-process use case. Additional tests cover StrictMode.
 */
70 @OptIn(DelicateCoroutinesApi::class)
71 @ExperimentalCoroutinesApi
72 @LargeTest
73 @RunWith(JUnit4::class)
74 abstract class MultiProcessDataStoreSingleProcessTest<F : TestFile<F>>(
75 protected val testIO: TestIO<F, *>
76 ) {
77 protected lateinit var store: DataStore<Byte>
78 private lateinit var serializerConfig: TestingSerializerConfig
79 protected lateinit var testFile: F
80 protected lateinit var tempFolder: F
81 protected lateinit var dataStoreScope: TestScope
82
83 abstract fun getJavaFile(file: F): File
84
/**
 * Builds a [DataStoreImpl] for a test.
 *
 * The storage is obtained from [testIO] with the shared [serializerConfig]. The
 * [MultiProcessCoordinator] is created from the same [file] that backs the storage (previously it
 * was always created from [testFile], even when a different [file] was passed). The coordinator
 * still uses the context of [dataStoreScope], independent of [scope].
 *
 * @param file file backing the store; defaults to [testFile].
 * @param scope scope passed to [DataStoreImpl]; defaults to [dataStoreScope].
 * @param initTasksList initialization tasks passed to [DataStoreImpl].
 * @param corruptionHandler corruption handler passed to [DataStoreImpl].
 * @return the newly created store.
 */
private fun newDataStore(
    file: F = testFile,
    scope: CoroutineScope = dataStoreScope,
    initTasksList: List<suspend (api: InitializerApi<Byte>) -> Unit> = listOf(),
    corruptionHandler: CorruptionHandler<Byte> = NoOpCorruptionHandler<Byte>()
): DataStore<Byte> {
    val storage =
        testIO.getStorage(
            serializerConfig,
            {
                // Keep the coordinator and the storage pointed at the same file.
                MultiProcessCoordinator(dataStoreScope.coroutineContext, getJavaFile(file))
            }
        ) {
            file
        }
    return DataStoreImpl(
        storage = storage,
        scope = scope,
        initTasksList = initTasksList,
        corruptionHandler = corruptionHandler
    )
}
109
/**
 * Creates the per-test fixtures: serializer config, temp folder, test file, the data store scope
 * and the default [store] backed by [testFile].
 */
@Before
fun setUp() {
    serializerConfig = TestingSerializerConfig()
    tempFolder = testIO.newTempFile().also { folder -> folder.mkdirs() }
    testFile = testIO.newTempFile(parentFile = tempFolder)
    dataStoreScope = TestScope(UnconfinedTestDispatcher() + Job())
    store =
        testIO.getStore(
            serializerConfig,
            dataStoreScope,
            {
                MultiProcessCoordinator(
                    dataStoreScope.coroutineContext,
                    getJavaFile(testFile)
                )
            }
        ) {
            testFile
        }
}
125
/** Reading from a store that has never been written yields 0. */
@Test
fun testReadNewMessage() = runTest {
    val initialValue = store.data.first()
    assertThat(initialValue).isEqualTo(0)
}
127
/** A value written through one store instance is read back by a second instance on [testFile]. */
@Test
fun testReadWithNewInstance() = runBlocking {
    runTest {
        val writer = newDataStore(testFile, scope = backgroundScope)
        writer.updateData { 1 }
    }
    runTest {
        val reader = newDataStore(testFile, scope = backgroundScope)
        val persisted = reader.data.first()
        assertThat(persisted).isEqualTo(1)
    }
}
139
/** Cancelling the store's scope completes a collector that is still waiting for more data. */
@Test
fun testScopeCancelledWithActiveFlow() = runTest {
    val cancellableScope = CoroutineScope(Job())
    val dataStore = newDataStore(scope = cancellableScope)
    val collector = async {
        dataStore.data.take(2).collect {
            // Intentionally empty: the second element never arrives, so this keeps waiting.
        }
    }

    cancellableScope.cancel()
    collector.join()

    assertThat(collector.isCompleted).isTrue()
    assertThat(collector.isActive).isFalse()
}
156
/** A written value is returned by the next read. */
@Test
fun testWriteAndRead() = runTest {
    store.updateData { 1 }
    val persisted = store.data.first()
    assertThat(persisted).isEqualTo(1)
}
162
/**
 * Parks an update inside its transform and checks that a read still returns the old value (0);
 * after the transform is released and the update finishes, a read returns 1.
 */
@Test
fun testWritesDontBlockReadsInSameProcess() = runTest {
    val transformEntered = CompletableDeferred<Unit>()
    val releaseTransform = CompletableDeferred<Unit>()

    val pendingUpdate = async {
        store.updateData { current ->
            transformEntered.complete(Unit)
            releaseTransform.await()
            current.inc()
        }
    }
    // Do not read until the transform is actually running.
    transformEntered.await()

    // The in-flight write must not hold up this read.
    assertThat(store.data.first()).isEqualTo(0)

    releaseTransform.complete(Unit)
    pendingUpdate.await()

    // With the update finished, reads observe the incremented value.
    assertThat(store.data.first()).isEqualTo(1)
}
187
/** Two consecutive updates (set to 2, then decrement) are each visible to the following read. */
@Test
fun testWriteMultiple() = runTest {
    store.updateData { 2 }
    val afterFirstWrite = store.data.first()
    assertThat(afterFirstWrite).isEqualTo(2)

    store.updateData { current -> current.dec() }
    val afterSecondWrite = store.data.first()
    assertThat(afterSecondWrite).isEqualTo(1)
}
198
/**
 * Writes 1, then makes the serializer fail writes and checks the next update throws
 * [IOException]; a new store on the same file still reads 1.
 */
@Test
fun testReadAfterTransientBadWrite() = runBlocking {
    val file = testIO.newTempFile(tempFolder)
    runTest {
        val writer = newDataStore(file, scope = backgroundScope)
        writer.updateData { 1 }
        serializerConfig.failingWrite = true
        assertThrows<IOException> { writer.updateData { 2 } }
    }

    runTest {
        val reader = newDataStore(file, scope = backgroundScope)
        assertThat(reader.data.first()).isEqualTo(1)
    }
}
214
/**
 * Writes through a store whose file and parent directory do not exist yet, then checks the value
 * can be read both from that store and from a second store on the same file.
 */
@Test
fun testWriteToNonExistentDir() = runBlocking {
    val missingDirFile = testIO.newTempFile(relativePath = "/this/does/not/exist/ds.txt")
    assertThat(missingDirFile.exists()).isFalse()
    assertThat(missingDirFile.parentFile()!!.exists()).isFalse()
    runTest {
        val writer = newDataStore(missingDirFile, scope = backgroundScope)

        writer.updateData { 1 }

        assertThat(writer.data.first()).isEqualTo(1)
    }

    runTest {
        val reader = newDataStore(missingDirFile, scope = backgroundScope)
        assertThat(reader.data.first()).isEqualTo(1)
    }
}
233
/** A new store created on [testFile] without any prior write reads 0. */
@Test
fun testReadFromNonExistentFile() = runTest {
    val freshStore = newDataStore(testFile)
    val value = freshStore.data.first()
    assertThat(value).isEqualTo(0)
}
239
/** Reading from a store whose backing "file" is a directory throws [IOException]. */
@Test
fun testWriteToDirFails() = runTest {
    val directory =
        testIO.newTempFile(relativePath = "this/is/a/directory").also { dir -> dir.mkdirs() }

    assertThat(directory.isDirectory()).isTrue()

    val storeOnDirectory = newDataStore(directory)
    assertThrows<IOException> { storeOnDirectory.data.first() }
}
250
/**
 * Uses a file producer that throws while a flag is set: the first read surfaces the
 * [IOException] with its message, and after the flag is cleared the same store reads 0.
 */
@Test
fun testExceptionWhenCreatingFilePropagates() = runTest {
    var failFileProducer = true

    val fileProducer = {
        if (failFileProducer) throw IOException("Exception when producing file")
        testFile
    }

    val storeWithFailingProducer =
        testIO.getStore(
            serializerConfig,
            dataStoreScope,
            {
                MultiProcessCoordinator(
                    dataStoreScope.coroutineContext,
                    getJavaFile(fileProducer())
                )
            },
            fileProducer
        )

    assertThrows<IOException> { storeWithFailingProducer.data.first() }
        .hasMessageThat()
        .isEqualTo("Exception when producing file")

    // Let the producer succeed from now on.
    failFileProducer = false

    assertThat(storeWithFailingProducer.data.first()).isEqualTo(0)
}
283
/**
 * Cancels the deferred an update's transform is awaiting: the update fails with
 * [CancellationException], and a later update on the same store still succeeds.
 */
@Test
fun testWriteTransformCancellation() = runTest {
    val pendingValue = CompletableDeferred<Byte>()

    val pendingWrite = async { store.updateData { pendingValue.await() } }

    assertThat(pendingWrite.isCompleted).isFalse()

    pendingValue.cancel()

    assertThrows<CancellationException> { pendingWrite.await() }

    // The store itself must remain usable after the cancelled transform.
    val result = store.updateData { current -> current.inc().inc() }
    assertThat(result).isEqualTo(2)
}
300
/**
 * With the file present and reads failing, a read throws [IOException]; once reads succeed
 * again, a write followed by a read returns 1.
 */
@Test
fun testWriteAfterTransientBadRead() = runTest {
    testFile.write("")
    assertThat(testFile.exists()).isTrue()

    serializerConfig.failingRead = true
    assertThrows<IOException> { store.data.first() }

    serializerConfig.failingRead = false
    store.updateData { 1 }

    val persisted = store.data.first()
    assertThat(persisted).isEqualTo(1)
}
315
/** With the file present and reads failing, an update throws [IOException]. */
@Test
fun testWriteWithBadReadFails() = runTest {
    testFile.write("")
    val fileExists = testFile.exists()
    assertThat(fileExists).isTrue()

    serializerConfig.failingRead = true

    assertThrows<IOException> { store.updateData { 1 } }
}
325
/**
 * Cancels the store's scope while one update is parked in its transform and another has been
 * requested: both, and any update issued afterwards, fail with [CancellationException].
 */
@Test
fun testCancellingDataStoreScopePropagatesToWrites() =
    runBlocking<Unit> {
        val storeScope = CoroutineScope(Job())

        val dataStore = newDataStore(scope = storeScope)

        val gate = CompletableDeferred<Unit>()

        val blockedUpdate = async {
            dataStore.updateData { current ->
                gate.await()
                current.inc()
            }
        }

        val queuedUpdate = async { dataStore.updateData { current -> current.inc() } }

        storeScope.cancel()

        assertThrows<CancellationException> { blockedUpdate.await() }

        assertThrows<CancellationException> { queuedUpdate.await() }

        assertThrows<CancellationException> { dataStore.updateData { 123 } }
    }
352
/**
 * Cancels the caller's scope while updates from it are outstanding: those updates are cancelled,
 * while an update launched from the store's own scope completes and its result is readable.
 */
@Test
fun testCancellingCallerScopePropagatesToWrites() =
    runBlocking<Unit> {
        val storeScope = CoroutineScope(Job())
        val callerScope = CoroutineScope(Job())

        val dataStore = newDataStore(scope = storeScope)

        val gate = CompletableDeferred<Unit>()

        // The relative ordering of the three updates below is not guaranteed in general; they
        // are launched with Dispatchers.Unconfined in the expectation that this avoids flakiness.
        val awaitingCancellation =
            callerScope.async(Dispatchers.Unconfined) {
                dataStore.updateData { awaitCancellation() }
            }

        val startedFromStoreScope =
            storeScope.async(Dispatchers.Unconfined) {
                dataStore.updateData { current ->
                    gate.await()
                    current.inc()
                }
            }

        val notStarted =
            callerScope.async(Dispatchers.Unconfined) {
                dataStore.updateData { current -> current.inc() }
            }

        callerScope.coroutineContext.job.cancelAndJoin()

        assertThat(awaitingCancellation.isCancelled).isTrue()
        assertThat(notStarted.isCancelled).isTrue()

        // Let the remaining update finish so that it does not outlive the test (which is flaky).
        gate.complete(Unit)
        startedFromStoreScope.await()
        assertThat(dataStore.data.first()).isEqualTo(1)
    }
391
/** A value written by an init task is what the first read returns. */
@Test
fun testCanWriteFromInitTask() = runTest {
    val writeOne: suspend (InitializerApi<Byte>) -> Unit = { api -> api.updateData { 1 } }
    store = newDataStore(initTasksList = listOf(writeOne))

    assertThat(store.data.first()).isEqualTo(1)
}
398
/**
 * With an init task that writes 1: an update throws [IOException] while reads fail, and after
 * reads succeed again an update that increments twice leaves the store at 3.
 */
@FlakyTest(bugId = 242765370)
@Test
fun testInitTaskFailsFirstTimeDueToReadFail() = runTest {
    val writeOne: suspend (InitializerApi<Byte>) -> Unit = { api -> api.updateData { 1 } }
    store = newDataStore(initTasksList = listOf(writeOne))

    serializerConfig.failingRead = true
    assertThrows<IOException> { store.updateData { 2 } }

    serializerConfig.failingRead = false
    store.updateData { current -> current.inc().inc() }

    assertThat(store.data.first()).isEqualTo(3)
}
412
/**
 * With an init task that throws while a flag is set: the first update throws [IOException], and
 * after the flag is cleared an increment leaves the store at 1.
 */
@Test
fun testInitTaskFailsFirstTimeDueToException() = runTest {
    val failInit = AtomicBoolean(true)
    val failingInitTask: suspend (InitializerApi<Byte>) -> Unit = { _ ->
        if (failInit.get()) {
            throw IOException("I was asked to fail init")
        }
    }
    store = newDataStore(initTasksList = listOf(failingInitTask))
    assertThrows<IOException> { store.updateData { 5 } }

    failInit.set(false)

    store.updateData { current -> current.inc() }
    assertThat(store.data.first()).isEqualTo(1)
}
432
/** Across ten update/read rounds the counting init task is invoked exactly once. */
@Test
fun testInitTaskOnlyRunsOnce() = runTest {
    val invocations = AtomicInteger()
    val countingTask: suspend (InitializerApi<Byte>) -> Unit = { _ ->
        invocations.incrementAndGet()
    }
    val countedStore = newDataStore(testFile, initTasksList = listOf(countingTask))

    repeat(10) {
        countedStore.updateData { current -> current.inc() }
        countedStore.data.first()
    }

    assertThat(invocations.get()).isEqualTo(1)
}
446
/**
 * Issues an update while the init task is still parked on a latch; once init is released the
 * update's transform sees the value written by the init task (1).
 */
@Test
fun testWriteDuringInit() = runTest {
    val releaseInit = CompletableDeferred<Unit>()

    val gatedInitTask: suspend (InitializerApi<Byte>) -> Unit = { api ->
        releaseInit.await()
        api.updateData { 1 }
    }
    store = newDataStore(initTasksList = listOf(gatedInitTask))

    val pendingUpdate = async {
        store.updateData { current ->
            assertThat(current).isEqualTo(1)
            current
        }
    }

    releaseInit.complete(Unit)
    pendingUpdate.await()

    assertThat(store.data.first()).isEqualTo(1)
}
472
/**
 * Cancels an update and a read that were issued while init was parked on a latch; both fail with
 * [CancellationException], and a later double increment leaves the store at 3.
 */
@Test
fun testCancelDuringInit() = runTest {
    val releaseInit = CompletableDeferred<Unit>()

    val gatedInitTask: suspend (InitializerApi<Byte>) -> Unit = { api ->
        releaseInit.await()
        api.updateData { 1 }
    }
    store = newDataStore(initTasksList = listOf(gatedInitTask))

    val pendingUpdate = async { store.updateData { current -> current } }

    val pendingRead = async { store.data.first() }

    pendingUpdate.cancel()
    pendingRead.cancel()
    releaseInit.complete(Unit)

    assertThrows<CancellationException> { pendingUpdate.await() }
    assertThrows<CancellationException> { pendingRead.await() }

    store.updateData { current -> current.inc().inc() }

    assertThat(store.data.first()).isEqualTo(3)
}
501
/**
 * Runs an init task that issues two updates through the initializer API, one of them parked on a
 * latch; after the latch is released the first read returns 3.
 */
@Test
fun testConcurrentUpdatesInit() = runTest {
    val releaseUpdate = CompletableDeferred<Unit>()

    val concurrentUpdateInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
        val gatedUpdate = async {
            api.updateData { current ->
                releaseUpdate.await()
                current.inc().inc()
            }
        }
        api.updateData { current -> current.inc() }
        gatedUpdate.await()
    }

    store = newDataStore(initTasksList = listOf(concurrentUpdateInitializer))
    val firstRead = async { store.data.first() }
    releaseUpdate.complete(Unit)

    assertThat(firstRead.await()).isEqualTo(3)
}
523
/**
 * Checks that a read stays pending while the init task's update is parked, completes with 1 once
 * init is released, and that a separately gated update then completes with 2.
 */
@Test
fun testInitUpdateBlockRead() = runTest {
    val releaseInit = CompletableDeferred<Unit>()
    val releaseUpdate = CompletableDeferred<Unit>()

    val updateInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
        api.updateData { current ->
            releaseInit.await()
            current.inc()
        }
    }

    store = newDataStore(initTasksList = listOf(updateInitializer))
    val pendingRead = async { store.data.first() }
    val pendingUpdate = async {
        store.updateData { current ->
            releaseUpdate.await()
            current.inc()
        }
    }

    // Init has not been released yet, so the read cannot have finished.
    assertThat(pendingRead.isCompleted).isFalse()
    assertThat(pendingRead.isActive).isTrue()

    releaseInit.complete(Unit)
    assertThat(pendingRead.await()).isEqualTo(1)

    // The update is still parked on its own latch.
    assertThat(pendingUpdate.isCompleted).isFalse()
    assertThat(pendingUpdate.isActive).isTrue()

    releaseUpdate.complete(Unit)
    assertThat(pendingUpdate.await()).isEqualTo(2)
    assertThat(store.data.first()).isEqualTo(2)
}
558
/**
 * Uses an initializer that copies a value (123) into the store and then zeroes its source: the
 * first read throws [IOException] while writes fail, and the retry still reads 123.
 */
@Test
fun testUpdateSuccessfullyCommittedInit() = runTest {
    var externalValue: Byte = 123

    val migratingInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
        api.updateData { externalValue }
        // Mimics a cleanUp() step that clears the source once the update call has returned.
        externalValue = 0
    }

    val migratedStore = newDataStore(initTasksList = listOf(migratingInitializer))

    serializerConfig.failingWrite = true
    assertThrows<IOException> { migratedStore.data.first() }

    serializerConfig.failingWrite = false
    assertThat(migratedStore.data.first()).isEqualTo(123)
}
577
/**
 * Captures the [InitializerApi] handed to an init task and checks that calling `updateData` on it
 * after initialization has finished throws [IllegalStateException].
 */
@Test
fun testInitApiUpdateThrowsAfterInitTasksComplete() = runTest {
    var capturedApi: InitializerApi<Byte>? = null

    val capturingInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
        capturedApi = api
    }

    val initializedStore = newDataStore(initTasksList = listOf(capturingInitializer))

    // The first read completes initialization.
    assertThat(initializedStore.data.first()).isEqualTo(0)

    assertThrows<IllegalStateException> { capturedApi?.updateData { 123 } }
}
590
/** A collector taking 8 elements receives the initial value and each of 7 increments. */
@Test
fun testFlowReceivesUpdates() = runTest {
    val received = mutableListOf<Byte>()

    val collector = async { store.data.take(8).toList(received) }

    runCurrent()
    repeat(7) { store.updateData { current -> current.inc() } }

    collector.join()

    assertThat(received).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
}
604
/** Two collectors of the same 8-element flow each receive 0 through 7. */
@Test
fun testMultipleFlowsReceiveData() = runTest {
    val firstEight = store.data.take(8)

    val receivedByFirst = mutableListOf<Byte>()
    val receivedBySecond = mutableListOf<Byte>()

    val firstCollector = async { firstEight.toList(receivedByFirst) }

    val secondCollector = async { firstEight.toList(receivedBySecond) }

    runCurrent()
    repeat(7) { store.updateData { current -> current.inc() } }

    firstCollector.join()
    secondCollector.join()

    // Seeing every intermediate value relies on the consistent scheduling runTest provides.
    // The store only promises eventual consistency, so outside of runTest a collector could
    // legitimately skip intermediate values as long as it ends up receiving 7.
    assertThat(receivedByFirst).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
    assertThat(receivedBySecond).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
}
629
/**
 * One collector throws from its collect block while another collects normally; the throwing one
 * observes its own exception and the other still receives 0 through 7.
 */
@Test
fun testExceptionInFlowDoesNotBreakUpstream() = runTest {
    val firstEight = store.data.take(8)

    val received = mutableListOf<Byte>()

    val throwingCollector = async {
        assertThrows<Exception> {
                firstEight.collect { throw Exception("Failure while collecting") }
            }
            .hasMessageThat()
            .contains("Failure while collecting")
    }

    val healthyCollector = async { firstEight.take(8).toList(received) }

    runCurrent()
    repeat(7) { store.updateData { current -> current.inc() } }

    healthyCollector.join()
    throwingCollector.await()

    assertThat(received).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
}
654
/**
 * One collector suspends inside its collect block until the other collector has finished; the
 * other collector still receives 0 through 7 while 15 updates are applied.
 */
@Test
fun testSlowConsumerDoesntBlockOtherConsumers() = runTest {
    val firstEight = store.data.take(8)

    val received = mutableListOf<Byte>()

    val fastCollector = async { firstEight.toList(received) }

    // This collector waits for the fast one inside every emission.
    val slowCollector = async { firstEight.collect { fastCollector.await() } }

    runCurrent()
    repeat(15) { store.updateData { current -> current.inc() } }

    fastCollector.await()
    assertThat(received).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))

    slowCollector.await()
}
673
/** With valid data on disk, an update and a read never invoke the corruption handler. */
@Test
fun testHandlerNotCalledGoodData() = runBlocking {
    runTest {
        val seedStore = newDataStore(testFile, scope = backgroundScope)
        seedStore.updateData { 1 }
    }

    runTest {
        val handler = TestingCorruptionHandler()
        val storeWithHandler = newDataStore(corruptionHandler = handler, file = testFile)

        storeWithHandler.updateData { 2 }
        storeWithHandler.data.first()

        assertThat(handler.numCalls).isEqualTo(0)
    }
}
688
/**
 * With `failingRead` set, both an update and a read throw [IOException] and the corruption
 * handler is never invoked.
 */
@Test
fun handlerNotCalledNonCorruption() = runBlocking {
    runTest {
        val seedStore = newDataStore(testFile, scope = backgroundScope)
        seedStore.updateData { 1 }
    }

    runTest {
        val handler = TestingCorruptionHandler()
        serializerConfig.failingRead = true
        val storeWithHandler = newDataStore(corruptionHandler = handler, file = testFile)

        assertThrows<IOException> { storeWithHandler.updateData { 2 } }
        assertThrows<IOException> { storeWithHandler.data.first() }

        assertThat(handler.numCalls).isEqualTo(0)
    }
}
704
/**
 * With `failReadWithCorruptionException` set, a read invokes the corruption handler once and
 * surfaces the handler's [IOException].
 */
@Test
fun testHandlerCalledCorruptDataRead() = runBlocking {
    runTest {
        // Seed the file so that there is something on disk to read.
        val seedStore = newDataStore(testFile, scope = backgroundScope)
        seedStore.updateData { 1 }
    }

    runTest {
        val handler = TestingCorruptionHandler()
        serializerConfig.failReadWithCorruptionException = true
        val storeWithHandler = newDataStore(corruptionHandler = handler, file = testFile)

        assertThrows<IOException> { storeWithHandler.data.first() }
            .hasMessageThat()
            .contains("Handler thrown exception.")

        assertThat(handler.numCalls).isEqualTo(1)
    }
}
724
/**
 * With `failReadWithCorruptionException` set, an update invokes the corruption handler once and
 * surfaces the handler's [IOException].
 */
@Test
fun testHandlerCalledCorruptDataWrite() = runBlocking {
    runTest {
        val seedStore = newDataStore(file = testFile, scope = backgroundScope)
        seedStore.updateData { 1 }
    }

    runTest {
        val handler = TestingCorruptionHandler()
        serializerConfig.failReadWithCorruptionException = true
        val storeWithHandler = newDataStore(corruptionHandler = handler, file = testFile)

        assertThrows<IOException> { storeWithHandler.updateData { 1 } }
            .hasMessageThat()
            .contains("Handler thrown exception.")

        assertThat(handler.numCalls).isEqualTo(1)
    }
}
744
/**
 * With `failReadWithCorruptionException` set and a handler configured to replace with 10, a read
 * returns 10.
 */
@Test
fun testHandlerReplaceData() = runBlocking {
    runTest {
        val seedStore = newDataStore(file = testFile, scope = backgroundScope)
        seedStore.updateData { 1 }
    }

    runTest {
        val replacingHandler = TestingCorruptionHandler(replaceWith = 10)
        serializerConfig.failReadWithCorruptionException = true
        val storeWithHandler =
            newDataStore(
                corruptionHandler = replacingHandler,
                file = testFile,
                scope = backgroundScope
            )

        assertThat(storeWithHandler.data.first()).isEqualTo(10)
    }
}
763
/** A store built with a serializer config whose default is 99 reads 99 before any write. */
@Test
fun testDefaultValueUsedWhenNoDataOnDisk() = runTest {
    val configWithDefault = TestingSerializerConfig(defaultValue = 99)
    val dataStore =
        testIO.getStore(
            configWithDefault,
            dataStoreScope,
            { MultiProcessCoordinator(dataStoreScope.coroutineContext, getJavaFile(testFile)) }
        ) {
            testFile
        }

    val value = dataStore.data.first()
    assertThat(value).isEqualTo(99)
}
777
/**
 * Calls `updateData` from a context carrying a [TestElement] and checks that the transform sees
 * that element in its coroutine context.
 */
@Test
fun testTransformRunInCallersContext() =
    runBlocking<Unit> {
        // Reads the coroutine context of whichever coroutine calls it.
        suspend fun getContext(): CoroutineContext = kotlin.coroutines.coroutineContext

        withContext(TestElement("123")) {
            store.updateData { current ->
                val transformContext = getContext()
                assertThat(transformContext[TestElement.Key]!!.name).isEqualTo("123")
                current.inc()
            }
        }
    }
793
/** Coroutine context element carrying a [name], keyed by its companion object [Key]. */
private class TestElement(
    val name: String
) : AbstractCoroutineContextElement(Key) {
    /** Key under which a [TestElement] is stored in a [CoroutineContext]. */
    companion object Key : CoroutineContext.Key<TestElement>
}
797
/**
 * Starts an update whose transform waits for cancellation, then cancels and joins the launching
 * scope; the whole test must finish within the 1000 ms timeout.
 *
 * The single-thread executor backing the scope is closed in a `finally` block so that its thread
 * does not outlive the test.
 */
@Test
fun testCancelInflightWrite() =
    doBlockingWithTimeout(1000) {
        val executorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        try {
            val myScope = CoroutineScope(Job() + executorDispatcher)

            val updateStarted = CompletableDeferred<Unit>()
            myScope.launch {
                store.updateData {
                    updateStarted.complete(Unit)
                    awaitCancellation()
                }
            }
            updateStarted.await()
            myScope.coroutineContext.job.cancelAndJoin()
        } finally {
            // Shuts the executor down; previously it was never closed.
            executorDispatcher.close()
        }
    }
814
/**
 * Cancels an in-flight update (by cancelling and joining the launching scope) and checks that a
 * subsequent update on the same store completes within the 1000 ms timeout.
 *
 * The single-thread executor backing the cancelled scope is closed in a `finally` block so that
 * its thread does not outlive the test.
 */
@Test
fun testWrite_afterCanceledWrite_succeeds() =
    doBlockingWithTimeout(1000) {
        val executorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
        try {
            val myScope = CoroutineScope(Job() + executorDispatcher)

            val cancelNow = CompletableDeferred<Unit>()

            myScope.launch {
                store.updateData {
                    cancelNow.complete(Unit)
                    awaitCancellation()
                }
            }

            cancelNow.await()
            myScope.coroutineContext.job.cancelAndJoin()

            // This update is issued from the runBlocking coroutine, not from the cancelled scope.
            store.updateData { 123 }
        } finally {
            // Shuts the executor down; previously it was never closed.
            executorDispatcher.close()
        }
    }
835
/**
 * Cancels one caller scope while an update from a different scope is in flight; the in-flight
 * update still completes and a later increment returns 2.
 */
@Test
fun testWrite_fromOtherScope_doesntGetCancelledFromDifferentScope() =
    doBlockingWithTimeout(1000) {
        val survivingScope = CoroutineScope(Job())

        val cancelledScope = CoroutineScope(Job())

        val firstUpdateStarted = CompletableDeferred<Unit>()
        val firstUpdateValue = CompletableDeferred<Byte>()

        val firstUpdate =
            survivingScope.async(Dispatchers.Unconfined) {
                store.updateData {
                    firstUpdateStarted.complete(Unit)
                    firstUpdateValue.await()
                }
            }

        cancelledScope.launch(Dispatchers.Unconfined) {
            store.updateData { awaitCancellation() }
        }

        firstUpdateStarted.await()
        cancelledScope.coroutineContext.job.cancelAndJoin()
        firstUpdateValue.complete(1)
        firstUpdate.await()

        // The store keeps working after the other scope was cancelled.
        val result = store.updateData { current -> current.inc() }
        assertThat(result).isEqualTo(2)
    }
864
/**
 * Opens two stores on the same file with independent scopes; reading from the second while the
 * first is still active throws [IllegalStateException].
 */
@Test
fun testCreateDuplicateActiveDataStore() = runTest {
    val sharedFile = testIO.newTempFile(parentFile = tempFolder)
    val firstStore = newDataStore(file = sharedFile, scope = CoroutineScope(Job()))

    firstStore.data.first()

    val secondStore = newDataStore(file = sharedFile, scope = CoroutineScope(Job()))

    assertThrows<IllegalStateException> { secondStore.data.first() }
}
876
/**
 * Opens a second store on a file after the first store's scope has been cancelled and joined;
 * reading from the second store is expected to succeed.
 */
@Test
fun testCreateDataStore_withSameFileAsInactiveDataStore() = runTest {
    val sharedFile = testIO.newTempFile(parentFile = tempFolder)
    val firstScope = CoroutineScope(Job())
    val firstStore = newDataStore(file = sharedFile, scope = firstScope)

    firstStore.data.first()

    firstScope.coroutineContext.job.cancelAndJoin()

    val secondStore = newDataStore(file = sharedFile, scope = CoroutineScope(Job()))

    // No exception is expected here because firstScope has already been cancelled.
    secondStore.data.first()
}
892
/**
 * Installs a StrictMode thread policy that detects disk reads and writes with `penaltyDeath`,
 * then creates a store on a dedicated single-thread context and reads its initial value (0).
 *
 * The thread policy that was active before the test is restored in a `finally` block, so the
 * strict policy is removed even when the assertion fails. The store's scope is cancelled and its
 * single-thread context closed there as well.
 */
@Test
fun testCreateDataStoreAndRead_withStrictMode() = runTest {
    val previousPolicy = StrictMode.getThreadPolicy()
    val storeDispatcher = newSingleThreadContext("test")
    val storeScope = CoroutineScope(storeDispatcher)
    try {
        StrictMode.setThreadPolicy(
            StrictMode.ThreadPolicy.Builder()
                .detectDiskReads()
                .detectDiskWrites()
                .penaltyDeath()
                .build()
        )
        val dataStore = newDataStore(file = testFile, scope = storeScope)
        assertThat(dataStore.data.first()).isEqualTo(0)
    } finally {
        // Undo the strict policy first so that cleanup is not subject to it.
        StrictMode.setThreadPolicy(previousPolicy)
        storeScope.cancel()
        storeDispatcher.close()
    }
}
908
/**
 * Installs a StrictMode thread policy that detects disk reads and writes with `penaltyDeath`,
 * then creates a store on a dedicated single-thread context, increments it and reads back 1.
 *
 * The thread policy that was active before the test is restored in a `finally` block, so the
 * strict policy is removed even when the update or assertion fails. The store's scope is
 * cancelled and its single-thread context closed there as well.
 */
@Test
fun testCreateDataStoreAndUpdate_withStrictMode() = runTest {
    val previousPolicy = StrictMode.getThreadPolicy()
    val storeDispatcher = newSingleThreadContext("test")
    val storeScope = CoroutineScope(storeDispatcher)
    try {
        StrictMode.setThreadPolicy(
            StrictMode.ThreadPolicy.Builder()
                .detectDiskReads()
                .detectDiskWrites()
                .penaltyDeath()
                .build()
        )
        val dataStore = newDataStore(file = testFile, scope = storeScope)
        dataStore.updateData { it.inc() }
        assertThat(dataStore.data.first()).isEqualTo(1)
    } finally {
        // Undo the strict policy first so that cleanup is not subject to it.
        StrictMode.setThreadPolicy(previousPolicy)
        storeScope.cancel()
        storeDispatcher.close()
    }
}
925
/** Writing the same value twice leaves the serializer's write count at 1. */
@Test
fun testWriteSameValueSkipDisk() = runTest {
    // The first write uses a non-default value so that it has to reach the serializer.
    store.updateData { 10 }
    val writesAfterFirstUpdate = serializerConfig.writeCount
    assertThat(writesAfterFirstUpdate).isEqualTo(1)

    // Writing the identical value again must not add another serializer write.
    store.updateData { 10 }
    val writesAfterSecondUpdate = serializerConfig.writeCount
    assertThat(writesAfterSecondUpdate).isEqualTo(1)
}
936
/** Mutable holder for a single [Byte]. */
data class ByteWrapper(var byte: Byte) {
    /** [Serializer] for [ByteWrapper] that forwards the wrapped byte to a [TestingSerializer]. */
    internal class ByteWrapperSerializer : Serializer<ByteWrapper> {
        private val delegate = TestingSerializer()

        override val defaultValue = ByteWrapper(delegate.defaultValue)

        override suspend fun readFrom(input: InputStream): ByteWrapper {
            val rawByte = delegate.readFrom(input)
            return ByteWrapper(rawByte)
        }

        override suspend fun writeTo(t: ByteWrapper, output: OutputStream) {
            val rawByte = t.byte
            delegate.writeTo(rawByte, output)
        }
    }
}
953
/**
 * [CorruptionHandler] for tests that counts its invocations.
 *
 * The count is kept in an [AtomicInteger] so that the increment is atomic (the previous
 * `numCalls++` on a `@Volatile` field was a non-atomic read-modify-write).
 *
 * @param replaceWith value returned from [handleCorruption]; when null, [handleCorruption] throws
 *   an [IOException] instead.
 */
private class TestingCorruptionHandler(private val replaceWith: Byte? = null) :
    CorruptionHandler<Byte> {

    private val callCount = AtomicInteger()

    /** Number of times [handleCorruption] has been invoked so far. */
    val numCalls: Int
        get() = callCount.get()

    /**
     * Records the call, then returns [replaceWith] if one was configured.
     *
     * @throws IOException with the message "Handler thrown exception." when [replaceWith] is null.
     */
    override suspend fun handleCorruption(ex: CorruptionException): Byte {
        callCount.incrementAndGet()
        return replaceWith ?: throw IOException("Handler thrown exception.")
    }
}
969 }
970
/**
 * Runs [block] inside [runBlocking], wrapped in [withTimeout] with a limit of [ms] milliseconds.
 *
 * @param ms timeout passed to [withTimeout], in milliseconds.
 * @param block suspending body to execute.
 */
fun doBlockingWithTimeout(ms: Long, block: suspend () -> Unit) {
    runBlocking<Unit> {
        withTimeout(ms) {
            block()
        }
    }
}
973