1 /*
<lambda>null2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:JvmName("MultiProcessCoordinatorKt") // Workaround for b/313964643
18 
19 package androidx.datastore.core
20 
21 import java.io.File
22 import java.io.FileInputStream
23 import java.io.FileOutputStream
24 import java.io.IOException
25 import java.nio.channels.FileLock
26 import kotlin.contracts.ExperimentalContracts
27 import kotlin.coroutines.CoroutineContext
28 import kotlinx.coroutines.delay
29 import kotlinx.coroutines.flow.Flow
30 import kotlinx.coroutines.sync.Mutex
31 import kotlinx.coroutines.sync.withLock
32 import kotlinx.coroutines.withContext
33 
34 internal class MultiProcessCoordinator(
35     private val context: CoroutineContext,
36     protected val file: File
37 ) : InterProcessCoordinator {
38     // TODO(b/269375542): the flow should `flowOn` the provided [context]
39     override val updateNotifications: Flow<Unit> = MulticastFileObserver.observe(file)
40 
41     // run block with the exclusive lock
42     override suspend fun <T> lock(block: suspend () -> T): T {
43         inMemoryMutex.withLock {
44             FileOutputStream(lockFile).use { lockFileStream ->
45                 var lock: FileLock? = null
46                 try {
47                     lock = getExclusiveFileLockWithRetryIfDeadlock(lockFileStream)
48                     return block()
49                 } finally {
50                     lock?.release()
51                 }
52             }
53         }
54     }
55 
56     // run block with an attempt to get the exclusive lock, still run even if
57     // attempt fails. Pass a boolean to indicate if the attempt succeeds.
58     @OptIn(ExperimentalContracts::class) // withTryLock
59     override suspend fun <T> tryLock(block: suspend (Boolean) -> T): T {
60         inMemoryMutex.withTryLock<T> {
61             if (it == false) {
62                 return block(false)
63             }
64             FileInputStream(lockFile).use { lockFileStream ->
65                 var lock: FileLock? = null
66                 try {
67                     try {
68                         lock =
69                             lockFileStream
70                                 .getChannel()
71                                 .tryLock(
72                                     /* position= */ 0L,
73                                     /* size= */ Long.MAX_VALUE,
74                                     /* shared= */ true
75                                 )
76                     } catch (ex: IOException) {
77                         // TODO(b/255419657): Update the shared lock IOException handling logic for
78                         // KMM.
79 
80                         // Some platforms / OS do not support shared lock and convert shared lock
81                         // requests to exclusive lock requests. If the lock can't be acquired, it
82                         // will throw an IOException with EAGAIN error, instead of returning null as
83                         // specified in {@link FileChannel#tryLock}. We only continue if the error
84                         // message is EAGAIN, otherwise just throw it.
85                         if (
86                             (ex.message?.startsWith(LOCK_ERROR_MESSAGE) != true) &&
87                                 (ex.message?.startsWith(DEADLOCK_ERROR_MESSAGE) != true)
88                         ) {
89                             throw ex
90                         }
91                     }
92                     return block(lock != null)
93                 } finally {
94                     lock?.release()
95                 }
96             }
97         }
98     }
99 
100     // get the current version
101     override suspend fun getVersion(): Int {
102         // Only switch coroutine if sharedCounter is not initialized because initialization incurs
103         // disk IO
104         return withLazyCounter { it.getValue() }
105     }
106 
107     // increment version and return the new one
108     override suspend fun incrementAndGetVersion(): Int {
109         // Only switch coroutine if sharedCounter is not initialized because initialization incurs
110         // disk IO
111         return withLazyCounter { it.incrementAndGetValue() }
112     }
113 
114     private val LOCK_SUFFIX = ".lock"
115     private val VERSION_SUFFIX = ".version"
116     private val LOCK_ERROR_MESSAGE = "fcntl failed: EAGAIN"
117 
118     private val inMemoryMutex = Mutex()
119     private val lockFile: File by lazy {
120         val lockFile = fileWithSuffix(LOCK_SUFFIX)
121         lockFile.createIfNotExists()
122         lockFile
123     }
124 
125     private val lazySharedCounter = lazy {
126         SharedCounter.loadLib()
127         SharedCounter.create {
128             val versionFile = fileWithSuffix(VERSION_SUFFIX)
129             versionFile.createIfNotExists()
130             versionFile
131         }
132     }
133     private val sharedCounter by lazySharedCounter
134 
135     private fun fileWithSuffix(suffix: String): File {
136         return File(file.absolutePath + suffix)
137     }
138 
139     private fun File.createIfNotExists() {
140         createParentDirectories()
141         if (!exists()) {
142             createNewFile()
143         }
144     }
145 
146     private fun File.createParentDirectories() {
147         val parent: File? = canonicalFile.parentFile
148 
149         parent?.let {
150             it.mkdirs()
151             if (!it.isDirectory) {
152                 throw IOException("Unable to create parent directories of $this")
153             }
154         }
155     }
156 
157     /**
158      * {@link SharedCounter} needs to be initialized in a separate coroutine so it does not violate
159      * StrictMode policy in the main thread.
160      */
161     private suspend inline fun <T> withLazyCounter(
162         crossinline block: suspend (SharedCounter) -> T
163     ): T {
164         return if (lazySharedCounter.isInitialized()) {
165             block(sharedCounter)
166         } else {
167             withContext(context) { block(sharedCounter) }
168         }
169     }
170 
171     companion object {
172         // Retry with exponential backoff to get file lock if it hits "Resource deadlock would
173         // occur" error until the backoff reaches [MAX_WAIT_MILLIS].
174         private suspend fun getExclusiveFileLockWithRetryIfDeadlock(
175             lockFileStream: FileOutputStream
176         ): FileLock {
177             var backoff = INITIAL_WAIT_MILLIS
178             while (backoff <= MAX_WAIT_MILLIS) {
179                 try {
180                     return lockFileStream.getChannel().lock(0L, Long.MAX_VALUE, /* shared= */ false)
181                 } catch (ex: IOException) {
182                     if (ex.message?.contains(DEADLOCK_ERROR_MESSAGE) != true) {
183                         throw ex
184                     }
185                     delay(backoff)
186                     backoff *= 2
187                 }
188             }
189             return lockFileStream.getChannel().lock(0L, Long.MAX_VALUE, /* shared= */ false)
190         }
191 
192         private val DEADLOCK_ERROR_MESSAGE = "Resource deadlock would occur"
193         private val INITIAL_WAIT_MILLIS: Long = 10
194         private val MAX_WAIT_MILLIS: Long = 60000
195     }
196 }
197 
198 /**
199  * Create a coordinator for multiple process use cases.
200  *
201  * @param context the coroutine context to be used by the [MultiProcessCoordinator] for IO
202  *   operations.
203  * @param file the File in which [DataStore] stores the data.
204  */
205 @Suppress("StreamFiles")
createMultiProcessCoordinatornull206 fun createMultiProcessCoordinator(context: CoroutineContext, file: File): InterProcessCoordinator =
207     MultiProcessCoordinator(context, file)
208