1 /*
<lambda>null2 * Copyright 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package androidx.datastore.core.okio
18
19 import androidx.datastore.core.InterProcessCoordinator
20 import androidx.datastore.core.ReadScope
21 import androidx.datastore.core.Storage
22 import androidx.datastore.core.StorageConnection
23 import androidx.datastore.core.WriteScope
24 import androidx.datastore.core.createSingleProcessCoordinator
25 import androidx.datastore.core.use
26 import kotlinx.coroutines.sync.Mutex
27 import kotlinx.coroutines.sync.withLock
28 import okio.FileNotFoundException
29 import okio.FileSystem
30 import okio.IOException
31 import okio.Path
32 import okio.buffer
33 import okio.use
34
/**
 * OKIO implementation of the Storage interface, providing cross platform IO using the OKIO library.
 *
 * Each absolute, normalized file path may back at most one open connection at a time; the set of
 * paths currently in use is tracked in the companion object and guarded by [activeFilesLock].
 *
 * @param fileSystem The file system to perform IO operations on.
 * @param serializer The serializer for `T`.
 * @param coordinatorProducer The producer to provide [InterProcessCoordinator] that coordinates IO
 *   operations across processes if needed. By default it provides single process coordinator,
 *   which doesn't support cross process use cases.
 * @param producePath The file producer that returns the file path that will be read and written.
 */
public class OkioStorage<T>(
    private val fileSystem: FileSystem,
    private val serializer: OkioSerializer<T>,
    private val coordinatorProducer: (Path, FileSystem) -> InterProcessCoordinator = { path, _ ->
        createSingleProcessCoordinator(path)
    },
    private val producePath: () -> Path
) : Storage<T> {
    /**
     * Normalized form of the path returned by [producePath], resolved on first use. Resolution
     * fails with [IllegalStateException] when the produced path is not absolute.
     */
    private val canonicalPath by lazy {
        val path = producePath()
        check(path.isAbsolute) {
            "OkioStorage requires absolute paths, but did not get an absolute path from " +
                "producePath = $producePath, instead got $path"
        }
        path.normalized()
    }

    /**
     * Registers [canonicalPath] as active and returns a connection to it. Closing the returned
     * connection removes the registration again.
     *
     * @throws IllegalStateException if the path is already registered as active, or if the produced
     *   path is not absolute.
     */
    override fun createConnection(): StorageConnection<T> {
        val pathKey = canonicalPath.toString()
        activeFilesLock.withLock {
            check(!activeFiles.contains(pathKey)) {
                "There are multiple DataStores active for the same file: $pathKey. You should " +
                    "either maintain your DataStore as a singleton or confirm that there is " +
                    "no two DataStore's active on the same file (by confirming that the scope" +
                    " is cancelled)."
            }
            activeFiles.add(pathKey)
        }

        val releasePath: () -> Unit = {
            activeFilesLock.withLock { activeFiles.remove(pathKey) }
        }

        // The path is already registered at this point. If the producer throws, undo the
        // registration; otherwise the file would stay marked as active with no connection that
        // could ever release it.
        val coordinator =
            try {
                coordinatorProducer(canonicalPath, fileSystem)
            } catch (failure: Throwable) {
                releasePath()
                throw failure
            }

        return OkioStorageConnection(
            fileSystem,
            canonicalPath,
            serializer,
            coordinator,
            releasePath
        )
    }

    internal companion object {
        /** String forms of the normalized paths that currently have a registered connection. */
        internal val activeFiles = mutableSetOf<String>()

        /** Guards every read and write of [activeFiles]. */
        val activeFilesLock = Synchronizer()
    }
}
89
/**
 * [StorageConnection] that reads and writes a single file at [path] through [fileSystem].
 *
 * @param fileSystem The file system to perform IO operations on.
 * @param path The file that holds the serialized data.
 * @param serializer The serializer used to decode and encode `T`.
 * @param coordinator The coordinator exposed to users of this connection.
 * @param onClose Callback invoked when the connection is closed.
 */
