1 /*
<lambda>null2  * Copyright 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.datastore.core.okio
18 
19 import androidx.datastore.core.InterProcessCoordinator
20 import androidx.datastore.core.ReadScope
21 import androidx.datastore.core.Storage
22 import androidx.datastore.core.StorageConnection
23 import androidx.datastore.core.WriteScope
24 import androidx.datastore.core.createSingleProcessCoordinator
25 import androidx.datastore.core.use
26 import kotlinx.coroutines.sync.Mutex
27 import kotlinx.coroutines.sync.withLock
28 import okio.FileNotFoundException
29 import okio.FileSystem
30 import okio.IOException
31 import okio.Path
32 import okio.buffer
33 import okio.use
34 
35 /**
36  * OKIO implementation of the Storage interface, providing cross platform IO using the OKIO library.
37  *
38  * @param fileSystem The file system to perform IO operations on.
39  * @param serializer The serializer for `T`.
40  * @param coordinatorProducer The producer to provide [InterProcessCoordinator] that coordinates IO
41  *   operations across processes if needed. By default it provides single process coordinator, which
42  *   doesn't support cross process use cases.
43  * @param producePath The file producer that returns the file path that will be read and written.
44  */
45 public class OkioStorage<T>(
46     private val fileSystem: FileSystem,
47     private val serializer: OkioSerializer<T>,
48     private val coordinatorProducer: (Path, FileSystem) -> InterProcessCoordinator = { path, _ ->
49         createSingleProcessCoordinator(path)
50     },
51     private val producePath: () -> Path
52 ) : Storage<T> {
<lambda>null53     private val canonicalPath by lazy {
54         val path = producePath()
55         check(path.isAbsolute) {
56             "OkioStorage requires absolute paths, but did not get an absolute path from " +
57                 "producePath = $producePath, instead got $path"
58         }
59         path.normalized()
60     }
61 
createConnectionnull62     override fun createConnection(): StorageConnection<T> {
63         canonicalPath.toString().let { path ->
64             activeFilesLock.withLock {
65                 check(!activeFiles.contains(path)) {
66                     "There are multiple DataStores active for the same file: $path. You should " +
67                         "either maintain your DataStore as a singleton or confirm that there is " +
68                         "no two DataStore's active on the same file (by confirming that the scope" +
69                         " is cancelled)."
70                 }
71                 activeFiles.add(path)
72             }
73         }
74         return OkioStorageConnection(
75             fileSystem,
76             canonicalPath,
77             serializer,
78             coordinatorProducer(canonicalPath, fileSystem)
79         ) {
80             activeFilesLock.withLock { activeFiles.remove(canonicalPath.toString()) }
81         }
82     }
83 
84     internal companion object {
85         internal val activeFiles = mutableSetOf<String>()
86         val activeFilesLock = Synchronizer()
87     }
88 }
89 
90 internal class OkioStorageConnection<T>(
91     private val fileSystem: FileSystem,
92     private val path: Path,
93     private val serializer: OkioSerializer<T>,
94     override val coordinator: InterProcessCoordinator,
95     private val onClose: () -> Unit
96 ) : StorageConnection<T> {
97 
98     private val closed = AtomicBoolean(false)
99 
100     // TODO:(b/233402915) support multiple readers
101     private val transactionMutex = Mutex()
102 
103     // TODO(b/394876261): Add exception handling for exceptions thrown due to direct boot.
readScopenull104     override suspend fun <R> readScope(block: suspend ReadScope<T>.(locked: Boolean) -> R): R {
105         checkNotClosed()
106 
107         val lock = transactionMutex.tryLock()
108         try {
109             OkioReadScope(fileSystem, path, serializer).use {
110                 return block(it, lock)
111             }
112         } finally {
113             if (lock) {
114                 transactionMutex.unlock()
115             }
116         }
117     }
118 
119     // TODO(b/394876261): Add exception handling for exceptions thrown due to direct boot.
writeScopenull120     override suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit) {
121         checkNotClosed()
122         val parentDir = path.parent ?: error("must have a parent path")
123         fileSystem.createDirectories(dir = parentDir, mustCreate = false)
124         transactionMutex.withLock {
125             val scratchPath = parentDir / "${path.name}.tmp"
126             try {
127                 fileSystem.delete(path = scratchPath, mustExist = false)
128                 OkioWriteScope(fileSystem, scratchPath, serializer).use { block(it) }
129                 if (fileSystem.exists(scratchPath)) {
130                     fileSystem.atomicMove(scratchPath, path)
131                 }
132             } catch (ex: IOException) {
133                 if (fileSystem.exists(scratchPath)) {
134                     try {
135                         fileSystem.delete(scratchPath)
136                     } catch (e: IOException) {
137                         // swallow failure to delete
138                     }
139                 }
140                 throw ex
141             }
142         }
143     }
144 
checkNotClosednull145     private fun checkNotClosed() {
146         check(!closed.get()) { "StorageConnection has already been disposed." }
147     }
148 
closenull149     override fun close() {
150         closed.set(true)
151         onClose()
152     }
153 }
154 
155 internal open class OkioReadScope<T>(
156     protected val fileSystem: FileSystem,
157     protected val path: Path,
158     protected val serializer: OkioSerializer<T>
159 ) : ReadScope<T> {
160 
161     private val closed = AtomicBoolean(false)
162 
readDatanull163     override suspend fun readData(): T {
164         checkClose()
165 
166         return try {
167             fileSystem.read(file = path) { serializer.readFrom(this) }
168         } catch (ex: FileNotFoundException) {
169             if (fileSystem.exists(path)) {
170                 // Attempt a second read in case a race condition resulted in the file being created
171                 // by a different process. If we can't read again, a FileNotFoundException is
172                 // thrown.
173                 fileSystem.read(file = path) { serializer.readFrom(this) }
174             } else {
175                 // File does not exist, return default value.
176                 serializer.defaultValue
177             }
178         }
179     }
180 
closenull181     override fun close() {
182         closed.set(true)
183     }
184 
checkClosenull185     protected fun checkClose() {
186         check(!closed.get()) { "This scope has already been closed." }
187     }
188 }
189 
190 internal class OkioWriteScope<T>(
191     fileSystem: FileSystem,
192     path: Path,
193     serializer: OkioSerializer<T>
194 ) : OkioReadScope<T>(fileSystem, path, serializer), WriteScope<T> {
195 
writeDatanull196     override suspend fun writeData(value: T) {
197         checkClose()
198         val fileHandle = fileSystem.openReadWrite(path)
199         fileHandle.use { handle ->
200             handle.sink().buffer().use { sink ->
201                 serializer.writeTo(value, sink)
202                 handle.flush()
203             }
204         }
205     }
206 }
207 
208 /**
209  * Create a coordinator for single process use cases.
210  *
211  * @param path The canonical path of the file managed by [createSingleProcessCoordinator]
212  */
createSingleProcessCoordinatornull213 public fun createSingleProcessCoordinator(path: Path): InterProcessCoordinator =
214     createSingleProcessCoordinator(path.normalized().toString())
215