/*
* Copyright (C) 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.camera.util;
import android.os.Handler;
import android.util.Pair;
import com.android.camera.debug.Log.Tag;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;
/**
* Implements a thread-safe fixed-size pool map of integers to objects such that
* the least element may be swapped out for a new element at any time. Elements
* may be temporarily "pinned" for processing in separate threads, during which
* they will not be swapped out.
* This class enforces the invariant that a new element can always be swapped
* in. Thus, requests to pin an element for a particular task may be denied if
* there are not enough unpinned elements which can be removed.
*/
public class ConcurrentSharedRingBuffer {
private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
/**
* Callback interface for swapping elements at the head of the buffer.
*/
public static interface SwapTask {
/**
* Called if the buffer is under-capacity and a new element is being
* added.
*
* @return the new element to add.
*/
public E create();
/**
* Called if the buffer is full and an old element must be swapped out
* to make room for the new element.
*
* @param oldElement the element being removed from the buffer.
* @return the new element to add.
*/
public E swap(E oldElement);
/**
* Called if the buffer already has an element with the specified key.
* Note that the element may currently be pinned for processing by other
* elements. Therefore, implementations must be thread safe with respect
* to any other operations which may be applied to pinned tasks.
*
* @param existingElement the element to be updated.
*/
public void update(E existingElement);
/**
* Returns the key of the element that the ring buffer should prefer
* when considering a swapping candidate. If the returned key is not an
* unpinned element then ring buffer will replace the element with least
* key.
*
* @return a key of an existing unpinned element or a negative value.
*/
public long getSwapKey();
}
/**
* Callback for selecting an element to pin. See
* {@link tryPinGreatestSelected}.
*/
public static interface Selector {
/**
* @param element The element to select or not select.
* @return true if the element should be selected, false otherwise.
*/
public boolean select(E element);
}
public static interface PinStateListener {
/**
* Invoked whenever the ability to pin an element for processing
* changes.
*
* @param pinsAvailable If true, requests to pin elements (e.g. calls to
* pinGreatest()) are less-likely to fail. If false, they are
* more-likely to fail.
*/
public void onPinStateChange(boolean pinsAvailable);
}
/**
* Wraps E with reference counting.
*/
private static class Pinnable {
private E mElement;
/** Reference-counting for the number of tasks holding this element. */
private int mPins;
public Pinnable(E element) {
mElement = element;
mPins = 0;
}
public E getElement() {
return mElement;
}
private boolean isPinned() {
return mPins > 0;
}
}
/**
* A Semaphore that allows to reduce permits to negative values.
*/
private static class NegativePermitsSemaphore extends Semaphore {
public NegativePermitsSemaphore(int permits) {
super(permits);
}
/**
* Reduces the number of permits by permits
.
*
* This method can only be called when number of available permits is
* zero.
*/
@Override
public void reducePermits(int permits) {
if (availablePermits() != 0) {
throw new IllegalStateException("Called without draining the semaphore.");
}
super.reducePermits(permits);
}
}
/** Allow only one swapping operation at a time. */
private final Object mSwapLock = new Object();
/**
* Lock all transactions involving mElements, mUnpinnedElements,
* mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
* mPinStateListener and the state of Pinnable instances.
* TODO Replace this with a priority semaphore and allow swapLeast()
* operations to run faster at the expense of slower tryPin()/release()
* calls.
*/
private final Object mLock = new Object();
/** Stores all elements. */
private TreeMap> mElements;
/** Stores the subset of mElements which is not pinned. */
private TreeMap> mUnpinnedElements;
/** Used to acquire space in mElements. */
private final Semaphore mCapacitySemaphore;
/** This must be acquired while an element is pinned. */
private final NegativePermitsSemaphore mPinSemaphore;
private boolean mClosed = false;
private Handler mPinStateHandler = null;
private PinStateListener mPinStateListener = null;
/**
* Constructs a new ring buffer with the specified capacity.
*
* @param capacity the maximum number of elements to store.
*/
public ConcurrentSharedRingBuffer(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("Capacity must be positive.");
}
mElements = new TreeMap>();
mUnpinnedElements = new TreeMap>();
mCapacitySemaphore = new Semaphore(capacity);
// Start with -1 permits to pin elements since we must always have at
// least one unpinned
// element available to swap out as the head of the buffer.
mPinSemaphore = new NegativePermitsSemaphore(-1);
}
/**
* Sets or replaces the listener.
*
* @param handler The handler on which to invoke the listener.
* @param listener The listener to be called whenever the ability to pin an
* element changes.
*/
public void setListener(Handler handler, PinStateListener listener) {
synchronized (mLock) {
mPinStateHandler = handler;
mPinStateListener = listener;
}
}
/**
* Places a new element in the ring buffer, removing the least (by key)
* non-pinned element if necessary. The existing element (or {@code null} if
* the buffer is under-capacity) is passed to {@code swapper.swap()} and the
* result is saved to the buffer. If an entry with {@code newKey} already
* exists in the ring-buffer, then {@code swapper.update()} is called and
* may modify the element in-place. See {@link SwapTask}.
* Note that this method is the only way to add new elements to the buffer
* and will never be blocked on pinned tasks.
*
* @param newKey the key with which to store the swapped-in element.
* @param swapper the callback used to perform the swap.
* @return true if the swap was successful and the new element was saved to
* the buffer, false if the swap was not possible and the element
* was not saved to the buffer. Note that if the swap failed,
* {@code swapper.create()} may or may not have been invoked.
*/
public boolean swapLeast(long newKey, SwapTask swapper) {
synchronized (mSwapLock) {
Pinnable existingElement = null;
synchronized (mLock) {
if (mClosed) {
return false;
}
existingElement = mElements.get(newKey);
}
if (existingElement != null) {
swapper.update(existingElement.getElement());
return true;
}
if (mCapacitySemaphore.tryAcquire()) {
// If we are under capacity, insert the new element and return.
Pinnable p = new Pinnable(swapper.create());
synchronized (mLock) {
if (mClosed) {
return false;
}
// Add the new element and release another permit to pin
// allow pinning another element.
mElements.put(newKey, p);
mUnpinnedElements.put(newKey, p);
mPinSemaphore.release();
if (mPinSemaphore.availablePermits() == 1) {
notifyPinStateChange(true);
}
}
return true;
} else {
Pinnable toSwap;
// Note that this method must be synchronized to avoid
// attempting to remove more than one unpinned element at a
// time.
synchronized (mLock) {
if (mClosed) {
return false;
}
Pair> toSwapEntry = null;
long swapKey = swapper.getSwapKey();
// If swapKey is same as the inserted key return early.
if (swapKey == newKey) {
return false;
}
if (mUnpinnedElements.containsKey(swapKey)) {
toSwapEntry = Pair.create(swapKey, mUnpinnedElements.remove(swapKey));
} else {
// The returned key from getSwapKey was not found in the
// unpinned elements use the least entry from the
// unpinned elements.
Map.Entry> swapEntry = mUnpinnedElements.pollFirstEntry();
if (swapEntry != null) {
toSwapEntry = Pair.create(swapEntry.getKey(), swapEntry.getValue());
}
}
if (toSwapEntry == null) {
// We can get here if no unpinned element was found.
return false;
}
toSwap = toSwapEntry.second;
// We must remove the element from both mElements and
// mUnpinnedElements because it must be re-added after the
// swap to be placed in the correct order with newKey.
mElements.remove(toSwapEntry.first);
}
try {
toSwap.mElement = swapper.swap(toSwap.mElement);
} finally {
synchronized (mLock) {
if (mClosed) {
return false;
}
mElements.put(newKey, toSwap);
mUnpinnedElements.put(newKey, toSwap);
}
}
return true;
}
}
}
/**
* Attempts to pin the element with the given key and return it.
* Note that, if a non-null pair is returned, the caller must call
* {@link #release} with the key.
*
* @return the key and object of the pinned element, if one could be pinned,
* or null.
*/
public Pair tryPin(long key) {
boolean acquiredLastPin = false;
Pinnable entry = null;
synchronized (mLock) {
if (mClosed) {
return null;
}
if (mElements.isEmpty()) {
return null;
}
entry = mElements.get(key);
if (entry == null) {
return null;
}
if (entry.isPinned()) {
// If the element is already pinned by another task, simply
// increment the pin count.
entry.mPins++;
} else {
// We must ensure that there will still be an unpinned element
// after we pin this one.
if (mPinSemaphore.tryAcquire()) {
mUnpinnedElements.remove(key);
entry.mPins++;
acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
} else {
return null;
}
}
}
// If we just grabbed the last permit, we must notify listeners of the
// pin
// state change.
if (acquiredLastPin) {
notifyPinStateChange(false);
}
return Pair.create(key, entry.getElement());
}
public void release(long key) {
synchronized (mLock) {
// Note that this must proceed even if the buffer has been closed.
Pinnable element = mElements.get(key);
if (element == null) {
throw new InvalidParameterException(
"No entry found for the given key: " + key + ".");
}
if (!element.isPinned()) {
throw new IllegalArgumentException("Calling release() with unpinned element.");
}
// Unpin the element
element.mPins--;
if (!element.isPinned()) {
// If there are now 0 tasks pinning this element...
mUnpinnedElements.put(key, element);
// Allow pinning another element.
mPinSemaphore.release();
if (mPinSemaphore.availablePermits() == 1) {
notifyPinStateChange(true);
}
}
}
}
/**
* Attempts to pin the greatest element and return it.
* Note that, if a non-null element is returned, the caller must
* call {@link #release} with the element. Furthermore, behavior is
* undefined if the element's {@code compareTo} behavior changes between
* these calls.
*
* @return the key and object of the pinned element, if one could be pinned,
* or null.
*/
public Pair tryPinGreatest() {
synchronized (mLock) {
if (mClosed) {
return null;
}
if (mElements.isEmpty()) {
return null;
}
return tryPin(mElements.lastKey());
}
}
/**
* Attempts to pin the greatest element for which {@code selector} returns
* true.
*
* @see #pinGreatest
*/
public Pair tryPinGreatestSelected(Selector selector) {
// (Quickly) get the list of elements to search through.
ArrayList keys = new ArrayList();
synchronized (mLock) {
if (mClosed) {
return null;
}
if (mElements.isEmpty()) {
return null;
}
keys.addAll(mElements.keySet());
}
Collections.sort(keys);
// Pin each element, from greatest key to least, until we find the one
// we want (the element with the greatest key for which
// selector.selected() returns true).
for (int i = keys.size() - 1; i >= 0; i--) {
Pair pinnedCandidate = tryPin(keys.get(i));
if (pinnedCandidate != null) {
boolean selected = false;
try {
selected = selector.select(pinnedCandidate.second);
} finally {
// Don't leak pinnedCandidate if the above select() threw an
// exception.
if (selected) {
return pinnedCandidate;
} else {
release(pinnedCandidate.first);
}
}
}
}
return null;
}
/**
* Removes all elements from the buffer, running {@code task} on each one,
* and waiting, if necessary, for all pins to be released.
*
* @param task
* @throws InterruptedException
*/
public void close(Task task) throws InterruptedException {
int numPinnedElements;
// Ensure that any pending swap tasks complete before closing.
synchronized (mSwapLock) {
synchronized (mLock) {
mClosed = true;
numPinnedElements = mElements.size() - mUnpinnedElements.size();
}
}
notifyPinStateChange(false);
// Wait for all pinned tasks to complete.
if (numPinnedElements > 0) {
mPinSemaphore.acquire(numPinnedElements);
}
for (Pinnable element : mElements.values()) {
task.run(element.mElement);
// Release the capacity permits.
mCapacitySemaphore.release();
}
mUnpinnedElements.clear();
mElements.clear();
}
/**
* Attempts to get a pinned element for the given key.
*
* @param key the key of the pinned element.
* @return (key, value) pair if found otherwise null.
*/
public Pair tryGetPinned(long key) {
synchronized (mLock) {
if (mClosed) {
return null;
}
for (java.util.Map.Entry> element : mElements.entrySet()) {
if (element.getKey() == key) {
if (element.getValue().isPinned()) {
return Pair.create(element.getKey(), element.getValue().getElement());
} else {
return null;
}
}
}
}
return null;
}
/**
* Reopens previously closed buffer.
*
* Buffer should be closed before calling this method. If called with an
* open buffer an {@link IllegalStateException} is thrown.
*
* @param unpinnedReservedSlotCount a non-negative integer for number of
* slots to reserve for unpinned elements. These slots can never
* be pinned and will always be available for swapping.
* @throws InterruptedException
*/
public void reopenBuffer(int unpinnedReservedSlotCount)
throws InterruptedException {
if (unpinnedReservedSlotCount < 0
|| unpinnedReservedSlotCount >= mCapacitySemaphore.availablePermits()) {
throw new IllegalArgumentException("Invalid unpinned reserved slot count: " +
unpinnedReservedSlotCount);
}
// Ensure that any pending swap tasks complete before closing.
synchronized (mSwapLock) {
synchronized (mLock) {
if (!mClosed) {
throw new IllegalStateException(
"Attempt to reopen the buffer when it is not closed.");
}
mPinSemaphore.drainPermits();
mPinSemaphore.reducePermits(unpinnedReservedSlotCount);
mClosed = false;
}
}
}
/**
* Releases a pinned element for the given key.
*
* If element is unpinned, it is not released.
*
* @param key the key of the element, if the element is not present an
* {@link IllegalArgumentException} is thrown.
*/
public void releaseIfPinned(long key) {
synchronized (mLock) {
Pinnable element = mElements.get(key);
if (element == null) {
throw new IllegalArgumentException("Invalid key." + key);
}
if (element.isPinned()) {
release(key);
}
}
}
/**
* Releases all pinned elements in the buffer.
*
* Note: it only calls {@link #release(long)} only once on a pinned element.
*/
public void releaseAll() {
synchronized (mSwapLock) {
synchronized (mLock) {
if (mClosed || mElements.isEmpty()
|| mElements.size() == mUnpinnedElements.size()) {
return;
}
for (java.util.Map.Entry> entry : mElements.entrySet()) {
if (entry.getValue().isPinned()) {
release(entry.getKey());
}
}
}
}
}
private void notifyPinStateChange(final boolean pinsAvailable) {
synchronized (mLock) {
// We must synchronize on mPinStateHandler and mPinStateListener.
if (mPinStateHandler != null) {
final PinStateListener listener = mPinStateListener;
mPinStateHandler.post(new Runnable() {
@Override
public void run() {
listener.onPinStateChange(pinsAvailable);
}
});
}
}
}
}