1 /*
<lambda>null2  * Copyright 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.datastore.core
18 
19 import android.os.StrictMode
20 import androidx.datastore.TestFile
21 import androidx.datastore.TestIO
22 import androidx.datastore.TestingSerializerConfig
23 import androidx.datastore.core.handlers.NoOpCorruptionHandler
24 import androidx.test.filters.FlakyTest
25 import androidx.test.filters.LargeTest
26 import androidx.testutils.assertThrows
27 import com.google.common.truth.Truth.assertThat
28 import java.io.File
29 import java.io.InputStream
30 import java.io.OutputStream
31 import java.util.concurrent.Executors
32 import java.util.concurrent.atomic.AtomicBoolean
33 import java.util.concurrent.atomic.AtomicInteger
34 import kotlin.coroutines.AbstractCoroutineContextElement
35 import kotlin.coroutines.CoroutineContext
36 import kotlinx.coroutines.CancellationException
37 import kotlinx.coroutines.CompletableDeferred
38 import kotlinx.coroutines.CoroutineScope
39 import kotlinx.coroutines.DelicateCoroutinesApi
40 import kotlinx.coroutines.Dispatchers
41 import kotlinx.coroutines.ExperimentalCoroutinesApi
42 import kotlinx.coroutines.Job
43 import kotlinx.coroutines.asCoroutineDispatcher
44 import kotlinx.coroutines.async
45 import kotlinx.coroutines.awaitCancellation
46 import kotlinx.coroutines.cancel
47 import kotlinx.coroutines.cancelAndJoin
48 import kotlinx.coroutines.flow.first
49 import kotlinx.coroutines.flow.take
50 import kotlinx.coroutines.flow.toList
51 import kotlinx.coroutines.job
52 import kotlinx.coroutines.launch
53 import kotlinx.coroutines.newSingleThreadContext
54 import kotlinx.coroutines.runBlocking
55 import kotlinx.coroutines.test.TestScope
56 import kotlinx.coroutines.test.UnconfinedTestDispatcher
57 import kotlinx.coroutines.test.runCurrent
58 import kotlinx.coroutines.test.runTest
59 import kotlinx.coroutines.withContext
60 import kotlinx.coroutines.withTimeout
61 import org.junit.Before
62 import org.junit.Test
63 import org.junit.runner.RunWith
64 import org.junit.runners.JUnit4
65 
66 /**
67  * A testing class based on duplicate from "SingleProcessDataStoreTest" that only tests the features
68  * in a single process use case. More tests are added for StrictMode.
69  */
70 @OptIn(DelicateCoroutinesApi::class)
71 @ExperimentalCoroutinesApi
72 @LargeTest
73 @RunWith(JUnit4::class)
74 abstract class MultiProcessDataStoreSingleProcessTest<F : TestFile<F>>(
75     protected val testIO: TestIO<F, *>
76 ) {
77     protected lateinit var store: DataStore<Byte>
78     private lateinit var serializerConfig: TestingSerializerConfig
79     protected lateinit var testFile: F
80     protected lateinit var tempFolder: F
81     protected lateinit var dataStoreScope: TestScope
82 
83     abstract fun getJavaFile(file: F): File
84 
85     private fun newDataStore(
86         file: F = testFile,
87         scope: CoroutineScope = dataStoreScope,
88         initTasksList: List<suspend (api: InitializerApi<Byte>) -> Unit> = listOf(),
89         corruptionHandler: CorruptionHandler<Byte> = NoOpCorruptionHandler<Byte>()
90     ): DataStore<Byte> {
91         return DataStoreImpl(
92             storage =
93                 testIO.getStorage(
94                     serializerConfig,
95                     {
96                         MultiProcessCoordinator(
97                             dataStoreScope.coroutineContext,
98                             getJavaFile(testFile)
99                         )
100                     }
101                 ) {
102                     file
103                 },
104             scope = scope,
105             initTasksList = initTasksList,
106             corruptionHandler = corruptionHandler
107         )
108     }
109 
110     @Before
111     fun setUp() {
112         serializerConfig = TestingSerializerConfig()
113         tempFolder = testIO.newTempFile().also { it.mkdirs() }
114         testFile = testIO.newTempFile(parentFile = tempFolder)
115         dataStoreScope = TestScope(UnconfinedTestDispatcher() + Job())
116         store =
117             testIO.getStore(
118                 serializerConfig,
119                 dataStoreScope,
120                 { MultiProcessCoordinator(dataStoreScope.coroutineContext, getJavaFile(testFile)) }
121             ) {
122                 testFile
123             }
124     }
125 
126     @Test fun testReadNewMessage() = runTest { assertThat(store.data.first()).isEqualTo(0) }
127 
128     @Test
129     fun testReadWithNewInstance() = runBlocking {
130         runTest {
131             val newStore = newDataStore(testFile, scope = backgroundScope)
132             newStore.updateData { 1 }
133         }
134         runTest {
135             val newStore = newDataStore(testFile, scope = backgroundScope)
136             assertThat(newStore.data.first()).isEqualTo(1)
137         }
138     }
139 
140     @Test
141     fun testScopeCancelledWithActiveFlow() = runTest {
142         val storeScope = CoroutineScope(Job())
143         val dataStore = newDataStore(scope = storeScope)
144         val collection = async {
145             dataStore.data.take(2).collect {
146                 // Do nothing, this will wait on another element which will never arrive
147             }
148         }
149 
150         storeScope.cancel()
151         collection.join()
152 
153         assertThat(collection.isCompleted).isTrue()
154         assertThat(collection.isActive).isFalse()
155     }
156 
157     @Test
158     fun testWriteAndRead() = runTest {
159         store.updateData { 1 }
160         assertThat(store.data.first()).isEqualTo(1)
161     }
162 
163     @Test
164     fun testWritesDontBlockReadsInSameProcess() = runTest {
165         val transformStarted = CompletableDeferred<Unit>()
166         val continueTransform = CompletableDeferred<Unit>()
167 
168         val slowUpdate = async {
169             store.updateData {
170                 transformStarted.complete(Unit)
171                 continueTransform.await()
172                 it.inc()
173             }
174         }
175         // Wait for the transform to begin.
176         transformStarted.await()
177 
178         // Read is not blocked.
179         assertThat(store.data.first()).isEqualTo(0)
180 
181         continueTransform.complete(Unit)
182         slowUpdate.await()
183 
184         // After update completes, update runs, and read shows new data.
185         assertThat(store.data.first()).isEqualTo(1)
186     }
187 
188     @Test
189     fun testWriteMultiple() = runTest {
190         store.updateData { 2 }
191 
192         assertThat(store.data.first()).isEqualTo(2)
193 
194         store.updateData { it.dec() }
195 
196         assertThat(store.data.first()).isEqualTo(1)
197     }
198 
199     @Test
200     fun testReadAfterTransientBadWrite() = runBlocking {
201         val file = testIO.newTempFile(tempFolder)
202         runTest {
203             val store = newDataStore(file, scope = backgroundScope)
204             store.updateData { 1 }
205             serializerConfig.failingWrite = true
206             assertThrows<IOException> { store.updateData { 2 } }
207         }
208 
209         runTest {
210             val newStore = newDataStore(file, scope = backgroundScope)
211             assertThat(newStore.data.first()).isEqualTo(1)
212         }
213     }
214 
215     @Test
216     fun testWriteToNonExistentDir() = runBlocking {
217         val fileInNonExistentDir = testIO.newTempFile(relativePath = "/this/does/not/exist/ds.txt")
218         assertThat(fileInNonExistentDir.exists()).isFalse()
219         assertThat(fileInNonExistentDir.parentFile()!!.exists()).isFalse()
220         runTest {
221             val newStore = newDataStore(fileInNonExistentDir, scope = backgroundScope)
222 
223             newStore.updateData { 1 }
224 
225             assertThat(newStore.data.first()).isEqualTo(1)
226         }
227 
228         runTest {
229             val newStore = newDataStore(fileInNonExistentDir, scope = backgroundScope)
230             assertThat(newStore.data.first()).isEqualTo(1)
231         }
232     }
233 
234     @Test
235     fun testReadFromNonExistentFile() = runTest {
236         val newStore = newDataStore(testFile)
237         assertThat(newStore.data.first()).isEqualTo(0)
238     }
239 
240     @Test
241     fun testWriteToDirFails() = runTest {
242         val directoryFile =
243             testIO.newTempFile(relativePath = "this/is/a/directory").also { it.mkdirs() }
244 
245         assertThat(directoryFile.isDirectory()).isTrue()
246 
247         val newStore = newDataStore(directoryFile)
248         assertThrows<IOException> { newStore.data.first() }
249     }
250 
251     @Test
252     fun testExceptionWhenCreatingFilePropagates() = runTest {
253         var failFileProducer = true
254 
255         val fileProducer = {
256             if (failFileProducer) {
257                 throw IOException("Exception when producing file")
258             }
259             testFile
260         }
261 
262         val newStore =
263             testIO.getStore(
264                 serializerConfig,
265                 dataStoreScope,
266                 {
267                     MultiProcessCoordinator(
268                         dataStoreScope.coroutineContext,
269                         getJavaFile(fileProducer())
270                     )
271                 },
272                 fileProducer
273             )
274 
275         assertThrows<IOException> { newStore.data.first() }
276             .hasMessageThat()
277             .isEqualTo("Exception when producing file")
278 
279         failFileProducer = false
280 
281         assertThat(newStore.data.first()).isEqualTo(0)
282     }
283 
284     @Test
285     fun testWriteTransformCancellation() = runTest {
286         val transform = CompletableDeferred<Byte>()
287 
288         val write = async { store.updateData { transform.await() } }
289 
290         assertThat(write.isCompleted).isFalse()
291 
292         transform.cancel()
293 
294         assertThrows<CancellationException> { write.await() }
295 
296         // Check that the datastore's scope is still active:
297 
298         assertThat(store.updateData { it.inc().inc() }).isEqualTo(2)
299     }
300 
301     @Test
302     fun testWriteAfterTransientBadRead() = runTest {
303         testFile.write("")
304         assertThat(testFile.exists()).isTrue()
305 
306         serializerConfig.failingRead = true
307 
308         assertThrows<IOException> { store.data.first() }
309 
310         serializerConfig.failingRead = false
311 
312         store.updateData { 1 }
313         assertThat(store.data.first()).isEqualTo(1)
314     }
315 
316     @Test
317     fun testWriteWithBadReadFails() = runTest {
318         testFile.write("")
319         assertThat(testFile.exists()).isTrue()
320 
321         serializerConfig.failingRead = true
322 
323         assertThrows<IOException> { store.updateData { 1 } }
324     }
325 
326     @Test
327     fun testCancellingDataStoreScopePropagatesToWrites() =
328         runBlocking<Unit> {
329             val scope = CoroutineScope(Job())
330 
331             val dataStore = newDataStore(scope = scope)
332 
333             val latch = CompletableDeferred<Unit>()
334 
335             val slowUpdate = async {
336                 dataStore.updateData {
337                     latch.await()
338                     it.inc()
339                 }
340             }
341 
342             val notStartedUpdate = async { dataStore.updateData { it.inc() } }
343 
344             scope.cancel()
345 
346             assertThrows<CancellationException> { slowUpdate.await() }
347 
348             assertThrows<CancellationException> { notStartedUpdate.await() }
349 
350             assertThrows<CancellationException> { dataStore.updateData { 123 } }
351         }
352 
353     @Test
354     fun testCancellingCallerScopePropagatesToWrites() =
355         runBlocking<Unit> {
356             val dsScope = CoroutineScope(Job())
357             val callerScope = CoroutineScope(Job())
358 
359             val dataStore = newDataStore(scope = dsScope)
360 
361             val latch = CompletableDeferred<Unit>()
362 
363             // The ordering of the following are not guaranteed but I think they won't be flaky with
364             // Dispatchers.Unconfined
365             val awaitingCancellation =
366                 callerScope.async(Dispatchers.Unconfined) {
367                     dataStore.updateData { awaitCancellation() }
368                 }
369 
370             val started =
371                 dsScope.async(Dispatchers.Unconfined) {
372                     dataStore.updateData {
373                         latch.await()
374                         it.inc()
375                     }
376                 }
377 
378             val notStarted =
379                 callerScope.async(Dispatchers.Unconfined) { dataStore.updateData { it.inc() } }
380 
381             callerScope.coroutineContext.job.cancelAndJoin()
382 
383             assertThat(awaitingCancellation.isCancelled).isTrue()
384             assertThat(notStarted.isCancelled).isTrue()
385 
386             // wait for coroutine to complete to prevent it from outliving the test, which is flaky
387             latch.complete(Unit)
388             started.await()
389             assertThat(dataStore.data.first()).isEqualTo(1)
390         }
391 
392     @Test
393     fun testCanWriteFromInitTask() = runTest {
394         store = newDataStore(initTasksList = listOf { api -> api.updateData { 1 } })
395 
396         assertThat(store.data.first()).isEqualTo(1)
397     }
398 
399     @FlakyTest(bugId = 242765370)
400     @Test
401     fun testInitTaskFailsFirstTimeDueToReadFail() = runTest {
402         store = newDataStore(initTasksList = listOf { api -> api.updateData { 1 } })
403 
404         serializerConfig.failingRead = true
405         assertThrows<IOException> { store.updateData { 2 } }
406 
407         serializerConfig.failingRead = false
408         store.updateData { it.inc().inc() }
409 
410         assertThat(store.data.first()).isEqualTo(3)
411     }
412 
413     @Test
414     fun testInitTaskFailsFirstTimeDueToException() = runTest {
415         val failInit = AtomicBoolean(true)
416         store =
417             newDataStore(
418                 initTasksList =
419                     listOf { _ ->
420                         if (failInit.get()) {
421                             throw IOException("I was asked to fail init")
422                         }
423                     }
424             )
425         assertThrows<IOException> { store.updateData { 5 } }
426 
427         failInit.set(false)
428 
429         store.updateData { it.inc() }
430         assertThat(store.data.first()).isEqualTo(1)
431     }
432 
433     @Test
434     fun testInitTaskOnlyRunsOnce() = runTest {
435         val count = AtomicInteger()
436         val newStore =
437             newDataStore(testFile, initTasksList = listOf { _ -> count.incrementAndGet() })
438 
439         repeat(10) {
440             newStore.updateData { it.inc() }
441             newStore.data.first()
442         }
443 
444         assertThat(count.get()).isEqualTo(1)
445     }
446 
447     @Test
448     fun testWriteDuringInit() = runTest {
449         val continueInit = CompletableDeferred<Unit>()
450 
451         store =
452             newDataStore(
453                 initTasksList =
454                     listOf { api ->
455                         continueInit.await()
456                         api.updateData { 1 }
457                     }
458             )
459 
460         val update = async {
461             store.updateData { b ->
462                 assertThat(b).isEqualTo(1)
463                 b
464             }
465         }
466 
467         continueInit.complete(Unit)
468         update.await()
469 
470         assertThat(store.data.first()).isEqualTo(1)
471     }
472 
473     @Test
474     fun testCancelDuringInit() = runTest {
475         val continueInit = CompletableDeferred<Unit>()
476 
477         store =
478             newDataStore(
479                 initTasksList =
480                     listOf { api ->
481                         continueInit.await()
482                         api.updateData { 1 }
483                     }
484             )
485 
486         val update = async { store.updateData { it } }
487 
488         val read = async { store.data.first() }
489 
490         update.cancel()
491         read.cancel()
492         continueInit.complete(Unit)
493 
494         assertThrows<CancellationException> { update.await() }
495         assertThrows<CancellationException> { read.await() }
496 
497         store.updateData { it.inc().inc() }
498 
499         assertThat(store.data.first()).isEqualTo(3)
500     }
501 
502     @Test
503     fun testConcurrentUpdatesInit() = runTest {
504         val continueUpdate = CompletableDeferred<Unit>()
505 
506         val concurrentUpdateInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
507             val update1 = async {
508                 api.updateData {
509                     continueUpdate.await()
510                     it.inc().inc()
511                 }
512             }
513             api.updateData { it.inc() }
514             update1.await()
515         }
516 
517         store = newDataStore(initTasksList = listOf(concurrentUpdateInitializer))
518         val getData = async { store.data.first() }
519         continueUpdate.complete(Unit)
520 
521         assertThat(getData.await()).isEqualTo(3)
522     }
523 
524     @Test
525     fun testInitUpdateBlockRead() = runTest {
526         val continueInit = CompletableDeferred<Unit>()
527         val continueUpdate = CompletableDeferred<Unit>()
528 
529         val updateInitializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
530             api.updateData {
531                 continueInit.await()
532                 it.inc()
533             }
534         }
535 
536         store = newDataStore(initTasksList = listOf(updateInitializer))
537         val getData = async { store.data.first() }
538         val updateData = async {
539             store.updateData {
540                 continueUpdate.await()
541                 it.inc()
542             }
543         }
544 
545         assertThat(getData.isCompleted).isFalse()
546         assertThat(getData.isActive).isTrue()
547 
548         continueInit.complete(Unit)
549         assertThat(getData.await()).isEqualTo(1)
550 
551         assertThat(updateData.isCompleted).isFalse()
552         assertThat(updateData.isActive).isTrue()
553 
554         continueUpdate.complete(Unit)
555         assertThat(updateData.await()).isEqualTo(2)
556         assertThat(store.data.first()).isEqualTo(2)
557     }
558 
559     @Test
560     fun testUpdateSuccessfullyCommittedInit() = runTest {
561         var otherStorage: Byte = 123
562 
563         val initializer: suspend (InitializerApi<Byte>) -> Unit = { api ->
564             api.updateData { otherStorage }
565             // Similar to cleanUp():
566             otherStorage = 0
567         }
568 
569         val store = newDataStore(initTasksList = listOf(initializer))
570 
571         serializerConfig.failingWrite = true
572         assertThrows<IOException> { store.data.first() }
573 
574         serializerConfig.failingWrite = false
575         assertThat(store.data.first()).isEqualTo(123)
576     }
577 
578     @Test
579     fun testInitApiUpdateThrowsAfterInitTasksComplete() = runTest {
580         var savedApi: InitializerApi<Byte>? = null
581 
582         val initializer: suspend (InitializerApi<Byte>) -> Unit = { api -> savedApi = api }
583 
584         val store = newDataStore(initTasksList = listOf(initializer))
585 
586         assertThat(store.data.first()).isEqualTo(0)
587 
588         assertThrows<IllegalStateException> { savedApi?.updateData { 123 } }
589     }
590 
591     @Test
592     fun testFlowReceivesUpdates() = runTest {
593         val collectedBytes = mutableListOf<Byte>()
594 
595         val flowCollectionJob = async { store.data.take(8).toList(collectedBytes) }
596 
597         runCurrent()
598         repeat(7) { store.updateData { it.inc() } }
599 
600         flowCollectionJob.join()
601 
602         assertThat(collectedBytes).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
603     }
604 
605     @Test
606     fun testMultipleFlowsReceiveData() = runTest {
607         val flowOf8 = store.data.take(8)
608 
609         val bytesFromFirstCollect = mutableListOf<Byte>()
610         val bytesFromSecondCollect = mutableListOf<Byte>()
611 
612         val flowCollection1 = async { flowOf8.toList(bytesFromFirstCollect) }
613 
614         val flowCollection2 = async { flowOf8.toList(bytesFromSecondCollect) }
615 
616         runCurrent()
617         repeat(7) { store.updateData { it.inc() } }
618 
619         flowCollection1.join()
620         flowCollection2.join()
621 
622         // This test only works because runTest ensures consistent behavior
623         // Otherwise, we cannot really expect the collector to read every single value
624         // (we provide eventual consistency, so it would also be OK if it missed some intermediate
625         // values as long as it received 7 at the end).
626         assertThat(bytesFromFirstCollect).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
627         assertThat(bytesFromSecondCollect).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
628     }
629 
630     @Test
631     fun testExceptionInFlowDoesNotBreakUpstream() = runTest {
632         val flowOf8 = store.data.take(8)
633 
634         val collectedBytes = mutableListOf<Byte>()
635 
636         val failedFlowCollection = async {
637             assertThrows<Exception> {
638                     flowOf8.collect { throw Exception("Failure while collecting") }
639                 }
640                 .hasMessageThat()
641                 .contains("Failure while collecting")
642         }
643 
644         val successfulFlowCollection = async { flowOf8.take(8).toList(collectedBytes) }
645 
646         runCurrent()
647         repeat(7) { store.updateData { it.inc() } }
648 
649         successfulFlowCollection.join()
650         failedFlowCollection.await()
651 
652         assertThat(collectedBytes).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
653     }
654 
655     @Test
656     fun testSlowConsumerDoesntBlockOtherConsumers() = runTest {
657         val flowOf8 = store.data.take(8)
658 
659         val collectedBytes = mutableListOf<Byte>()
660 
661         val flowCollection2 = async { flowOf8.toList(collectedBytes) }
662 
663         val blockedCollection = async { flowOf8.collect { flowCollection2.await() } }
664 
665         runCurrent()
666         repeat(15) { store.updateData { it.inc() } }
667 
668         flowCollection2.await()
669         assertThat(collectedBytes).isEqualTo(mutableListOf<Byte>(0, 1, 2, 3, 4, 5, 6, 7))
670 
671         blockedCollection.await()
672     }
673 
674     @Test
675     fun testHandlerNotCalledGoodData() = runBlocking {
676         runTest { newDataStore(testFile, scope = backgroundScope).updateData { 1 } }
677 
678         runTest {
679             val testingHandler: TestingCorruptionHandler = TestingCorruptionHandler()
680             val newStore = newDataStore(corruptionHandler = testingHandler, file = testFile)
681 
682             newStore.updateData { 2 }
683             newStore.data.first()
684 
685             assertThat(testingHandler.numCalls).isEqualTo(0)
686         }
687     }
688 
689     @Test
690     fun handlerNotCalledNonCorruption() = runBlocking {
691         runTest { newDataStore(testFile, scope = backgroundScope).updateData { 1 } }
692 
693         runTest {
694             val testingHandler = TestingCorruptionHandler()
695             serializerConfig.failingRead = true
696             val newStore = newDataStore(corruptionHandler = testingHandler, file = testFile)
697 
698             assertThrows<IOException> { newStore.updateData { 2 } }
699             assertThrows<IOException> { newStore.data.first() }
700 
701             assertThat(testingHandler.numCalls).isEqualTo(0)
702         }
703     }
704 
705     @Test
706     fun testHandlerCalledCorruptDataRead() = runBlocking {
707         runTest {
708             val newStore = newDataStore(testFile, scope = backgroundScope)
709             newStore.updateData { 1 } // Pre-seed the data so the file exists.
710         }
711 
712         runTest {
713             val testingHandler: TestingCorruptionHandler = TestingCorruptionHandler()
714             serializerConfig.failReadWithCorruptionException = true
715             val newStore = newDataStore(corruptionHandler = testingHandler, file = testFile)
716 
717             assertThrows<IOException> { newStore.data.first() }
718                 .hasMessageThat()
719                 .contains("Handler thrown exception.")
720 
721             assertThat(testingHandler.numCalls).isEqualTo(1)
722         }
723     }
724 
725     @Test
726     fun testHandlerCalledCorruptDataWrite() = runBlocking {
727         runTest {
728             val newStore = newDataStore(file = testFile, scope = backgroundScope)
729             newStore.updateData { 1 }
730         }
731 
732         runTest {
733             val testingHandler: TestingCorruptionHandler = TestingCorruptionHandler()
734             serializerConfig.failReadWithCorruptionException = true
735             val newStore = newDataStore(corruptionHandler = testingHandler, file = testFile)
736 
737             assertThrows<IOException> { newStore.updateData { 1 } }
738                 .hasMessageThat()
739                 .contains("Handler thrown exception.")
740 
741             assertThat(testingHandler.numCalls).isEqualTo(1)
742         }
743     }
744 
745     @Test
746     fun testHandlerReplaceData() = runBlocking {
747         runTest { newDataStore(file = testFile, scope = backgroundScope).updateData { 1 } }
748 
749         runTest {
750             val testingHandler: TestingCorruptionHandler =
751                 TestingCorruptionHandler(replaceWith = 10)
752             serializerConfig.failReadWithCorruptionException = true
753             val newStore =
754                 newDataStore(
755                     corruptionHandler = testingHandler,
756                     file = testFile,
757                     scope = backgroundScope
758                 )
759 
760             assertThat(newStore.data.first()).isEqualTo(10)
761         }
762     }
763 
764     @Test
765     fun testDefaultValueUsedWhenNoDataOnDisk() = runTest {
766         val dataStore =
767             testIO.getStore(
768                 TestingSerializerConfig(defaultValue = 99),
769                 dataStoreScope,
770                 { MultiProcessCoordinator(dataStoreScope.coroutineContext, getJavaFile(testFile)) }
771             ) {
772                 testFile
773             }
774 
775         assertThat(dataStore.data.first()).isEqualTo(99)
776     }
777 
778     @Test
779     fun testTransformRunInCallersContext() =
780         runBlocking<Unit> {
781             suspend fun getContext(): CoroutineContext {
782                 return kotlin.coroutines.coroutineContext
783             }
784 
785             withContext(TestElement("123")) {
786                 store.updateData {
787                     val context = getContext()
788                     assertThat(context[TestElement.Key]!!.name).isEqualTo("123")
789                     it.inc()
790                 }
791             }
792         }
793 
794     private class TestElement(val name: String) : AbstractCoroutineContextElement(Key) {
795         companion object Key : CoroutineContext.Key<TestElement>
796     }
797 
798     @Test
799     fun testCancelInflightWrite() =
800         doBlockingWithTimeout(1000) {
801             val myScope =
802                 CoroutineScope(Job() + Executors.newSingleThreadExecutor().asCoroutineDispatcher())
803 
804             val updateStarted = CompletableDeferred<Unit>()
805             myScope.launch {
806                 store.updateData {
807                     updateStarted.complete(Unit)
808                     awaitCancellation()
809                 }
810             }
811             updateStarted.await()
812             myScope.coroutineContext[Job]!!.cancelAndJoin()
813         }
814 
815     @Test
816     fun testWrite_afterCanceledWrite_succeeds() =
817         doBlockingWithTimeout(1000) {
818             val myScope =
819                 CoroutineScope(Job() + Executors.newSingleThreadExecutor().asCoroutineDispatcher())
820 
821             val cancelNow = CompletableDeferred<Unit>()
822 
823             myScope.launch {
824                 store.updateData {
825                     cancelNow.complete(Unit)
826                     awaitCancellation()
827                 }
828             }
829 
830             cancelNow.await()
831             myScope.coroutineContext[Job]!!.cancelAndJoin()
832 
833             store.updateData { 123 }
834         }
835 
836     @Test
837     fun testWrite_fromOtherScope_doesntGetCancelledFromDifferentScope() =
838         doBlockingWithTimeout(1000) {
839             val otherScope = CoroutineScope(Job())
840 
841             val callerScope = CoroutineScope(Job())
842 
843             val firstUpdateStarted = CompletableDeferred<Unit>()
844             val finishFirstUpdate = CompletableDeferred<Byte>()
845 
846             val firstUpdate =
847                 otherScope.async(Dispatchers.Unconfined) {
848                     store.updateData {
849                         firstUpdateStarted.complete(Unit)
850                         finishFirstUpdate.await()
851                     }
852                 }
853 
854             callerScope.launch(Dispatchers.Unconfined) { store.updateData { awaitCancellation() } }
855 
856             firstUpdateStarted.await()
857             callerScope.coroutineContext.job.cancelAndJoin()
858             finishFirstUpdate.complete(1)
859             firstUpdate.await()
860 
861             // It's still usable:
862             assertThat(store.updateData { it.inc() }).isEqualTo(2)
863         }
864 
865     @Test
866     fun testCreateDuplicateActiveDataStore() = runTest {
867         val file = testIO.newTempFile(parentFile = tempFolder)
868         val dataStore = newDataStore(file = file, scope = CoroutineScope(Job()))
869 
870         dataStore.data.first()
871 
872         val duplicateDataStore = newDataStore(file = file, scope = CoroutineScope(Job()))
873 
874         assertThrows<IllegalStateException> { duplicateDataStore.data.first() }
875     }
876 
877     @Test
878     fun testCreateDataStore_withSameFileAsInactiveDataStore() = runTest {
879         val file = testIO.newTempFile(parentFile = tempFolder)
880         val scope1 = CoroutineScope(Job())
881         val dataStore1 = newDataStore(file = file, scope = scope1)
882 
883         dataStore1.data.first()
884 
885         scope1.coroutineContext.job.cancelAndJoin()
886 
887         val dataStore2 = newDataStore(file = file, scope = CoroutineScope(Job()))
888 
889         // This shouldn't throw an exception bc the scope1 has been cancelled.
890         dataStore2.data.first()
891     }
892 
893     @Test
894     fun testCreateDataStoreAndRead_withStrictMode() = runTest {
895         StrictMode.setThreadPolicy(
896             StrictMode.ThreadPolicy.Builder()
897                 .detectDiskReads()
898                 .detectDiskWrites()
899                 .penaltyDeath()
900                 .build()
901         )
902         val dataStore =
903             newDataStore(file = testFile, scope = CoroutineScope(newSingleThreadContext("test")))
904         assertThat(dataStore.data.first()).isEqualTo(0)
905         StrictMode.allowThreadDiskReads()
906         StrictMode.allowThreadDiskWrites()
907     }
908 
909     @Test
910     fun testCreateDataStoreAndUpdate_withStrictMode() = runTest {
911         StrictMode.setThreadPolicy(
912             StrictMode.ThreadPolicy.Builder()
913                 .detectDiskReads()
914                 .detectDiskWrites()
915                 .penaltyDeath()
916                 .build()
917         )
918         val dataStore =
919             newDataStore(file = testFile, scope = CoroutineScope(newSingleThreadContext("test")))
920         dataStore.updateData { it.inc() }
921         assertThat(dataStore.data.first()).isEqualTo(1)
922         StrictMode.allowThreadDiskReads()
923         StrictMode.allowThreadDiskWrites()
924     }
925 
926     @Test
927     fun testWriteSameValueSkipDisk() = runTest {
928         // write a non-default value to force a disk write
929         store.updateData { 10 }
930         assertThat(serializerConfig.writeCount).isEqualTo(1)
931 
932         // write same value again
933         store.updateData { 10 }
934         assertThat(serializerConfig.writeCount).isEqualTo(1)
935     }
936 
937     // Mutable wrapper around a byte
938     data class ByteWrapper(var byte: Byte) {
939         internal class ByteWrapperSerializer() : Serializer<ByteWrapper> {
940             private val delegate = TestingSerializer()
941 
942             override val defaultValue = ByteWrapper(delegate.defaultValue)
943 
944             override suspend fun readFrom(input: InputStream): ByteWrapper {
945                 return ByteWrapper(delegate.readFrom(input))
946             }
947 
948             override suspend fun writeTo(t: ByteWrapper, output: OutputStream) {
949                 delegate.writeTo(t.byte, output)
950             }
951         }
952     }
953 
954     private class TestingCorruptionHandler(private val replaceWith: Byte? = null) :
955         CorruptionHandler<Byte> {
956 
957         @Volatile var numCalls = 0
958 
959         override suspend fun handleCorruption(ex: CorruptionException): Byte {
960             numCalls++
961 
962             replaceWith?.let {
963                 return it
964             }
965 
966             throw IOException("Handler thrown exception.")
967         }
968     }
969 }
970 
doBlockingWithTimeoutnull971 fun doBlockingWithTimeout(ms: Long, block: suspend () -> Unit): Unit =
972     runBlocking<Unit> { withTimeout(ms) { block() } }
973