1 /*
2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.datastore.core
18 
import androidx.datastore.core.handlers.NoOpCorruptionHandler
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.completeWith
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
44 
/**
 * Multi-process implementation of [DataStore]. It is multi-process safe.
 *
 * Cross-process coordination goes through the storage connection's [InterProcessCoordinator]. Its
 * lock serializes file reads/writes between processes, and its shared version counter lets a
 * process tell whether its cached value is stale. The latest known [State] is held in an in-memory
 * cache that backs [data] and [currentData]. [updateData] requests are serialized through a single
 * write actor running in [scope].
 */
internal class DataStoreImpl<T>(
    /** The storage backend. Its connection is only created on first use. */
    private val storage: Storage<T>,
    /**
     * The list of initialization tasks to perform. These tasks will be completed before any data is
     * published to [data] and before any read-modify-writes execute in [updateData]. If any of the
     * tasks fail, the tasks will be run again the next time data is collected or updateData is
     * called. Init tasks should not wait on results from data - this will result in deadlock.
     */
    initTasksList: List<suspend (api: InitializerApi<T>) -> Unit> = emptyList(),
    /**
     * The handler of [CorruptionException]s when they are thrown during reads or writes. It
     * produces the new data to replace the corrupted data on disk. By default it is a no-op which
     * simply throws the exception and does not produce new data.
     */
    private val corruptionHandler: CorruptionHandler<T> = NoOpCorruptionHandler(),
    /**
     * Scope hosting the write actor, the update-notification observer ([collectorJob]) and reads.
     * When the write actor completes (presumably when this scope is cancelled), the cache moves to
     * [Final] and the storage connection, if created, is closed.
     */
    private val scope: CoroutineScope = CoroutineScope(ioDispatcher() + SupervisorJob())
) : CurrentDataProviderStore<T> {

    /**
     * The actual values of DataStore.
     *
     * Every active collection of this flow is counted. While at least one collection is active,
     * [collectorJob] observes [InterProcessCoordinator] update notifications and refreshes the
     * cache, so changes made by other processes reach collectors.
     *
     * The flow rethrows the read exception if the state is (or becomes) a [ReadException]. It
     * completes normally once the state is [Final].
     */
    override val data: Flow<T> = flow {
        val startState = readState(requireLock = false)
        when (startState) {
            is Data<T> -> emit(startState.value)
            is UnInitialized -> error(BUG_MESSAGE)
            is ReadException<T> -> throw startState.readException
            // TODO(b/273990827): decide the contract of accessing when state is Final
            is Final -> return@flow
        }

        // After the start value, keep emitting from the in-memory cache flow:
        //  - Data: emitted, except for leading values whose version is not newer than
        //    startState's (startState has already been emitted above).
        //  - ReadException: the latest read failed; rethrow its exception to the collector.
        //  - Final: the DataStore is no longer usable (presumably its scope was cancelled); end
        //    the flow normally instead of propagating the final exception.
        //  - UnInitialized: impossible once readState has run initialization; treated as a bug.
        //
        // The cached state starts as UnInitialized, which can transition to ReadException, Data or
        // Final. ReadException can transition to another ReadException, Data or Final. Data can
        // transition to another Data or Final. Final never changes.
        emitAll(
            inMemoryCache.flow
                .onStart { incrementCollector() }
                .takeWhile {
                    // end the flow if we reach the final value
                    it !is Final
                }
                .dropWhile { it is Data && it.version <= startState.version }
                .map {
                    when (it) {
                        is ReadException<T> -> throw it.readException
                        is Data<T> -> it.value
                        is Final<T>,
                        is UnInitialized -> error(BUG_MESSAGE)
                    }
                }
                .onCompletion { decrementCollector() }
        )
    }

    /**
     * Returns the current value. Initialization runs first if needed, followed by a read that does
     * not wait for the inter-process lock.
     *
     * Throws the read exception if the read failed, or the final exception if this DataStore is no
     * longer usable.
     */
    override suspend fun currentData(): T {
        val startState = readState(requireLock = false)
        when (startState) {
            is Data<T> -> return startState.value
            is UnInitialized -> error(BUG_MESSAGE)
            is ReadException<T> -> throw startState.readException
            // TODO(b/273990827): decide the contract of accessing when state is Final
            is Final -> throw startState.finalException
        }
    }

    // Guards collectorCounter and collectorJob.
    private val collectorMutex = Mutex()

    // Number of currently active collections of [data].
    private var collectorCounter = 0

    /**
     * Job responsible for observing [InterProcessCoordinator] for file changes. Each downstream
     * [data] flow collects on this [kotlinx.coroutines.Job] to ensure we observe the
     * [InterProcessCoordinator] when there is an active collection on the [data].
     */
    private var collectorJob: Job? = null

    /**
     * Registers a new [data] collection. The first collector launches [collectorJob] in [scope].
     * That job waits for initialization, then re-reads the data (waiting for the lock) on every
     * coordinator update notification until the state is [Final].
     *
     * The critical section runs [NonCancellable]. If the collector is cancelled while waiting for
     * [collectorMutex], the increment still happens, so it stays balanced with the
     * [decrementCollector] call that `onCompletion` always makes.
     */
    private suspend fun incrementCollector() {
        withContext(NonCancellable) {
            collectorMutex.withLock {
                if (++collectorCounter == 1) {
                    collectorJob =
                        scope.launch {
                            readAndInit.awaitComplete()
                            coordinator.updateNotifications.conflate().collect {
                                val currentState = inMemoryCache.currentState
                                if (currentState !is Final) {
                                    // update triggered reads should always wait for lock
                                    readDataAndUpdateCache(requireLock = true)
                                }
                            }
                        }
                }
            }
        }
    }

    /**
     * Unregisters a [data] collection. The last collector cancels [collectorJob].
     *
     * This is called from `onCompletion`, usually while the collecting coroutine is being
     * cancelled. `Mutex.withLock` is cancellable while the mutex is contended. Without
     * [NonCancellable] the decrement could therefore be skipped, and [collectorJob] would keep
     * running forever.
     */
    private suspend fun decrementCollector() {
        withContext(NonCancellable) {
            collectorMutex.withLock {
                if (--collectorCounter == 0) {
                    collectorJob?.cancel()
                    collectorJob = null
                }
            }
        }
    }

    /**
     * Applies [transform] to the latest data through the write actor. The read, transform and write
     * all happen under the inter-process lock. Returns the resulting value.
     *
     * An [UpdatingDataContextElement] is added to the coroutine context. Calling updateData on this
     * same instance from inside [transform] therefore fails with an [IllegalStateException].
     */
    override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key]
        parentContextElement?.checkNotUpdating(this)
        val childContextElement =
            UpdatingDataContextElement(parent = parentContextElement, instance = this)
        return withContext(childContextElement) {
            val ack = CompletableDeferred<T>()
            val currentDownStreamFlowState = inMemoryCache.currentState
            val updateMsg =
                Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
            writeActor.offer(updateMsg)
            ack.await()
        }
    }

    // Latest known state. Values normally come from reads/writes done while holding the
    // coordinator lock. Exceptions: the initial read without init tasks may not hold the lock
    // (tryLock), and init/read failures are recorded as ReadException without taking it.
    private val inMemoryCache = DataStoreInMemoryCache<T>()

    private val readAndInit = InitDataStore(initTasksList)

    // TODO(b/269772127): make this private after we allow multiple instances of DataStore on the
    //  same file
    private val storageConnectionDelegate = lazy { storage.createConnection() }
    internal val storageConnection by storageConnectionDelegate
    private val coordinator: InterProcessCoordinator by lazy { storageConnection.coordinator }

    // Serializes updateData requests. On completion, the cache is finalized with the completion
    // cause and the storage connection, if it was ever created, is closed. Requests that were
    // never handled get their ack completed exceptionally.
    private val writeActor =
        SimpleActor<Message.Update<T>>(
            scope = scope,
            onComplete = {
                // We expect it to always be non-null but we will leave the alternative as a no-op
                // just in case.
                it?.let { inMemoryCache.tryUpdate(Final(it)) }
                // don't try to close storage connection if it was not created in the first place.
                if (storageConnectionDelegate.isInitialized()) {
                    storageConnection.close()
                }
            },
            onUndeliveredElement = { msg, ex ->
                msg.ack.completeExceptionally(
                    ex
                        ?: CancellationException(
                            "DataStore scope was cancelled before updateData could complete"
                        )
                )
            }
        ) { msg ->
            handleUpdate(msg)
        }

    /**
     * Returns the current state, running initialization first if it has not completed yet. Runs in
     * [scope]'s coroutine context.
     *
     * If the state is already [Final], it is returned as-is. An init/read failure is returned (not
     * thrown) as a [ReadException] with version -1.
     */
    private suspend fun readState(requireLock: Boolean): State<T> =
        withContext(scope.coroutineContext) {
            if (inMemoryCache.currentState is Final) {
                // if state is Final, just return it
                inMemoryCache.currentState
            } else {
                try {
                    // make sure we initialize properly before reading from file.
                    readAndInitOrPropagateAndThrowFailure()
                } catch (throwable: Throwable) {
                    // init or read failed, it is already updated in the cached value
                    // so we don't need to do anything.
                    return@withContext ReadException(throwable, -1)
                }
                // after init, try to read again. If the init run for this block, it won't re-read
                // the file and use cache, so this is an OK call to make wrt performance.
                readDataAndUpdateCache(requireLock)
            }
        }

    /**
     * Processes one [Message.Update] on the write actor. Its ack is completed with the transformed
     * value or with the failure.
     */
    private suspend fun handleUpdate(update: Message.Update<T>) {
        update.ack.completeWith(
            runCatching {
                val result: T
                when (val currentState = inMemoryCache.currentState) {
                    is Data -> {
                        // We are already initialized, we just need to perform the update
                        result = transformAndWrite(update.transform, update.callerContext)
                    }
                    is ReadException,
                    is UnInitialized -> {
                        if (currentState === update.lastState) {
                            // we need to try to read again
                            readAndInitOrPropagateAndThrowFailure()

                            // We've successfully read, now we need to perform the update
                            result = transformAndWrite(update.transform, update.callerContext)
                        } else {
                            // Someone else beat us to read but also failed. We just need to
                            // signal the writer that is waiting on ack.
                            // This cast is safe because we can't be in the UnInitialized
                            // state if the state has changed.
                            throw (currentState as ReadException).readException
                        }
                    }
                    is Final -> throw currentState.finalException // won't happen
                }
                result
            }
        )
    }

    /**
     * Runs initialization (init tasks plus the first read) if it has not completed yet. On failure,
     * records a [ReadException] in the cache, tagged with the version observed before the attempt,
     * and rethrows.
     */
    private suspend fun readAndInitOrPropagateAndThrowFailure() {
        val preReadVersion = coordinator.getVersion()
        try {
            readAndInit.runIfNeeded()
        } catch (throwable: Throwable) {
            inMemoryCache.tryUpdate(ReadException(throwable, preReadVersion))
            throw throwable
        }
    }

    /**
     * Reads the file and updates the cache unless current cached value is Data and its version is
     * equal to the latest version, or it is unable to get lock.
     *
     * With [requireLock] the read waits for the lock; otherwise it only tries to take it. Read
     * failures are returned as a [ReadException] rather than thrown.
     *
     * Calling this method when state is UnInitialized is a bug and this method will throw if that
     * happens.
     */
    private suspend fun readDataAndUpdateCache(requireLock: Boolean): State<T> {
        // Check if the cached version matches with shared memory counter
        val currentState = inMemoryCache.currentState
        // should not call this without initialization first running
        check(currentState !is UnInitialized) { BUG_MESSAGE }
        val latestVersion = coordinator.getVersion()
        val cachedVersion = if (currentState is Data) currentState.version else -1

        // Return cached value if cached version is latest
        if (currentState is Data && latestVersion == cachedVersion) {
            return currentState
        }
        val (newState, acquiredLock) =
            if (requireLock) {
                coordinator.lock {
                    try {
                        readDataOrHandleCorruption(hasWriteFileLock = true)
                    } catch (ex: Throwable) {
                        ReadException<T>(ex, coordinator.getVersion())
                    } to true
                }
            } else {
                coordinator.tryLock { locked ->
                    try {
                        readDataOrHandleCorruption(locked)
                    } catch (ex: Throwable) {
                        ReadException<T>(
                            ex,
                            if (locked) coordinator.getVersion() else cachedVersion
                        )
                    } to locked
                }
            }
        // Only results obtained under the lock are trusted enough to be cached.
        if (acquiredLock) {
            inMemoryCache.tryUpdate(newState)
        }
        return newState
    }

    // Caller is responsible for (try to) getting file lock. It reads from the file directly without
    // checking shared counter version and returns serializer default value if file is not found.
    private suspend fun readDataFromFileOrDefault(): T {
        return storageConnection.readData()
    }

    /**
     * Under the coordinator lock, reads the latest data and applies [transform] in [callerContext].
     * Writes (and caches) the result only if it differs from what was read. Returns the transformed
     * value.
     */
    private suspend fun transformAndWrite(
        transform: suspend (t: T) -> T,
        callerContext: CoroutineContext
    ): T =
        coordinator.lock {
            val curData = readDataOrHandleCorruption(hasWriteFileLock = true)
            val newData = withContext(callerContext) { transform(curData.value) }

            // Check that curData has not changed...
            curData.checkHashCode()

            if (curData.value != newData) {
                writeData(newData, updateCache = true)
            }
            newData
        }

    // Write data to disk and return the corresponding version if succeed.
    internal suspend fun writeData(newData: T, updateCache: Boolean): Int {
        var newVersion = 0

        // The code in `writeScope` is run synchronously, i.e. the newVersion isn't returned until
        // the code in `writeScope` completes.
        storageConnection.writeScope {
            // update version before write to file to avoid the case where if update version after
            // file write, the process can crash after file write but before version increment, so
            // the readers might skip reading forever because the version isn't changed
            newVersion = coordinator.incrementAndGetVersion()
            writeData(newData)
            if (updateCache) {
                inMemoryCache.tryUpdate(Data(newData, newData.hashCode(), newVersion))
            }
        }

        return newVersion
    }

    /**
     * Reads the data from the file, tagged with a version.
     *
     * Without [hasWriteFileLock] it only tries to take the lock; if the lock is not acquired, the
     * version read before the attempt is used. On [CorruptionException], the corruption handler
     * produces replacement data. Then, under the write lock, the file is re-read; if it is still
     * corrupted, the replacement is written (and cached). If that recovery fails, the original
     * exception is rethrown with the recovery failure attached as suppressed.
     */
    private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> {
        try {
            return if (hasWriteFileLock) {
                val data = readDataFromFileOrDefault()
                Data(data, data.hashCode(), version = coordinator.getVersion())
            } else {
                val preLockVersion = coordinator.getVersion()
                coordinator.tryLock { locked ->
                    val data = readDataFromFileOrDefault()
                    val version = if (locked) coordinator.getVersion() else preLockVersion
                    Data(data, data.hashCode(), version)
                }
            }
        } catch (ex: CorruptionException) {
            var newData: T = corruptionHandler.handleCorruption(ex)
            var version: Int // initialized inside the try block

            try {
                doWithWriteFileLock(hasWriteFileLock) {
                    // Confirms the file is still corrupted before overriding
                    try {
                        newData = readDataFromFileOrDefault()
                        version = coordinator.getVersion()
                    } catch (ignoredEx: CorruptionException) {
                        version = writeData(newData, updateCache = true)
                    }
                }
            } catch (writeEx: Throwable) {
                // If we fail to write the handled data, add the new exception as a suppressed
                // exception.
                ex.addSuppressed(writeEx)
                throw ex
            }

            // If we reach this point, we've successfully replaced the data on disk with newData.
            return Data(newData, newData.hashCode(), version)
        }
    }

    /**
     * Runs [block] exactly once. The coordinator lock is taken around it only if the caller does
     * not already hold it ([hasWriteFileLock] is false). The contract lets callers assign `val`s /
     * definitely-initialize locals inside [block].
     */
    @OptIn(ExperimentalContracts::class)
    @Suppress(
        "LEAKED_IN_PLACE_LAMBDA",
        "WRONG_INVOCATION_KIND"
    ) // https://youtrack.jetbrains.com/issue/KT-29963
    private suspend fun <R> doWithWriteFileLock(
        hasWriteFileLock: Boolean,
        block: suspend () -> R
    ): R {
        contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
        return if (hasWriteFileLock) {
            block()
        } else {
            coordinator.lock { block() }
        }
    }

    /**
     * One-time initialization. It runs the user-provided init tasks (if any) together with the
     * first read, then publishes the resulting [Data] to the cache. A failed attempt is retried on
     * the next [RunOnce.runIfNeeded] call.
     */
    private inner class InitDataStore(
        initTasksList: List<suspend (api: InitializerApi<T>) -> Unit>
    ) : RunOnce() {
        // Copy of the tasks; cleared after initialization completes successfully.
        private var initTasks: List<suspend (api: InitializerApi<T>) -> Unit>? =
            initTasksList.toList()

        override suspend fun doRun() {
            val initData =
                if (initTasks.isNullOrEmpty()) {
                    // if there are no init tasks, we can directly read
                    readDataOrHandleCorruption(hasWriteFileLock = false)
                } else {
                    // if there are init tasks, we need to obtain a lock to ensure migrations
                    // run as 1 chunk
                    coordinator.lock {
                        val updateLock = Mutex()
                        var initializationComplete = false
                        var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value

                        val api =
                            object : InitializerApi<T> {
                                override suspend fun updateData(transform: suspend (t: T) -> T): T {
                                    return updateLock.withLock {
                                        check(!initializationComplete) {
                                            "InitializerApi.updateData should not be called after " +
                                                "initialization is complete."
                                        }

                                        val newData = transform(currentData)
                                        if (newData != currentData) {
                                            writeData(newData, updateCache = false)
                                            currentData = newData
                                        }

                                        currentData
                                    }
                                }
                            }

                        initTasks?.forEach { it(api) }
                        // Init tasks have run successfully, we don't need them anymore.
                        initTasks = null
                        // Any InitializerApi captured by a task must fail if used from now on.
                        updateLock.withLock { initializationComplete = true }
                        // only to make compiler happy
                        Data(
                            value = currentData,
                            hashCode = currentData.hashCode(),
                            version = coordinator.getVersion()
                        )
                    }
                }
            inMemoryCache.tryUpdate(initData)
        }
    }

    companion object {
        private const val BUG_MESSAGE =
            "This is a bug in DataStore. Please file a bug at: " +
                "https://issuetracker.google.com/issues/new?component=907884&template=1466542"
    }
}
483 
/**
 * Runs [doRun] to successful completion at most once.
 *
 * A failed attempt leaves the instance un-run, so the next [runIfNeeded] call tries again. Callers
 * may suspend on [awaitComplete] until some attempt has succeeded.
 */
