1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.server.wm; 18 19 import android.os.Process; 20 import android.os.SystemClock; 21 import android.util.Slog; 22 23 import com.android.internal.annotations.VisibleForTesting; 24 25 import java.util.ArrayList; 26 import java.util.function.Predicate; 27 28 /** 29 * The common threading logic for persisters to use so that they can run in the same threads. 30 * Methods in this class are synchronized on its instance, so caller could also synchronize on 31 * its instance to perform modifications in items. 32 */ 33 class PersisterQueue { 34 private static final String TAG = "PersisterQueue"; 35 private static final boolean DEBUG = false; 36 37 /** When not flushing don't write out files faster than this */ 38 private static final long INTER_WRITE_DELAY_MS = 500; 39 40 /** 41 * When not flushing delay this long before writing the first file out. This gives the next task 42 * being launched a chance to load its resources without this occupying IO bandwidth. 43 */ 44 private static final long PRE_TASK_DELAY_MS = 3000; 45 46 /** The maximum number of entries to keep in the queue before draining it automatically. */ 47 private static final int MAX_WRITE_QUEUE_LENGTH = 6; 48 49 /** Special value for mWriteTime to mean don't wait, just write */ 50 private static final long FLUSH_QUEUE = -1; 51 52 /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link 53 * Listener#onPreProcessItem}. */ 54 static final WriteQueueItem EMPTY_ITEM = () -> { }; 55 56 private final long mInterWriteDelayMs; 57 private final long mPreTaskDelayMs; 58 private final LazyTaskWriterThread mLazyTaskWriterThread; 59 private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>(); 60 61 private final ArrayList<Listener> mListeners = new ArrayList<>(); 62 63 /** 64 * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes 65 * until the image queue is drained and all tasks needing persisting are written to disk. There 66 * is no delay between writes. == 0 We are Idle. Next writes will be delayed by 67 * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent 68 * writes will be delayed by #INTER_WRITE_DELAY_MS. 69 */ 70 private long mNextWriteTime = 0; 71 PersisterQueue()72 PersisterQueue() { 73 this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS); 74 } 75 76 /** Used for tests to reduce waiting time. */ 77 @VisibleForTesting PersisterQueue(long interWriteDelayMs, long preTaskDelayMs)78 PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) { 79 if (interWriteDelayMs < 0 || preTaskDelayMs < 0) { 80 throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to" 81 + "be non-negative. inter-write delay: " + interWriteDelayMs 82 + "ms pre-task delay: " + preTaskDelayMs); 83 } 84 mInterWriteDelayMs = interWriteDelayMs; 85 mPreTaskDelayMs = preTaskDelayMs; 86 mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread"); 87 } 88 startPersisting()89 synchronized void startPersisting() { 90 if (!mLazyTaskWriterThread.isAlive()) { 91 mLazyTaskWriterThread.start(); 92 } 93 } 94 95 /** Stops persisting thread. Should only be used in tests. */ 96 @VisibleForTesting stopPersisting()97 void stopPersisting() throws InterruptedException { 98 if (!mLazyTaskWriterThread.isAlive()) { 99 return; 100 } 101 102 synchronized (this) { 103 mLazyTaskWriterThread.interrupt(); 104 } 105 mLazyTaskWriterThread.join(); 106 } 107 addItem(WriteQueueItem item, boolean flush)108 synchronized void addItem(WriteQueueItem item, boolean flush) { 109 mWriteQueue.add(item); 110 111 if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) { 112 mNextWriteTime = FLUSH_QUEUE; 113 } else if (mNextWriteTime == 0) { 114 mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs; 115 } 116 notify(); 117 } 118 findLastItem(Predicate<T> predicate, Class<T> clazz)119 synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) { 120 for (int i = mWriteQueue.size() - 1; i >= 0; --i) { 121 WriteQueueItem writeQueueItem = mWriteQueue.get(i); 122 if (clazz.isInstance(writeQueueItem)) { 123 T item = clazz.cast(writeQueueItem); 124 if (predicate.test(item)) { 125 return item; 126 } 127 } 128 } 129 130 return null; 131 } 132 133 /** 134 * Updates the last item found in the queue that matches the given item, or adds it to the end 135 * of the queue if no such item is found. 136 */ updateLastOrAddItem(T item, boolean flush)137 synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) { 138 final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass()); 139 if (itemToUpdate == null) { 140 addItem(item, flush); 141 } else { 142 itemToUpdate.updateFrom(item); 143 } 144 145 yieldIfQueueTooDeep(); 146 } 147 148 /** 149 * Removes all items with which given predicate returns {@code true}. 150 */ removeItems(Predicate<T> predicate, Class<T> clazz)151 synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate, 152 Class<T> clazz) { 153 for (int i = mWriteQueue.size() - 1; i >= 0; --i) { 154 WriteQueueItem writeQueueItem = mWriteQueue.get(i); 155 if (clazz.isInstance(writeQueueItem)) { 156 T item = clazz.cast(writeQueueItem); 157 if (predicate.test(item)) { 158 if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue."); 159 mWriteQueue.remove(i); 160 } 161 } 162 } 163 } 164 flush()165 synchronized void flush() { 166 mNextWriteTime = FLUSH_QUEUE; 167 notifyAll(); 168 do { 169 try { 170 wait(); 171 } catch (InterruptedException e) { 172 } 173 } while (mNextWriteTime == FLUSH_QUEUE); 174 } 175 yieldIfQueueTooDeep()176 void yieldIfQueueTooDeep() { 177 boolean stall = false; 178 synchronized (this) { 179 if (mNextWriteTime == FLUSH_QUEUE) { 180 stall = true; 181 } 182 } 183 if (stall) { 184 Thread.yield(); 185 } 186 } 187 addListener(Listener listener)188 void addListener(Listener listener) { 189 mListeners.add(listener); 190 } 191 192 @VisibleForTesting removeListener(Listener listener)193 boolean removeListener(Listener listener) { 194 return mListeners.remove(listener); 195 } 196 processNextItem()197 private void processNextItem() throws InterruptedException { 198 // This part is extracted into a method so that the GC can clearly see the end of the 199 // scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the 200 // last item it processed would always "leak". 201 // See https://b.corp.google.com/issues/64438652#comment7 202 203 // If mNextWriteTime, then don't delay between each call to saveToXml(). 204 final WriteQueueItem item; 205 synchronized (this) { 206 if (mNextWriteTime != FLUSH_QUEUE) { 207 // The next write we don't have to wait so long. 208 mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs; 209 if (DEBUG) { 210 Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs 211 + " msec. (" + mNextWriteTime + ")"); 212 } 213 } 214 215 while (mWriteQueue.isEmpty()) { 216 if (mNextWriteTime != 0) { 217 mNextWriteTime = 0; // idle. 218 notify(); // May need to wake up flush(). 219 } 220 // Make sure we exit this thread correctly when interrupted before going to 221 // indefinite wait. 222 if (Thread.currentThread().isInterrupted()) { 223 throw new InterruptedException(); 224 } 225 if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely."); 226 wait(); 227 // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS 228 // from now. 229 } 230 item = mWriteQueue.remove(0); 231 232 long now = SystemClock.uptimeMillis(); 233 if (DEBUG) { 234 Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime 235 + " mWriteQueue.size=" + mWriteQueue.size()); 236 } 237 while (now < mNextWriteTime) { 238 if (DEBUG) { 239 Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now)); 240 } 241 wait(mNextWriteTime - now); 242 now = SystemClock.uptimeMillis(); 243 } 244 245 // Got something to do. 246 } 247 248 item.process(); 249 } 250 251 interface WriteQueueItem<T extends WriteQueueItem<T>> { process()252 void process(); 253 updateFrom(T item)254 default void updateFrom(T item) {} 255 matches(T item)256 default boolean matches(T item) { 257 return false; 258 } 259 } 260 261 interface Listener { 262 /** 263 * Called before {@link PersisterQueue} tries to process next item. 264 * 265 * Note if the queue is empty, this callback will be called before the indefinite wait. This 266 * will be called once when {@link PersisterQueue} starts the internal thread before the 267 * indefinite wait. 268 * 269 * This callback is called w/o locking the instance of {@link PersisterQueue}. 270 * 271 * @param queueEmpty {@code true} if the queue is empty, which indicates {@link 272 * PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still 273 * item to process. 274 */ onPreProcessItem(boolean queueEmpty)275 void onPreProcessItem(boolean queueEmpty); 276 } 277 278 private class LazyTaskWriterThread extends Thread { 279 LazyTaskWriterThread(String name)280 private LazyTaskWriterThread(String name) { 281 super(name); 282 } 283 284 @Override run()285 public void run() { 286 Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); 287 try { 288 while (true) { 289 final boolean probablyDone; 290 synchronized (PersisterQueue.this) { 291 probablyDone = mWriteQueue.isEmpty(); 292 } 293 294 for (int i = mListeners.size() - 1; i >= 0; --i) { 295 mListeners.get(i).onPreProcessItem(probablyDone); 296 } 297 298 processNextItem(); 299 } 300 } catch (InterruptedException e) { 301 Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but" 302 + "it's OK in tests."); 303 } 304 } 305 } 306 } 307