1 /* <lambda>null2 * Copyright 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package androidx.datastore.core 18 19 import androidx.datastore.core.handlers.NoOpCorruptionHandler 20 import kotlin.contracts.ExperimentalContracts 21 import kotlin.contracts.InvocationKind 22 import kotlin.contracts.contract 23 import kotlin.coroutines.CoroutineContext 24 import kotlin.coroutines.coroutineContext 25 import kotlinx.coroutines.CancellationException 26 import kotlinx.coroutines.CompletableDeferred 27 import kotlinx.coroutines.CoroutineScope 28 import kotlinx.coroutines.Job 29 import kotlinx.coroutines.SupervisorJob 30 import kotlinx.coroutines.completeWith 31 import kotlinx.coroutines.flow.Flow 32 import kotlinx.coroutines.flow.conflate 33 import kotlinx.coroutines.flow.dropWhile 34 import kotlinx.coroutines.flow.emitAll 35 import kotlinx.coroutines.flow.flow 36 import kotlinx.coroutines.flow.map 37 import kotlinx.coroutines.flow.onCompletion 38 import kotlinx.coroutines.flow.onStart 39 import kotlinx.coroutines.flow.takeWhile 40 import kotlinx.coroutines.launch 41 import kotlinx.coroutines.sync.Mutex 42 import kotlinx.coroutines.sync.withLock 43 import kotlinx.coroutines.withContext 44 45 /** Multi process implementation of DataStore. It is multi-process safe. */ 46 internal class DataStoreImpl<T>( 47 private val storage: Storage<T>, 48 /** 49 * The list of initialization tasks to perform. These tasks will be completed before any data is 50 * published to the data and before any read-modify-writes execute in updateData. If any of the 51 * tasks fail, the tasks will be run again the next time data is collected or updateData is 52 * called. Init tasks should not wait on results from data - this will result in deadlock. 53 */ 54 initTasksList: List<suspend (api: InitializerApi<T>) -> Unit> = emptyList(), 55 /** 56 * The handler of [CorruptionException]s when they are thrown during reads or writes. It 57 * produces the new data to replace the corrupted data on disk. By default it is a no-op which 58 * simply throws the exception and does not produce new data. 59 */ 60 private val corruptionHandler: CorruptionHandler<T> = NoOpCorruptionHandler(), 61 private val scope: CoroutineScope = CoroutineScope(ioDispatcher() + SupervisorJob()) 62 ) : CurrentDataProviderStore<T> { 63 64 /** 65 * The actual values of DataStore. This is exposed in the API via [data] to be able to combine 66 * its lifetime with IPC update collection ([updateCollection]). 67 */ 68 override val data: Flow<T> = flow { 69 val startState = readState(requireLock = false) 70 when (startState) { 71 is Data<T> -> emit(startState.value) 72 is UnInitialized -> error(BUG_MESSAGE) 73 is ReadException<T> -> throw startState.readException 74 // TODO(b/273990827): decide the contract of accessing when state is Final 75 is Final -> return@flow 76 } 77 78 /** 79 * If downstream flow is UnInitialized, no data has been read yet, we need to trigger a new 80 * read then start emitting values once we have seen a new value (or exception). 81 * 82 * If downstream flow has a ReadException, there was an exception last time we tried to read 83 * data. We need to trigger a new read then start emitting values once we have seen a new 84 * value (or exception). 85 * 86 * If downstream flow has Data, we should start emitting from downstream flow as long as its 87 * version is not stale compared to the version read from the shared counter when we enter 88 * the flow. 89 * 90 * If Downstream flow is Final, the scope has been cancelled so the data store is no longer 91 * usable. We should just propagate this exception. 92 * 93 * State always starts at null. null can transition to ReadException, Data or Final. 94 * ReadException can transition to another ReadException, Data or Final. Data can transition 95 * to another Data or Final. Final will not change. 96 */ 97 emitAll( 98 inMemoryCache.flow 99 .onStart { incrementCollector() } 100 .takeWhile { 101 // end the flow if we reach the final value 102 it !is Final 103 } 104 .dropWhile { it is Data && it.version <= startState.version } 105 .map { 106 when (it) { 107 is ReadException<T> -> throw it.readException 108 is Data<T> -> it.value 109 is Final<T>, 110 is UnInitialized -> error(BUG_MESSAGE) 111 } 112 } 113 .onCompletion { decrementCollector() } 114 ) 115 } 116 117 override suspend fun currentData(): T { 118 val startState = readState(requireLock = false) 119 when (startState) { 120 is Data<T> -> return startState.value 121 is UnInitialized -> error(BUG_MESSAGE) 122 is ReadException<T> -> throw startState.readException 123 // TODO(b/273990827): decide the contract of accessing when state is Final 124 is Final -> throw startState.finalException 125 } 126 } 127 128 private val collectorMutex = Mutex() 129 private var collectorCounter = 0 130 /** 131 * Job responsible for observing [InterProcessCoordinator] for file changes. Each downstream 132 * [data] flow collects on this [kotlinx.coroutines.Job] to ensure we observe the 133 * [InterProcessCoordinator] when there is an active collection on the [data]. 134 */ 135 private var collectorJob: Job? = null 136 137 private suspend fun incrementCollector() { 138 collectorMutex.withLock { 139 if (++collectorCounter == 1) { 140 collectorJob = 141 scope.launch { 142 readAndInit.awaitComplete() 143 coordinator.updateNotifications.conflate().collect { 144 val currentState = inMemoryCache.currentState 145 if (currentState !is Final) { 146 // update triggered reads should always wait for lock 147 readDataAndUpdateCache(requireLock = true) 148 } 149 } 150 } 151 } 152 } 153 } 154 155 private suspend fun decrementCollector() { 156 collectorMutex.withLock { 157 if (--collectorCounter == 0) { 158 collectorJob?.cancel() 159 collectorJob = null 160 } 161 } 162 } 163 164 override suspend fun updateData(transform: suspend (t: T) -> T): T { 165 val parentContextElement = coroutineContext[UpdatingDataContextElement.Companion.Key] 166 parentContextElement?.checkNotUpdating(this) 167 val childContextElement = 168 UpdatingDataContextElement(parent = parentContextElement, instance = this) 169 return withContext(childContextElement) { 170 val ack = CompletableDeferred<T>() 171 val currentDownStreamFlowState = inMemoryCache.currentState 172 val updateMsg = 173 Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext) 174 writeActor.offer(updateMsg) 175 ack.await() 176 } 177 } 178 179 // cache is only set by the reads who have file lock, so cache always has stable data 180 private val inMemoryCache = DataStoreInMemoryCache<T>() 181 182 private val readAndInit = InitDataStore(initTasksList) 183 184 // TODO(b/269772127): make this private after we allow multiple instances of DataStore on the 185 // same file 186 private val storageConnectionDelegate = lazy { storage.createConnection() } 187 internal val storageConnection by storageConnectionDelegate 188 private val coordinator: InterProcessCoordinator by lazy { storageConnection.coordinator } 189 190 private val writeActor = 191 SimpleActor<Message.Update<T>>( 192 scope = scope, 193 onComplete = { 194 // We expect it to always be non-null but we will leave the alternative as a no-op 195 // just in case. 196 it?.let { inMemoryCache.tryUpdate(Final(it)) } 197 // don't try to close storage connection if it was not created in the first place. 198 if (storageConnectionDelegate.isInitialized()) { 199 storageConnection.close() 200 } 201 }, 202 onUndeliveredElement = { msg, ex -> 203 msg.ack.completeExceptionally( 204 ex 205 ?: CancellationException( 206 "DataStore scope was cancelled before updateData could complete" 207 ) 208 ) 209 } 210 ) { msg -> 211 handleUpdate(msg) 212 } 213 214 private suspend fun readState(requireLock: Boolean): State<T> = 215 withContext(scope.coroutineContext) { 216 if (inMemoryCache.currentState is Final) { 217 // if state is Final, just return it 218 inMemoryCache.currentState 219 } else { 220 try { 221 // make sure we initialize properly before reading from file. 222 readAndInitOrPropagateAndThrowFailure() 223 } catch (throwable: Throwable) { 224 // init or read failed, it is already updated in the cached value 225 // so we don't need to do anything. 226 return@withContext ReadException(throwable, -1) 227 } 228 // after init, try to read again. If the init run for this block, it won't re-read 229 // the file and use cache, so this is an OK call to make wrt performance. 230 readDataAndUpdateCache(requireLock) 231 } 232 } 233 234 private suspend fun handleUpdate(update: Message.Update<T>) { 235 update.ack.completeWith( 236 runCatching { 237 val result: T 238 when (val currentState = inMemoryCache.currentState) { 239 is Data -> { 240 // We are already initialized, we just need to perform the update 241 result = transformAndWrite(update.transform, update.callerContext) 242 } 243 is ReadException, 244 is UnInitialized -> { 245 if (currentState === update.lastState) { 246 // we need to try to read again 247 readAndInitOrPropagateAndThrowFailure() 248 249 // We've successfully read, now we need to perform the update 250 result = transformAndWrite(update.transform, update.callerContext) 251 } else { 252 // Someone else beat us to read but also failed. We just need to 253 // signal the writer that is waiting on ack. 254 // This cast is safe because we can't be in the UnInitialized 255 // state if the state has changed. 256 throw (currentState as ReadException).readException 257 } 258 } 259 is Final -> throw currentState.finalException // won't happen 260 } 261 result 262 } 263 ) 264 } 265 266 private suspend fun readAndInitOrPropagateAndThrowFailure() { 267 val preReadVersion = coordinator.getVersion() 268 try { 269 readAndInit.runIfNeeded() 270 } catch (throwable: Throwable) { 271 inMemoryCache.tryUpdate(ReadException(throwable, preReadVersion)) 272 throw throwable 273 } 274 } 275 276 /** 277 * Reads the file and updates the cache unless current cached value is Data and its version is 278 * equal to the latest version, or it is unable to get lock. 279 * 280 * Calling this method when state is UnInitialized is a bug and this method will throw if that 281 * happens. 282 */ 283 private suspend fun readDataAndUpdateCache(requireLock: Boolean): State<T> { 284 // Check if the cached version matches with shared memory counter 285 val currentState = inMemoryCache.currentState 286 // should not call this without initialization first running 287 check(currentState !is UnInitialized) { BUG_MESSAGE } 288 val latestVersion = coordinator.getVersion() 289 val cachedVersion = if (currentState is Data) currentState.version else -1 290 291 // Return cached value if cached version is latest 292 if (currentState is Data && latestVersion == cachedVersion) { 293 return currentState 294 } 295 val (newState, acquiredLock) = 296 if (requireLock) { 297 coordinator.lock { 298 try { 299 readDataOrHandleCorruption(hasWriteFileLock = true) 300 } catch (ex: Throwable) { 301 ReadException<T>(ex, coordinator.getVersion()) 302 } to true 303 } 304 } else { 305 coordinator.tryLock { locked -> 306 try { 307 readDataOrHandleCorruption(locked) 308 } catch (ex: Throwable) { 309 ReadException<T>( 310 ex, 311 if (locked) coordinator.getVersion() else cachedVersion 312 ) 313 } to locked 314 } 315 } 316 if (acquiredLock) { 317 inMemoryCache.tryUpdate(newState) 318 } 319 return newState 320 } 321 322 // Caller is responsible for (try to) getting file lock. It reads from the file directly without 323 // checking shared counter version and returns serializer default value if file is not found. 324 private suspend fun readDataFromFileOrDefault(): T { 325 return storageConnection.readData() 326 } 327 328 private suspend fun transformAndWrite( 329 transform: suspend (t: T) -> T, 330 callerContext: CoroutineContext 331 ): T = 332 coordinator.lock { 333 val curData = readDataOrHandleCorruption(hasWriteFileLock = true) 334 val newData = withContext(callerContext) { transform(curData.value) } 335 336 // Check that curData has not changed... 337 curData.checkHashCode() 338 339 if (curData.value != newData) { 340 writeData(newData, updateCache = true) 341 } 342 newData 343 } 344 345 // Write data to disk and return the corresponding version if succeed. 346 internal suspend fun writeData(newData: T, updateCache: Boolean): Int { 347 var newVersion = 0 348 349 // The code in `writeScope` is run synchronously, i.e. the newVersion isn't returned until 350 // the code in `writeScope` completes. 351 storageConnection.writeScope { 352 // update version before write to file to avoid the case where if update version after 353 // file write, the process can crash after file write but before version increment, so 354 // the readers might skip reading forever because the version isn't changed 355 newVersion = coordinator.incrementAndGetVersion() 356 writeData(newData) 357 if (updateCache) { 358 inMemoryCache.tryUpdate(Data(newData, newData.hashCode(), newVersion)) 359 } 360 } 361 362 return newVersion 363 } 364 365 private suspend fun readDataOrHandleCorruption(hasWriteFileLock: Boolean): Data<T> { 366 try { 367 return if (hasWriteFileLock) { 368 val data = readDataFromFileOrDefault() 369 Data(data, data.hashCode(), version = coordinator.getVersion()) 370 } else { 371 val preLockVersion = coordinator.getVersion() 372 coordinator.tryLock { locked -> 373 val data = readDataFromFileOrDefault() 374 val version = if (locked) coordinator.getVersion() else preLockVersion 375 Data(data, data.hashCode(), version) 376 } 377 } 378 } catch (ex: CorruptionException) { 379 var newData: T = corruptionHandler.handleCorruption(ex) 380 var version: Int // initialized inside the try block 381 382 try { 383 doWithWriteFileLock(hasWriteFileLock) { 384 // Confirms the file is still corrupted before overriding 385 try { 386 newData = readDataFromFileOrDefault() 387 version = coordinator.getVersion() 388 } catch (ignoredEx: CorruptionException) { 389 version = writeData(newData, updateCache = true) 390 } 391 } 392 } catch (writeEx: Throwable) { 393 // If we fail to write the handled data, add the new exception as a suppressed 394 // exception. 395 ex.addSuppressed(writeEx) 396 throw ex 397 } 398 399 // If we reach this point, we've successfully replaced the data on disk with newData. 400 return Data(newData, newData.hashCode(), version) 401 } 402 } 403 404 @OptIn(ExperimentalContracts::class) 405 @Suppress( 406 "LEAKED_IN_PLACE_LAMBDA", 407 "WRONG_INVOCATION_KIND" 408 ) // https://youtrack.jetbrains.com/issue/KT-29963 409 private suspend fun <R> doWithWriteFileLock( 410 hasWriteFileLock: Boolean, 411 block: suspend () -> R 412 ): R { 413 contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } 414 return if (hasWriteFileLock) { 415 block() 416 } else { 417 coordinator.lock { block() } 418 } 419 } 420 421 private inner class InitDataStore( 422 initTasksList: List<suspend (api: InitializerApi<T>) -> Unit> 423 ) : RunOnce() { 424 // cleaned after initialization is complete 425 private var initTasks: List<suspend (api: InitializerApi<T>) -> Unit>? = 426 initTasksList.toList() 427 428 override suspend fun doRun() { 429 val initData = 430 if ((initTasks == null) || initTasks!!.isEmpty()) { 431 // if there are no init tasks, we can directly read 432 readDataOrHandleCorruption(hasWriteFileLock = false) 433 } else { 434 // if there are init tasks, we need to obtain a lock to ensure migrations 435 // run as 1 chunk 436 coordinator.lock { 437 val updateLock = Mutex() 438 var initializationComplete = false 439 var currentData = readDataOrHandleCorruption(hasWriteFileLock = true).value 440 441 val api = 442 object : InitializerApi<T> { 443 override suspend fun updateData(transform: suspend (t: T) -> T): T { 444 return updateLock.withLock { 445 check(!initializationComplete) { 446 "InitializerApi.updateData should not be called after " + 447 "initialization is complete." 448 } 449 450 val newData = transform(currentData) 451 if (newData != currentData) { 452 writeData(newData, updateCache = false) 453 currentData = newData 454 } 455 456 currentData 457 } 458 } 459 } 460 461 initTasks?.forEach { it(api) } 462 // Init tasks have run successfully, we don't need them anymore. 463 initTasks = null 464 updateLock.withLock { initializationComplete = true } 465 // only to make compiler happy 466 Data( 467 value = currentData, 468 hashCode = currentData.hashCode(), 469 version = coordinator.getVersion() 470 ) 471 } 472 } 473 inMemoryCache.tryUpdate(initData) 474 } 475 } 476 477 companion object { 478 private const val BUG_MESSAGE = 479 "This is a bug in DataStore. Please file a bug at: " + 480 "https://issuetracker.google.com/issues/new?component=907884&template=1466542" 481 } 482 } 483 484 /** 485 * Helper class that executes [doRun] up to 1 time to completion. If it fails, it will be retried in 486 * the next [runIfNeeded] call. 487 */ 488 internal abstract class RunOnce { 489 private val runMutex = Mutex() 490 private val didRun = CompletableDeferred<Unit>() 491 doRunnull492 protected abstract suspend fun doRun() 493 494 suspend fun awaitComplete() = didRun.await() 495 496 suspend fun runIfNeeded() { 497 if (didRun.isCompleted) return 498 runMutex.withLock { 499 if (didRun.isCompleted) return 500 doRun() 501 didRun.complete(Unit) 502 } 503 } 504 } 505 506 /** 507 * [CoroutineContext.Element] that is added to the coroutineContext when [DataStore.updateData] is 508 * called to detect nested calls. b/241760537 (see: [DataStoreImpl.updateData]) 509 * 510 * It is OK for different DataStore instances to nest updateData calls, they just cannot be on the 511 * same DataStore. To track these instances, each [UpdatingDataContextElement] holds a reference to 512 * a parent. 513 */ 514 internal class UpdatingDataContextElement( 515 private val parent: UpdatingDataContextElement?, 516 private val instance: DataStoreImpl<*> 517 ) : CoroutineContext.Element { 518 519 companion object { 520 internal val NESTED_UPDATE_ERROR_MESSAGE = 521 """ 522 Calling updateData inside updateData on the same DataStore instance is not supported 523 since updates made in the parent updateData call will not be visible to the nested 524 updateData call. See https://issuetracker.google.com/issues/241760537 for details. 525 """ 526 .trimIndent() 527 528 internal object Key : CoroutineContext.Key<UpdatingDataContextElement> 529 } 530 531 /** Checks the given [candidate] is not currently in a [DataStore.updateData] block. */ checkNotUpdatingnull532 fun checkNotUpdating(candidate: DataStore<*>) { 533 if (instance === candidate) { 534 error(NESTED_UPDATE_ERROR_MESSAGE) 535 } 536 // check the parent if it exists to detect nested calls between [DataStore] instances. 537 parent?.checkNotUpdating(candidate) 538 } 539 540 override val key: CoroutineContext.Key<*> 541 get() = Key 542 } 543