/*
* Copyright (C) 2014 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.android.camera.async;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
/**
* A {@link BufferQueue} implementation useful for thread-safe producer-consumer
* interactions.
* Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
* closing the queue from either the producer or consumer side and enables
* precise accounting of objects which are never read by the consumer. Notably,
* this enables cleanly shutting down producer-consumer interactions without
* leaking managed resources which might otherwise be left dangling in the
* queue.
*/
public class ConcurrentBufferQueue implements BufferQueue, BufferQueueController,
SafeCloseable {
/**
* A callback to be invoked with all of the elements of the sequence which
* are added but never retrieved via {@link #getNext}.
*/
public static interface UnusedElementProcessor {
/**
* Implementations should properly close the discarded element, if
* necessary.
*/
public void process(T element);
}
/**
* An entry can either be a {@link T} or a special "poison-pill" marker
* indicating that the sequence has been closed.
*/
private static class Entry {
private final T mValue;
private final boolean mClosing;
private Entry(T value, boolean closing) {
mValue = value;
mClosing = closing;
}
public boolean isClosingMarker() {
return mClosing;
}
public T getValue() {
return mValue;
}
}
/**
* Lock used for mQueue modification and mClosed.
*/
private final Object mLock;
/**
* The queue in which to store elements of the sequence as they arrive.
*/
private final BlockingQueue> mQueue;
/**
* Whether this sequence is closed.
*/
private final AtomicBoolean mClosed;
/**
* The callback to use to process all elements which are discarded by the
* queue.
*/
private final UnusedElementProcessor mUnusedElementProcessor;
public ConcurrentBufferQueue(UnusedElementProcessor unusedElementProcessor) {
mUnusedElementProcessor = unusedElementProcessor;
mLock = new Object();
mQueue = new LinkedBlockingQueue<>();
mClosed = new AtomicBoolean();
}
public ConcurrentBufferQueue() {
// Instantiate with a DiscardedElementProcessor which does nothing.
this(new UnusedElementProcessor() {
@Override
public void process(T element) {
}
});
}
@Override
public void close() {
List> remainingElements = new ArrayList<>();
synchronized (mLock) {
// Mark as closed so that no more threads wait in getNext().
// Any additional calls to close() will return immediately.
boolean alreadyClosed = mClosed.getAndSet(true);
if (alreadyClosed) {
return;
}
mQueue.drainTo(remainingElements);
// Keep feeding any currently-waiting consumer threads "poison pill"
// {@link Entry}s indicating that the sequence has ended so they
// wake up. When no more threads are waiting for another value from
// mQueue, the call to peek() from this thread will see a value.
// Note that this also ensures that there is a poison pill in the
// queue
// to keep waking-up any threads which manage to block in getNext()
// even after marking mClosed.
while (mQueue.peek() == null) {
mQueue.add(makeClosingMarker());
}
}
for (Entry entry : remainingElements) {
if (!entry.isClosingMarker()) {
mUnusedElementProcessor.process(entry.getValue());
}
}
}
@Override
public void update(@Nonnull T element) {
boolean closed = false;
synchronized (mLock) {
closed = mClosed.get();
if (!closed) {
mQueue.add(makeEntry(element));
}
}
if (closed) {
mUnusedElementProcessor.process(element);
}
}
private T doWithNextEntry(Entry nextEntry) throws BufferQueueClosedException {
if (nextEntry.isClosingMarker()) {
// Always keep a poison-pill in the queue to avoid a race condition
// in which a thread reaches the mQueue.take() call after close().
mQueue.add(nextEntry);
throw new BufferQueueClosedException();
} else {
return nextEntry.getValue();
}
}
@Override
public T getNext() throws InterruptedException, BufferQueueClosedException {
Entry nextEntry = mQueue.take();
return doWithNextEntry(nextEntry);
}
@Override
public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
BufferQueueClosedException {
Entry nextEntry = mQueue.poll(timeout, unit);
if (nextEntry == null) {
throw new TimeoutException();
}
return doWithNextEntry(nextEntry);
}
@Override
public T peekNext() {
Entry nextEntry = mQueue.peek();
if (nextEntry == null) {
return null;
} else if (nextEntry.isClosingMarker()) {
return null;
} else {
return nextEntry.getValue();
}
}
@Override
public void discardNext() {
try {
Entry nextEntry = mQueue.remove();
if (nextEntry.isClosingMarker()) {
// Always keep a poison-pill in the queue to avoid a race
// condition in which a thread reaches the mQueue.take() call
// after close().
mQueue.add(nextEntry);
} else {
mUnusedElementProcessor.process(nextEntry.getValue());
}
} catch (NoSuchElementException e) {
// If the queue is already empty, do nothing.
return;
}
}
@Override
public boolean isClosed() {
return mClosed.get();
}
private Entry makeEntry(T value) {
return new Entry(value, false);
}
private Entry makeClosingMarker() {
return new Entry(null, true);
}
}