internal abstract class RunOnce {
    // Serializes attempts so that doRun never executes concurrently with itself.
    private val attemptLock = Mutex()

    // Completed exactly once, right after the first successful doRun.
    private val completion = CompletableDeferred<Unit>()

    /** The one-time work. Throwing leaves the instance un-run so it is attempted again later. */
    protected abstract suspend fun doRun()

    /** Suspends until some call to [runIfNeeded] has completed [doRun] successfully. */
    suspend fun awaitComplete(): Unit = completion.await()

    /** Runs [doRun] unless an earlier call already completed it; failures from [doRun] propagate. */
    suspend fun runIfNeeded() {
        // Fast path: once the work is done, skip the mutex entirely.
        if (!completion.isCompleted) {
            attemptLock.withLock {
                // Re-check under the lock: another caller may have finished while we waited.
                if (!completion.isCompleted) {
                    doRun()
                    completion.complete(Unit)
                }
            }
        }
    }
}
505 
/**
 * [CoroutineContext.Element] that is placed in the coroutine context while [DataStore.updateData]
 * runs, so that nested calls can be detected. b/241760537 (see: [DataStoreImpl.updateData])
 *
 * Nesting updateData calls across different DataStore instances is allowed; only re-entering the
 * same instance is rejected. Each element links to the element that was active when it was
 * created ([parent]). Together they form a chain of every instance currently inside updateData.
 */
internal class UpdatingDataContextElement(
    private val parent: UpdatingDataContextElement?,
    private val instance: DataStoreImpl<*>
) : CoroutineContext.Element {

    override val key: CoroutineContext.Key<*>
        get() = Key

    /**
     * Checks the given [candidate] is not currently in a [DataStore.updateData] block. This element
     * is checked first, then each ancestor in turn.
     *
     * @throws IllegalStateException with [NESTED_UPDATE_ERROR_MESSAGE] if [candidate] is found.
     */
    fun checkNotUpdating(candidate: DataStore<*>) {
        var current: UpdatingDataContextElement? = this
        while (current != null) {
            if (current.instance === candidate) error(NESTED_UPDATE_ERROR_MESSAGE)
            current = current.parent
        }
    }

    companion object {
        internal val NESTED_UPDATE_ERROR_MESSAGE =
            """
                Calling updateData inside updateData on the same DataStore instance is not supported
                since updates made in the parent updateData call will not be visible to the nested
                updateData call. See https://issuetracker.google.com/issues/241760537 for details.
            """
                .trimIndent()

        internal object Key : CoroutineContext.Key<UpdatingDataContextElement>
    }
}
543