/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.datastore.core

import android.os.FileObserver
import androidx.annotation.CheckResult
import androidx.annotation.VisibleForTesting
import java.io.File
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.channelFlow

/** Callback invoked with the file name reported by the file system observer (may be `null`). */
internal typealias FileMoveObserver = (String?) -> Unit

/**
 * A [FileObserver] wrapper that works around the Android bug that breaks observers when multiple
 * observers are registered on the same directory.
 *
 * A single system level observer is shared per directory (see [fileObservers]) and every event it
 * receives is forwarded to all registered delegates.
 *
 * see: b/37017033, b/279997241
 */
@Suppress("DEPRECATION")
internal class MulticastFileObserver
private constructor(
    val path: String,
) : FileObserver(path, MOVED_TO) {
    /**
     * The actual listeners. We are using a CopyOnWriteArrayList because this field is modified by
     * the companion object.
     */
    private val delegates = CopyOnWriteArrayList<FileMoveObserver>()

    /** Forwards the reported [path] to every registered delegate; [event] is not inspected. */
    override fun onEvent(event: Int, path: String?) {
        delegates.forEach { it(path) }
    }

    companion object {
        /** Guards [fileObservers] and the start/stop calls on the observers stored in it. */
        private val LOCK = Any()

        // visible for tests to be able to validate all observers are removed at the end
        @VisibleForTesting
        internal val fileObservers = mutableMapOf<String, MulticastFileObserver>()

        /**
         * Returns a `Flow` that emits a `Unit` every time the given [file] is changed. It also
         * emits a `Unit` when the file system observer is established. Note that this class only
         * observes move events as it is the only event needed for DataStore.
         *
         * The directory listener is removed when the flow terminates for any reason, including a
         * failure or cancellation that happens before the initial `Unit` is delivered.
         *
         * @throws IllegalArgumentException (on collection) if [file] has no parent directory.
         */
        @CheckResult
        fun observe(file: File) = channelFlow {
            val flowObserver: FileMoveObserver = { fileName ->
                if (fileName == file.name) {
                    // Note that this block can still be called after the channel is closed, as
                    // the disposal of the listener happens after the channel is closed.
                    // We don't need to check the result of `trySendBlocking` because we are not
                    // worried about missed events that happen after the channel is closed.
                    trySendBlocking(Unit)
                }
            }
            // Resolve the parent through the absolute file so that a File without an explicit
            // parent component (e.g. File("name")) is observed in its actual directory instead of
            // failing. The private `observe` canonicalizes the directory, so inputs that already
            // have a parent resolve to the same key as before.
            val parentDirectory =
                requireNotNull(file.absoluteFile.parentFile) {
                    "Cannot observe $file because it does not have a parent directory"
                }
            val disposeListener = observe(parentDirectory, flowObserver)
            try {
                // Send Unit after we create the observer on the filesystem, to denote
                // "initialization". This is not necessary for DataStore to function but it makes
                // it easier to control state in the MulticastFileObserverTest (e.g. test can know
                // that the file system observer is registered before continuing with assertions).
                send(Unit)
                awaitClose()
            } finally {
                // Dispose in `finally` rather than only in the `awaitClose` callback: if
                // `send(Unit)` throws (e.g. the collector was cancelled right after the listener
                // was registered), `awaitClose` is never reached and the listener would leak.
                disposeListener.dispose()
            }
        }

        /**
         * Creates a system level file observer (if needed) and starts observing the given [parent]
         * directory.
         *
         * Callers should dispose the returned handle when they are done. Disposing the last handle
         * of a directory stops the system level observer and removes it from [fileObservers].
         */
        @CheckResult
        private fun observe(parent: File, observer: FileMoveObserver): DisposableHandle {
            val key = parent.canonicalFile.path
            synchronized(LOCK) {
                val filesystemObserver = fileObservers.getOrPut(key) { MulticastFileObserver(key) }
                filesystemObserver.delegates.add(observer)
                if (filesystemObserver.delegates.size == 1) {
                    // start watching inside the lock so we can avoid the bug if multiple observers
                    // are registered/unregistered in parallel
                    filesystemObserver.startWatching()
                }
            }
            return DisposableHandle {
                synchronized(LOCK) {
                    fileObservers[key]?.let { filesystemObserver ->
                        filesystemObserver.delegates.remove(observer)
                        // no delegates left: drop the shared observer and stop it
                        if (filesystemObserver.delegates.isEmpty()) {
                            fileObservers.remove(key)
                            // stop watching inside the lock so we can avoid the bug if multiple
                            // observers are registered/unregistered in parallel
                            filesystemObserver.stopWatching()
                        }
                    }
                }
            }
        }

        /**
         * Used in tests to cleanup all observers. There are tests that will potentially leak
         * observers, which is usually OK but it is harmful for the tests of
         * [MulticastFileObserver], hence we provide this API to cleanup.
         */
        @VisibleForTesting
        internal fun removeAllObservers() {
            synchronized(LOCK) {
                fileObservers.values.forEach { it.stopWatching() }
                fileObservers.clear()
            }
        }
    }
}