internal class OkioStorageConnection<T>(
    private val fileSystem: FileSystem,
    private val path: Path,
    private val serializer: OkioSerializer<T>,
    override val coordinator: InterProcessCoordinator,
    private val onClose: () -> Unit
) : StorageConnection<T> {

    private val closed = AtomicBoolean(false)

    // TODO:(b/233402915) support multiple readers
    private val transactionMutex = Mutex()

    /**
     * Runs [block] against a read scope for [path].
     *
     * The transaction mutex is acquired only if that is possible without waiting; the outcome is
     * passed to [block] as `locked`, and the mutex is released afterwards only when it was
     * acquired here.
     *
     * @throws IllegalStateException if this connection has been closed.
     */
    // TODO(b/394876261): Add exception handling for exceptions thrown due to direct boot.
    override suspend fun <R> readScope(block: suspend ReadScope<T>.(locked: Boolean) -> R): R {
        checkNotClosed()

        val lock = transactionMutex.tryLock()
        try {
            OkioReadScope(fileSystem, path, serializer).use {
                return block(it, lock)
            }
        } finally {
            if (lock) {
                transactionMutex.unlock()
            }
        }
    }

    /**
     * Runs [block] against a write scope while holding the transaction mutex.
     *
     * Data is written to a sibling scratch file named `<file name>.tmp`. If the scratch file
     * exists once [block] has finished, it is moved over [path] with an atomic move. When an
     * [IOException] is thrown, the scratch file is deleted on a best-effort basis and the
     * exception is rethrown.
     *
     * @throws IllegalStateException if this connection has been closed or [path] has no parent.
     */
    // TODO(b/394876261): Add exception handling for exceptions thrown due to direct boot.
    override suspend fun writeScope(block: suspend WriteScope<T>.() -> Unit) {
        checkNotClosed()
        val parentDir = path.parent ?: error("must have a parent path")
        fileSystem.createDirectories(dir = parentDir, mustCreate = false)
        transactionMutex.withLock {
            val scratchPath = parentDir / "${path.name}.tmp"
            try {
                // Start from a clean scratch file in case an earlier write left one behind.
                fileSystem.delete(path = scratchPath, mustExist = false)
                OkioWriteScope(fileSystem, scratchPath, serializer).use { block(it) }
                if (fileSystem.exists(scratchPath)) {
                    fileSystem.atomicMove(scratchPath, path)
                }
            } catch (ex: IOException) {
                if (fileSystem.exists(scratchPath)) {
                    try {
                        fileSystem.delete(scratchPath)
                    } catch (e: IOException) {
                        // swallow failure to delete
                    }
                }
                throw ex
            }
        }
    }

    private fun checkNotClosed() {
        check(!closed.get()) { "StorageConnection has already been disposed." }
    }

    /**
     * Marks the connection as closed and invokes [onClose].
     *
     * Only the first call has an effect. Running [onClose] again on a repeated close could undo
     * bookkeeping that belongs to a newer connection created after the first close.
     */
    override fun close() {
        // NOTE(review): check-then-set is not atomic, so two threads racing on the first close
        // could both run onClose. Sequential repeated calls are covered.
        if (closed.get()) {
            return
        }
        closed.set(true)
        onClose()
    }
}
154
/**
 * [ReadScope] that decodes the contents of [path] with [serializer].
 *
 * Once [close] has been called, [readData] fails with [IllegalStateException].
 */
internal open class OkioReadScope<T>(
    protected val fileSystem: FileSystem,
    protected val path: Path,
    protected val serializer: OkioSerializer<T>
) : ReadScope<T> {

    private val closed = AtomicBoolean(false)

    /**
     * Reads and decodes the file at [path].
     *
     * When the read fails with [FileNotFoundException] and the file still does not exist, the
     * serializer's default value is returned instead.
     */
    override suspend fun readData(): T {
        checkClose()

        try {
            return decodeFile()
        } catch (notFound: FileNotFoundException) {
            if (!fileSystem.exists(path)) {
                // File does not exist, return default value.
                return serializer.defaultValue
            }
            // Attempt a second read in case a race condition resulted in the file being created
            // by a different process. If we can't read again, a FileNotFoundException is
            // thrown.
            return decodeFile()
        }
    }

    /** Opens [path] for reading and hands the source to the serializer. */
    private suspend fun decodeFile(): T {
        return fileSystem.read(file = path) { serializer.readFrom(this) }
    }

    override fun close() {
        closed.set(true)
    }

    /** Fails with [IllegalStateException] if this scope has been closed. */
    protected fun checkClose() {
        val stillOpen = !closed.get()
        check(stillOpen) { "This scope has already been closed." }
    }
}
189
/**
 * [WriteScope] that encodes values into the file at `path` with the given serializer, in addition
 * to the read behavior inherited from [OkioReadScope].
 */
internal class OkioWriteScope<T>(
    fileSystem: FileSystem,
    path: Path,
    serializer: OkioSerializer<T>
) : OkioReadScope<T>(fileSystem, path, serializer), WriteScope<T> {

    /**
     * Replaces the contents of the file with the serialized form of [value] and flushes the file
     * handle before returning.
     *
     * @throws IllegalStateException if this scope has been closed.
     */
    override suspend fun writeData(value: T) {
        checkClose()
        fileSystem.openReadWrite(path).use { handle ->
            // openReadWrite does not truncate and the sink starts at offset 0, so without this a
            // value shorter than the file's current contents would leave stale trailing bytes.
            handle.resize(0L)
            handle.sink().buffer().use { sink ->
                serializer.writeTo(value, sink)
                // Push bytes still held in the buffered sink down to the file handle first, so
                // that the flush below covers all of the serialized data and not just the part
                // that had already been emitted.
                sink.emit()
                handle.flush()
            }
        }
    }
}
207
/**
 * Create a coordinator for single process use cases.
 *
 * The path is normalized and converted to a string before being handed to the string-based
 * overload of [createSingleProcessCoordinator].
 *
 * @param path The canonical path of the file managed by [createSingleProcessCoordinator]
 */
public fun createSingleProcessCoordinator(path: Path): InterProcessCoordinator {
    val canonicalFilePath = path.normalized().toString()
    return createSingleProcessCoordinator(canonicalFilePath)
}
215