1 /*
2 * Copyright 2023 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 @file:JvmName("MultiProcessCoordinatorKt") // Workaround for b/313964643
18
19 package androidx.datastore.core
20
21 import java.io.File
22 import java.io.FileInputStream
23 import java.io.FileOutputStream
24 import java.io.IOException
25 import java.nio.channels.FileLock
26 import kotlin.contracts.ExperimentalContracts
27 import kotlin.coroutines.CoroutineContext
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.flow.Flow
30 import kotlinx.coroutines.sync.Mutex
31 import kotlinx.coroutines.sync.withLock
32 import kotlinx.coroutines.withContext
33
/**
 * [InterProcessCoordinator] that coordinates access to [file] between processes.
 *
 * Mutual exclusion combines an in-process [Mutex] with a [FileLock] taken on a sibling
 * `<file>.lock` file. The version number is delegated to a [SharedCounter] created from a sibling
 * `<file>.version` file.
 *
 * @param context coroutine context used to run the first access to the [SharedCounter], because
 *   its initialization performs disk IO.
 * @param file the file whose access is coordinated; also the file observed for
 *   [updateNotifications].
 */
internal class MultiProcessCoordinator(
    private val context: CoroutineContext,
    private val file: File
) : InterProcessCoordinator {
    // TODO(b/269375542): the flow should `flowOn` the provided [context]
    override val updateNotifications: Flow<Unit> = MulticastFileObserver.observe(file)

    private val inMemoryMutex = Mutex()

    // Created (together with its parent directories) on first use.
    private val lockFile: File by lazy {
        fileWithSuffix(LOCK_SUFFIX).also { it.createIfNotExists() }
    }

    private val lazySharedCounter = lazy {
        SharedCounter.loadLib()
        SharedCounter.create { fileWithSuffix(VERSION_SUFFIX).also { it.createIfNotExists() } }
    }
    private val sharedCounter by lazySharedCounter

    /**
     * Runs [block] while holding both the in-memory mutex and an exclusive file lock on the lock
     * file, and returns the result of [block].
     *
     * @throws IOException if the file lock cannot be acquired for a reason other than a deadlock
     *   error, or if deadlock errors persist after all retries.
     */
    override suspend fun <T> lock(block: suspend () -> T): T {
        inMemoryMutex.withLock {
            FileOutputStream(lockFile).use { lockFileStream ->
                // Acquire before entering `try`: if acquisition throws there is nothing to release.
                val fileLock = getExclusiveFileLockWithRetryIfDeadlock(lockFileStream)
                try {
                    return block()
                } finally {
                    fileLock.release()
                }
            }
        }
    }

    /**
     * Runs [block] after a non-blocking attempt to take the in-memory mutex and then a shared file
     * lock on the lock file. [block] always runs; its argument is `true` only when both were
     * acquired.
     *
     * @throws IOException if the file lock attempt fails with an error that is neither the
     *   "lock unavailable" (EAGAIN) error nor a deadlock error.
     */
    @OptIn(ExperimentalContracts::class) // withTryLock
    override suspend fun <T> tryLock(block: suspend (Boolean) -> T): T {
        inMemoryMutex.withTryLock<T> { mutexAcquired ->
            if (!mutexAcquired) {
                return block(false)
            }
            FileInputStream(lockFile).use { lockFileStream ->
                val fileLock: FileLock? =
                    try {
                        lockFileStream.channel.tryLock(
                            /* position= */ 0L,
                            /* size= */ Long.MAX_VALUE,
                            /* shared= */ true
                        )
                    } catch (ex: IOException) {
                        // TODO(b/255419657): Update the shared lock IOException handling logic for
                        // KMM.

                        // Some platforms / OS do not support shared lock and convert shared lock
                        // requests to exclusive lock requests. If the lock can't be acquired, it
                        // will throw an IOException with EAGAIN error, instead of returning null as
                        // specified in [java.nio.channels.FileChannel.tryLock]. Only those
                        // "lock unavailable" failures are treated as "not acquired"; anything
                        // else is rethrown.
                        if (!isLockUnavailable(ex)) {
                            throw ex
                        }
                        null
                    }
                try {
                    return block(fileLock != null)
                } finally {
                    fileLock?.release()
                }
            }
        }
    }

    /** Returns the current version. */
    override suspend fun getVersion(): Int {
        // Only switch coroutine if sharedCounter is not initialized because initialization incurs
        // disk IO
        return withLazyCounter { it.getValue() }
    }

    /** Increments the version and returns the new value. */
    override suspend fun incrementAndGetVersion(): Int {
        // Only switch coroutine if sharedCounter is not initialized because initialization incurs
        // disk IO
        return withLazyCounter { it.incrementAndGetValue() }
    }

    /** Returns the sibling of [file] whose absolute path is [file]'s path followed by [suffix]. */
    private fun fileWithSuffix(suffix: String): File = File(file.absolutePath + suffix)

    /** Creates this file, and any missing parent directories, unless it already exists. */
    private fun File.createIfNotExists() {
        createParentDirectories()
        if (!exists()) {
            createNewFile()
        }
    }

    /**
     * Creates the parent directories of this file's canonical path.
     *
     * @throws IOException if the parent still is not a directory afterwards.
     */
    private fun File.createParentDirectories() {
        val parent: File = canonicalFile.parentFile ?: return
        parent.mkdirs()
        if (!parent.isDirectory) {
            throw IOException("Unable to create parent directories of $this")
        }
    }

    /**
     * Runs [block] with the [SharedCounter].
     *
     * [SharedCounter] needs to be initialized in a separate coroutine so it does not violate
     * StrictMode policy in the main thread: when the counter is not initialized yet, [block] runs
     * inside [withContext] on [context]; otherwise it runs in place.
     */
    private suspend inline fun <T> withLazyCounter(
        crossinline block: suspend (SharedCounter) -> T
    ): T {
        return if (lazySharedCounter.isInitialized()) {
            block(sharedCounter)
        } else {
            withContext(context) { block(sharedCounter) }
        }
    }

    companion object {
        private const val LOCK_SUFFIX = ".lock"
        private const val VERSION_SUFFIX = ".version"
        private const val LOCK_ERROR_MESSAGE = "fcntl failed: EAGAIN"
        private const val DEADLOCK_ERROR_MESSAGE = "Resource deadlock would occur"
        private const val INITIAL_WAIT_MILLIS = 10L
        private const val MAX_WAIT_MILLIS = 60000L

        /** Whether [ex] reports the "Resource deadlock would occur" error. */
        private fun isDeadlock(ex: IOException): Boolean =
            ex.message?.contains(DEADLOCK_ERROR_MESSAGE) == true

        /**
         * Whether [ex] means the file lock is merely unavailable right now (EAGAIN or deadlock).
         *
         * The deadlock text is matched with `contains`, the same test the exclusive-lock retry
         * path uses, so both paths recognise the same exceptions even when the message carries a
         * prefix. NOTE(review): the exact platform message format is not visible here; confirm.
         */
        private fun isLockUnavailable(ex: IOException): Boolean =
            ex.message?.startsWith(LOCK_ERROR_MESSAGE) == true || isDeadlock(ex)

        /**
         * Blocks until an exclusive lock over the whole of [lockFileStream]'s channel is acquired.
         *
         * Retries with exponential backoff if it hits the "Resource deadlock would occur" error,
         * until the backoff exceeds [MAX_WAIT_MILLIS]; one last attempt is then made whose failure
         * propagates to the caller.
         *
         * @throws IOException for any non-deadlock failure, or if the final attempt fails.
         */
        private suspend fun getExclusiveFileLockWithRetryIfDeadlock(
            lockFileStream: FileOutputStream
        ): FileLock {
            var backoff = INITIAL_WAIT_MILLIS
            while (backoff <= MAX_WAIT_MILLIS) {
                try {
                    return lockFileStream.channel.lock(0L, Long.MAX_VALUE, /* shared= */ false)
                } catch (ex: IOException) {
                    if (!isDeadlock(ex)) {
                        throw ex
                    }
                    delay(backoff)
                    backoff *= 2
                }
            }
            return lockFileStream.channel.lock(0L, Long.MAX_VALUE, /* shared= */ false)
        }
    }
}
197
/**
 * Builds the [InterProcessCoordinator] to use when a [DataStore] is shared by multiple processes.
 *
 * @param context the coroutine context the returned coordinator uses for IO operations.
 * @param file the File in which [DataStore] stores the data.
 * @return a new [MultiProcessCoordinator] constructed from [context] and [file].
 */
@Suppress("StreamFiles")
fun createMultiProcessCoordinator(context: CoroutineContext, file: File): InterProcessCoordinator {
    return MultiProcessCoordinator(context, file)
}
208