• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.util;
18 
19 import android.os.Handler;
20 import android.util.Pair;
21 
22 import com.android.camera.debug.Log.Tag;
23 
24 import java.security.InvalidParameterException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.Map;
28 import java.util.TreeMap;
29 import java.util.concurrent.Semaphore;
30 
31 /**
32  * Implements a thread-safe fixed-size pool map of integers to objects such that
33  * the least element may be swapped out for a new element at any time. Elements
34  * may be temporarily "pinned" for processing in separate threads, during which
35  * they will not be swapped out. <br>
36  * This class enforces the invariant that a new element can always be swapped
37  * in. Thus, requests to pin an element for a particular task may be denied if
38  * there are not enough unpinned elements which can be removed. <br>
39  */
40 public class ConcurrentSharedRingBuffer<E> {
41     private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
42 
43     /**
44      * Callback interface for swapping elements at the head of the buffer.
45      */
46     public static interface SwapTask<E> {
47         /**
48          * Called if the buffer is under-capacity and a new element is being
49          * added.
50          *
51          * @return the new element to add.
52          */
create()53         public E create();
54 
55         /**
56          * Called if the buffer is full and an old element must be swapped out
57          * to make room for the new element.
58          *
59          * @param oldElement the element being removed from the buffer.
60          * @return the new element to add.
61          */
swap(E oldElement)62         public E swap(E oldElement);
63 
64         /**
65          * Called if the buffer already has an element with the specified key.
66          * Note that the element may currently be pinned for processing by other
67          * elements. Therefore, implementations must be thread safe with respect
68          * to any other operations which may be applied to pinned tasks.
69          *
70          * @param existingElement the element to be updated.
71          */
update(E existingElement)72         public void update(E existingElement);
73     }
74 
75     /**
76      * Callback for selecting an element to pin. See
77      * {@link tryPinGreatestSelected}.
78      */
79     public static interface Selector<E> {
80         /**
81          * @param element The element to select or not select.
82          * @return true if the element should be selected, false otherwise.
83          */
select(E element)84         public boolean select(E element);
85     }
86 
87     public static interface PinStateListener {
88         /**
89          * Invoked whenever the ability to pin an element for processing
90          * changes.
91          *
92          * @param pinsAvailable If true, requests to pin elements (e.g. calls to
93          *            pinGreatest()) are less-likely to fail. If false, they are
94          *            more-likely to fail.
95          */
onPinStateChange(boolean pinsAvailable)96         public void onPinStateChange(boolean pinsAvailable);
97     }
98 
99     /**
100      * Wraps E with reference counting.
101      */
102     private static class Pinnable<E> {
103         private E mElement;
104 
105         /** Reference-counting for the number of tasks holding this element. */
106         private int mPins;
107 
Pinnable(E element)108         public Pinnable(E element) {
109             mElement = element;
110             mPins = 0;
111         }
112 
getElement()113         public E getElement() {
114             return mElement;
115         }
116 
isPinned()117         private boolean isPinned() {
118             return mPins > 0;
119         }
120     }
121 
122     /** Allow only one swapping operation at a time. */
123     private final Object mSwapLock = new Object();
124     /**
125      * Lock all transactions involving mElements, mUnpinnedElements,
126      * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
127      * mPinStateListener and the state of Pinnable instances. <br>
128      * TODO Replace this with a priority semaphore and allow swapLeast()
129      * operations to run faster at the expense of slower tryPin()/release()
130      * calls.
131      */
132     private final Object mLock = new Object();
133     /** Stores all elements. */
134     private TreeMap<Long, Pinnable<E>> mElements;
135     /** Stores the subset of mElements which is not pinned. */
136     private TreeMap<Long, Pinnable<E>> mUnpinnedElements;
137     /** Used to acquire space in mElements. */
138     private final Semaphore mCapacitySemaphore;
139     /** This must be acquired while an element is pinned. */
140     private final Semaphore mPinSemaphore;
141     private boolean mClosed = false;
142 
143     private Handler mPinStateHandler = null;
144     private PinStateListener mPinStateListener = null;
145 
146     /**
147      * Constructs a new ring buffer with the specified capacity.
148      *
149      * @param capacity the maximum number of elements to store.
150      */
ConcurrentSharedRingBuffer(int capacity)151     public ConcurrentSharedRingBuffer(int capacity) {
152         if (capacity <= 0) {
153             throw new IllegalArgumentException("Capacity must be positive.");
154         }
155 
156         mElements = new TreeMap<Long, Pinnable<E>>();
157         mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
158         mCapacitySemaphore = new Semaphore(capacity);
159         // Start with -1 permits to pin elements since we must always have at
160         // least one unpinned
161         // element available to swap out as the head of the buffer.
162         mPinSemaphore = new Semaphore(-1);
163     }
164 
165     /**
166      * Sets or replaces the listener.
167      *
168      * @param handler The handler on which to invoke the listener.
169      * @param listener The listener to be called whenever the ability to pin an
170      *            element changes.
171      */
setListener(Handler handler, PinStateListener listener)172     public void setListener(Handler handler, PinStateListener listener) {
173         synchronized (mLock) {
174             mPinStateHandler = handler;
175             mPinStateListener = listener;
176         }
177     }
178 
179     /**
180      * Places a new element in the ring buffer, removing the least (by key)
181      * non-pinned element if necessary. The existing element (or {@code null} if
182      * the buffer is under-capacity) is passed to {@code swapper.swap()} and the
183      * result is saved to the buffer. If an entry with {@code newKey} already
184      * exists in the ring-buffer, then {@code swapper.update()} is called and
185      * may modify the element in-place. See {@link SwapTask}. <br>
186      * Note that this method is the only way to add new elements to the buffer
187      * and will never be blocked on pinned tasks.
188      *
189      * @param newKey the key with which to store the swapped-in element.
190      * @param swapper the callback used to perform the swap.
191      * @return true if the swap was successful and the new element was saved to
192      *         the buffer, false if the swap was not possible and the element
193      *         was not saved to the buffer. Note that if the swap failed,
194      *         {@code swapper.create()} may or may not have been invoked.
195      */
swapLeast(long newKey, SwapTask<E> swapper)196     public boolean swapLeast(long newKey, SwapTask<E> swapper) {
197         synchronized (mSwapLock) {
198             Pinnable<E> existingElement = null;
199 
200             synchronized (mLock) {
201                 if (mClosed) {
202                     return false;
203                 }
204                 existingElement = mElements.get(newKey);
205             }
206 
207             if (existingElement != null) {
208                 swapper.update(existingElement.getElement());
209                 return true;
210             }
211 
212             if (mCapacitySemaphore.tryAcquire()) {
213                 // If we are under capacity, insert the new element and return.
214                 Pinnable<E> p = new Pinnable<E>(swapper.create());
215 
216                 synchronized (mLock) {
217                     if (mClosed) {
218                         return false;
219                     }
220 
221                     // Add the new element and release another permit to pin
222                     // allow pinning another element.
223                     mElements.put(newKey, p);
224                     mUnpinnedElements.put(newKey, p);
225                     mPinSemaphore.release();
226                     if (mPinSemaphore.availablePermits() == 1) {
227                         notifyPinStateChange(true);
228                     }
229                 }
230 
231                 return true;
232             } else {
233                 Pinnable<E> toSwap;
234 
235                 // Note that this method must be synchronized to avoid
236                 // attempting to remove more than one unpinned element at a
237                 // time.
238                 synchronized (mLock) {
239                     if (mClosed) {
240                         return false;
241                     }
242 
243                     Map.Entry<Long, Pinnable<E>> toSwapEntry = mUnpinnedElements.pollFirstEntry();
244 
245                     if (toSwapEntry == null) {
246                         // We should never get here.
247                         throw new RuntimeException("No unpinned element available.");
248                     }
249 
250                     toSwap = toSwapEntry.getValue();
251 
252                     // We must remove the element from both mElements and
253                     // mUnpinnedElements because it must be re-added after the
254                     // swap to be placed in the correct order with newKey.
255                     mElements.remove(toSwapEntry.getKey());
256                 }
257 
258                 try {
259                     toSwap.mElement = swapper.swap(toSwap.mElement);
260                 } finally {
261                     synchronized (mLock) {
262                         if (mClosed) {
263                             return false;
264                         }
265 
266                         mElements.put(newKey, toSwap);
267                         mUnpinnedElements.put(newKey, toSwap);
268                     }
269                 }
270                 return true;
271             }
272         }
273     }
274 
275     /**
276      * Attempts to pin the element with the given key and return it. <br>
277      * Note that, if a non-null pair is returned, the caller <em>must</em> call
278      * {@link #release} with the key.
279      *
280      * @return the key and object of the pinned element, if one could be pinned,
281      *         or null.
282      */
tryPin(long key)283     public Pair<Long, E> tryPin(long key) {
284 
285         boolean acquiredLastPin = false;
286         Pinnable<E> entry = null;
287 
288         synchronized (mLock) {
289             if (mClosed) {
290                 return null;
291             }
292 
293             if (mElements.isEmpty()) {
294                 return null;
295             }
296 
297             entry = mElements.get(key);
298 
299             if (entry == null) {
300                 return null;
301             }
302 
303             if (entry.isPinned()) {
304                 // If the element is already pinned by another task, simply
305                 // increment the pin count.
306                 entry.mPins++;
307             } else {
308                 // We must ensure that there will still be an unpinned element
309                 // after we pin this one.
310                 if (mPinSemaphore.tryAcquire()) {
311                     mUnpinnedElements.remove(key);
312                     entry.mPins++;
313 
314                     acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
315                 } else {
316                     return null;
317                 }
318             }
319         }
320 
321         // If we just grabbed the last permit, we must notify listeners of the
322         // pin
323         // state change.
324         if (acquiredLastPin) {
325             notifyPinStateChange(false);
326         }
327 
328         return Pair.create(key, entry.getElement());
329     }
330 
release(long key)331     public void release(long key) {
332         synchronized (mLock) {
333             // Note that this must proceed even if the buffer has been closed.
334 
335             Pinnable<E> element = mElements.get(key);
336 
337             if (element == null) {
338                 throw new InvalidParameterException("No entry found for the given key.");
339             }
340 
341             if (!element.isPinned()) {
342                 throw new IllegalArgumentException("Calling release() with unpinned element.");
343             }
344 
345             // Unpin the element
346             element.mPins--;
347 
348             if (!element.isPinned()) {
349                 // If there are now 0 tasks pinning this element...
350                 mUnpinnedElements.put(key, element);
351 
352                 // Allow pinning another element.
353                 mPinSemaphore.release();
354 
355                 if (mPinSemaphore.availablePermits() == 1) {
356                     notifyPinStateChange(true);
357                 }
358             }
359         }
360     }
361 
362     /**
363      * Attempts to pin the greatest element and return it. <br>
364      * Note that, if a non-null element is returned, the caller <em>must</em>
365      * call {@link #release} with the element. Furthermore, behavior is
366      * undefined if the element's {@code compareTo} behavior changes between
367      * these calls.
368      *
369      * @return the key and object of the pinned element, if one could be pinned,
370      *         or null.
371      */
tryPinGreatest()372     public Pair<Long, E> tryPinGreatest() {
373         synchronized (mLock) {
374             if (mClosed) {
375                 return null;
376             }
377 
378             if (mElements.isEmpty()) {
379                 return null;
380             }
381 
382             return tryPin(mElements.lastKey());
383         }
384     }
385 
386     /**
387      * Attempts to pin the greatest element for which {@code selector} returns
388      * true. <br>
389      *
390      * @see #pinGreatest
391      */
tryPinGreatestSelected(Selector<E> selector)392     public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
393         // (Quickly) get the list of elements to search through.
394         ArrayList<Long> keys = new ArrayList<Long>();
395         synchronized (mLock) {
396             if (mClosed) {
397                 return null;
398             }
399 
400             if (mElements.isEmpty()) {
401                 return null;
402             }
403 
404             keys.addAll(mElements.keySet());
405         }
406 
407         Collections.sort(keys);
408 
409         // Pin each element, from greatest key to least, until we find the one
410         // we want (the element with the greatest key for which
411         // selector.selected() returns true).
412         for (int i = keys.size() - 1; i >= 0; i--) {
413             Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
414             if (pinnedCandidate != null) {
415                 boolean selected = false;
416 
417                 try {
418                     selected = selector.select(pinnedCandidate.second);
419                 } finally {
420                     // Don't leak pinnedCandidate if the above select() threw an
421                     // exception.
422                     if (selected) {
423                         return pinnedCandidate;
424                     } else {
425                         release(pinnedCandidate.first);
426                     }
427                 }
428             }
429         }
430 
431         return null;
432     }
433 
434     /**
435      * Removes all elements from the buffer, running {@code task} on each one,
436      * and waiting, if necessary, for all pins to be released.
437      *
438      * @param task
439      * @throws InterruptedException
440      */
close(Task<E> task)441     public void close(Task<E> task) throws InterruptedException {
442         int numPinnedElements;
443 
444         // Ensure that any pending swap tasks complete before closing.
445         synchronized (mSwapLock) {
446             synchronized (mLock) {
447                 mClosed = true;
448                 numPinnedElements = mElements.size() - mUnpinnedElements.size();
449             }
450         }
451 
452         notifyPinStateChange(false);
453 
454         // Wait for all pinned tasks to complete.
455         if (numPinnedElements > 0) {
456             mPinSemaphore.acquire(numPinnedElements);
457         }
458 
459         for (Pinnable<E> element : mElements.values()) {
460             task.run(element.mElement);
461         }
462 
463         mUnpinnedElements.clear();
464 
465         mElements.clear();
466     }
467 
notifyPinStateChange(final boolean pinsAvailable)468     private void notifyPinStateChange(final boolean pinsAvailable) {
469         synchronized (mLock) {
470             // We must synchronize on mPinStateHandler and mPinStateListener.
471             if (mPinStateHandler != null) {
472                 final PinStateListener listener = mPinStateListener;
473                 mPinStateHandler.post(new Runnable() {
474                         @Override
475                     public void run() {
476                         listener.onPinStateChange(pinsAvailable);
477                     }
478                 });
479             }
480         }
481     }
482 }
483