1 /*
<lambda>null2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.datastore.core
18 
19 import android.os.FileObserver
20 import androidx.annotation.CheckResult
21 import androidx.annotation.VisibleForTesting
22 import java.io.File
23 import java.util.concurrent.CopyOnWriteArrayList
24 import kotlinx.coroutines.DisposableHandle
25 import kotlinx.coroutines.channels.awaitClose
26 import kotlinx.coroutines.channels.trySendBlocking
27 import kotlinx.coroutines.flow.channelFlow
28 
29 internal typealias FileMoveObserver = (String?) -> Unit
30 
31 /**
32  * A [FileObserver] wrapper that works around the Android bug that breaks observers when multiple
33  * observers are registered on the same directory.
34  *
35  * see: b/37017033, b/279997241
36  */
37 @Suppress("DEPRECATION")
38 internal class MulticastFileObserver
39 private constructor(
40     val path: String,
41 ) : FileObserver(path, MOVED_TO) {
42     /**
43      * The actual listeners. We are using a CopyOnWriteArrayList because this field is modified by
44      * the companion object.
45      */
46     private val delegates = CopyOnWriteArrayList<FileMoveObserver>()
47 
48     override fun onEvent(event: Int, path: String?) {
49         delegates.forEach { it(path) }
50     }
51 
52     companion object {
53         private val LOCK = Any()
54 
55         // visible for tests to be able to validate all observers are removed at the end
56         @VisibleForTesting
57         internal val fileObservers = mutableMapOf<String, MulticastFileObserver>()
58 
59         /**
60          * Returns a `Flow` that emits a `Unit` every time the give [file] is changed. It also emits
61          * a `Unit` when the file system observer is established. Note that this class only observes
62          * move events as it is the only event needed for DataStore.
63          */
64         @CheckResult
65         fun observe(file: File) = channelFlow {
66             val flowObserver = { fileName: String? ->
67                 if (fileName == file.name) {
68                     // Note that, this block still be called after channel is closed as the disposal
69                     // of the listener happens after the channel is closed.
70                     // We don't need to check the result of `trySendBlocking` because we are not
71                     // worried about missed events that happen after the channel is closed.
72                     trySendBlocking(Unit)
73                 }
74             }
75             val disposeListener = observe(file.parentFile!!, flowObserver)
76             // Send Unit after we create the observer on the filesystem, to denote "initialization".
77             // This is not necessary for DataStore to function but it makes it easier to control
78             // state in the MulticastFileObserverTest (e.g. test can know that the file system
79             // observer is registered before continuing with assertions).
80             send(Unit)
81             awaitClose { disposeListener.dispose() }
82         }
83 
84         /**
85          * Creates a system level file observer (if needed) and starts observing the given [parent]
86          * directory.
87          *
88          * Callers should dispose the returned handle when it is done.
89          */
90         @CheckResult
91         private fun observe(parent: File, observer: FileMoveObserver): DisposableHandle {
92             val key = parent.canonicalFile.path
93             synchronized(LOCK) {
94                 val filesystemObserver = fileObservers.getOrPut(key) { MulticastFileObserver(key) }
95                 filesystemObserver.delegates.add(observer)
96                 if (filesystemObserver.delegates.size == 1) {
97                     // start watching inside the lock so we can avoid the bug if multiple observers
98                     // are registered/unregistered in parallel
99                     filesystemObserver.startWatching()
100                 }
101             }
102             return DisposableHandle {
103                 synchronized(LOCK) {
104                     fileObservers[key]?.let { filesystemObserver ->
105                         filesystemObserver.delegates.remove(observer)
106                         // return the instance if it needs to be stopped
107                         if (filesystemObserver.delegates.isEmpty()) {
108                             fileObservers.remove(key)
109                             // stop watching inside the lock so we can avoid the bug if multiple
110                             // observers are registered/unregistered in parallel
111                             filesystemObserver.stopWatching()
112                         }
113                     }
114                 }
115             }
116         }
117 
118         /**
119          * Used in tests to cleanup all observers. There are tests that will potentially leak
120          * observers, which is usually OK but it is harmful for the tests of
121          * [MulticastFileObserver], hence we provide this API to cleanup.
122          */
123         @VisibleForTesting
124         internal fun removeAllObservers() {
125             synchronized(LOCK) {
126                 fileObservers.values.forEach { it.stopWatching() }
127                 fileObservers.clear()
128             }
129         }
130     }
131 }
132