/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.android.server.wm;
18 
19 import android.os.Process;
20 import android.os.SystemClock;
21 import android.util.Slog;
22 
23 import com.android.internal.annotations.VisibleForTesting;
24 
25 import java.util.ArrayList;
26 import java.util.function.Predicate;
27 
28 /**
29  * The common threading logic for persisters to use so that they can run in the same threads.
30  * Methods in this class are synchronized on its instance, so caller could also synchronize on
31  * its instance to perform modifications in items.
32  */
33 class PersisterQueue {
34     private static final String TAG = "PersisterQueue";
35     private static final boolean DEBUG = false;
36 
37     /** When not flushing don't write out files faster than this */
38     private static final long INTER_WRITE_DELAY_MS = 500;
39 
40     /**
41      * When not flushing delay this long before writing the first file out. This gives the next task
42      * being launched a chance to load its resources without this occupying IO bandwidth.
43      */
44     private static final long PRE_TASK_DELAY_MS = 3000;
45 
46     /** The maximum number of entries to keep in the queue before draining it automatically. */
47     private static final int MAX_WRITE_QUEUE_LENGTH = 6;
48 
49     /** Special value for mWriteTime to mean don't wait, just write */
50     private static final long FLUSH_QUEUE = -1;
51 
52     /** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
53      * Listener#onPreProcessItem}. */
54     static final WriteQueueItem EMPTY_ITEM = () -> { };
55 
56     private final long mInterWriteDelayMs;
57     private final long mPreTaskDelayMs;
58     private final LazyTaskWriterThread mLazyTaskWriterThread;
59     private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
60 
61     private final ArrayList<Listener> mListeners = new ArrayList<>();
62 
63     /**
64      * Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
65      * until the image queue is drained and all tasks needing persisting are written to disk. There
66      * is no delay between writes. == 0 We are Idle. Next writes will be delayed by
67      * #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
68      * writes will be delayed by #INTER_WRITE_DELAY_MS.
69      */
70     private long mNextWriteTime = 0;
71 
PersisterQueue()72     PersisterQueue() {
73         this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
74     }
75 
76     /** Used for tests to reduce waiting time. */
77     @VisibleForTesting
PersisterQueue(long interWriteDelayMs, long preTaskDelayMs)78     PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
79         if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
80             throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to"
81                     + "be non-negative. inter-write delay: " + interWriteDelayMs
82                     + "ms pre-task delay: " + preTaskDelayMs);
83         }
84         mInterWriteDelayMs = interWriteDelayMs;
85         mPreTaskDelayMs = preTaskDelayMs;
86         mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
87     }
88 
startPersisting()89     synchronized void startPersisting() {
90         if (!mLazyTaskWriterThread.isAlive()) {
91             mLazyTaskWriterThread.start();
92         }
93     }
94 
95     /** Stops persisting thread. Should only be used in tests. */
96     @VisibleForTesting
stopPersisting()97     void stopPersisting() throws InterruptedException {
98         if (!mLazyTaskWriterThread.isAlive()) {
99             return;
100         }
101 
102         synchronized (this) {
103             mLazyTaskWriterThread.interrupt();
104         }
105         mLazyTaskWriterThread.join();
106     }
107 
addItem(WriteQueueItem item, boolean flush)108     synchronized void addItem(WriteQueueItem item, boolean flush) {
109         mWriteQueue.add(item);
110 
111         if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
112             mNextWriteTime = FLUSH_QUEUE;
113         } else if (mNextWriteTime == 0) {
114             mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
115         }
116         notify();
117     }
118 
findLastItem(Predicate<T> predicate, Class<T> clazz)119     synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
120         for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
121             WriteQueueItem writeQueueItem = mWriteQueue.get(i);
122             if (clazz.isInstance(writeQueueItem)) {
123                 T item = clazz.cast(writeQueueItem);
124                 if (predicate.test(item)) {
125                     return item;
126                 }
127             }
128         }
129 
130         return null;
131     }
132 
133     /**
134      * Updates the last item found in the queue that matches the given item, or adds it to the end
135      * of the queue if no such item is found.
136      */
updateLastOrAddItem(T item, boolean flush)137     synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) {
138         final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass());
139         if (itemToUpdate == null) {
140             addItem(item, flush);
141         } else {
142             itemToUpdate.updateFrom(item);
143         }
144 
145         yieldIfQueueTooDeep();
146     }
147 
148     /**
149      * Removes all items with which given predicate returns {@code true}.
150      */
removeItems(Predicate<T> predicate, Class<T> clazz)151     synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
152             Class<T> clazz) {
153         for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
154             WriteQueueItem writeQueueItem = mWriteQueue.get(i);
155             if (clazz.isInstance(writeQueueItem)) {
156                 T item = clazz.cast(writeQueueItem);
157                 if (predicate.test(item)) {
158                     if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
159                     mWriteQueue.remove(i);
160                 }
161             }
162         }
163     }
164 
flush()165     synchronized void flush() {
166         mNextWriteTime = FLUSH_QUEUE;
167         notifyAll();
168         do {
169             try {
170                 wait();
171             } catch (InterruptedException e) {
172             }
173         } while (mNextWriteTime == FLUSH_QUEUE);
174     }
175 
yieldIfQueueTooDeep()176     void yieldIfQueueTooDeep() {
177         boolean stall = false;
178         synchronized (this) {
179             if (mNextWriteTime == FLUSH_QUEUE) {
180                 stall = true;
181             }
182         }
183         if (stall) {
184             Thread.yield();
185         }
186     }
187 
addListener(Listener listener)188     void addListener(Listener listener) {
189         mListeners.add(listener);
190     }
191 
192     @VisibleForTesting
removeListener(Listener listener)193     boolean removeListener(Listener listener) {
194         return mListeners.remove(listener);
195     }
196 
processNextItem()197     private void processNextItem() throws InterruptedException {
198         // This part is extracted into a method so that the GC can clearly see the end of the
199         // scope of the variable 'item'.  If this part was in the loop in LazyTaskWriterThread, the
200         // last item it processed would always "leak".
201         // See https://b.corp.google.com/issues/64438652#comment7
202 
203         // If mNextWriteTime, then don't delay between each call to saveToXml().
204         final WriteQueueItem item;
205         synchronized (this) {
206             if (mNextWriteTime != FLUSH_QUEUE) {
207                 // The next write we don't have to wait so long.
208                 mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
209                 if (DEBUG) {
210                     Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
211                             + " msec. (" + mNextWriteTime + ")");
212                 }
213             }
214 
215             while (mWriteQueue.isEmpty()) {
216                 if (mNextWriteTime != 0) {
217                     mNextWriteTime = 0; // idle.
218                     notify(); // May need to wake up flush().
219                 }
220                 // Make sure we exit this thread correctly when interrupted before going to
221                 // indefinite wait.
222                 if (Thread.currentThread().isInterrupted()) {
223                     throw new InterruptedException();
224                 }
225                 if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
226                 wait();
227                 // Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
228                 // from now.
229             }
230             item = mWriteQueue.remove(0);
231 
232             long now = SystemClock.uptimeMillis();
233             if (DEBUG) {
234                 Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
235                         + " mWriteQueue.size=" + mWriteQueue.size());
236             }
237             while (now < mNextWriteTime) {
238                 if (DEBUG) {
239                     Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
240                 }
241                 wait(mNextWriteTime - now);
242                 now = SystemClock.uptimeMillis();
243             }
244 
245             // Got something to do.
246         }
247 
248         item.process();
249     }
250 
251     interface WriteQueueItem<T extends WriteQueueItem<T>> {
process()252         void process();
253 
updateFrom(T item)254         default void updateFrom(T item) {}
255 
matches(T item)256         default boolean matches(T item) {
257             return false;
258         }
259     }
260 
261     interface Listener {
262         /**
263          * Called before {@link PersisterQueue} tries to process next item.
264          *
265          * Note if the queue is empty, this callback will be called before the indefinite wait. This
266          * will be called once when {@link PersisterQueue} starts the internal thread before the
267          * indefinite wait.
268          *
269          * This callback is called w/o locking the instance of {@link PersisterQueue}.
270          *
271          * @param queueEmpty {@code true} if the queue is empty, which indicates {@link
272          * PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still
273          * item to process.
274          */
onPreProcessItem(boolean queueEmpty)275         void onPreProcessItem(boolean queueEmpty);
276     }
277 
278     private class LazyTaskWriterThread extends Thread {
279 
LazyTaskWriterThread(String name)280         private LazyTaskWriterThread(String name) {
281             super(name);
282         }
283 
284         @Override
run()285         public void run() {
286             Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
287             try {
288                 while (true) {
289                     final boolean probablyDone;
290                     synchronized (PersisterQueue.this) {
291                         probablyDone = mWriteQueue.isEmpty();
292                     }
293 
294                     for (int i = mListeners.size() - 1; i >= 0; --i) {
295                         mListeners.get(i).onPreProcessItem(probablyDone);
296                     }
297 
298                     processNextItem();
299                 }
300             } catch (InterruptedException e) {
301                 Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
302                         + "it's OK in tests.");
303             }
304         }
305     }
306 }
307