1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.async; 18 19 import java.util.ArrayList; 20 import java.util.List; 21 import java.util.NoSuchElementException; 22 import java.util.concurrent.BlockingQueue; 23 import java.util.concurrent.LinkedBlockingQueue; 24 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.atomic.AtomicBoolean; 27 28 import javax.annotation.Nonnull; 29 30 /** 31 * A {@link BufferQueue} implementation useful for thread-safe producer-consumer 32 * interactions.<br> 33 * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows 34 * closing the queue from either the producer or consumer side and enables 35 * precise accounting of objects which are never read by the consumer. Notably, 36 * this enables cleanly shutting down producer-consumer interactions without 37 * leaking managed resources which might otherwise be left dangling in the 38 * queue. 39 */ 40 public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>, 41 SafeCloseable { 42 /** 43 * A callback to be invoked with all of the elements of the sequence which 44 * are added but never retrieved via {@link #getNext}. 45 */ 46 public static interface UnusedElementProcessor<T> { 47 /** 48 * Implementations should properly close the discarded element, if 49 * necessary. 50 */ process(T element)51 public void process(T element); 52 } 53 54 /** 55 * An entry can either be a {@link T} or a special "poison-pill" marker 56 * indicating that the sequence has been closed. 57 */ 58 private static class Entry<T> { 59 private final T mValue; 60 private final boolean mClosing; 61 Entry(T value, boolean closing)62 private Entry(T value, boolean closing) { 63 mValue = value; 64 mClosing = closing; 65 } 66 isClosingMarker()67 public boolean isClosingMarker() { 68 return mClosing; 69 } 70 getValue()71 public T getValue() { 72 return mValue; 73 } 74 } 75 /** 76 * Lock used for mQueue modification and mClosed. 77 */ 78 private final Object mLock; 79 /** 80 * The queue in which to store elements of the sequence as they arrive. 81 */ 82 private final BlockingQueue<Entry<T>> mQueue; 83 /** 84 * Whether this sequence is closed. 85 */ 86 private final AtomicBoolean mClosed; 87 /** 88 * The callback to use to process all elements which are discarded by the 89 * queue. 90 */ 91 private final UnusedElementProcessor<T> mUnusedElementProcessor; 92 ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor)93 public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) { 94 mUnusedElementProcessor = unusedElementProcessor; 95 mLock = new Object(); 96 mQueue = new LinkedBlockingQueue<>(); 97 mClosed = new AtomicBoolean(); 98 } 99 ConcurrentBufferQueue()100 public ConcurrentBufferQueue() { 101 // Instantiate with a DiscardedElementProcessor which does nothing. 102 this(new UnusedElementProcessor<T>() { 103 @Override 104 public void process(T element) { 105 } 106 }); 107 } 108 109 @Override close()110 public void close() { 111 List<Entry<T>> remainingElements = new ArrayList<>(); 112 synchronized (mLock) { 113 // Mark as closed so that no more threads wait in getNext(). 114 // Any additional calls to close() will return immediately. 115 boolean alreadyClosed = mClosed.getAndSet(true); 116 if (alreadyClosed) { 117 return; 118 } 119 120 mQueue.drainTo(remainingElements); 121 122 // Keep feeding any currently-waiting consumer threads "poison pill" 123 // {@link Entry}s indicating that the sequence has ended so they 124 // wake up. When no more threads are waiting for another value from 125 // mQueue, the call to peek() from this thread will see a value. 126 // Note that this also ensures that there is a poison pill in the 127 // queue 128 // to keep waking-up any threads which manage to block in getNext() 129 // even after marking mClosed. 130 while (mQueue.peek() == null) { 131 mQueue.add(makeClosingMarker()); 132 } 133 } 134 135 for (Entry<T> entry : remainingElements) { 136 if (!entry.isClosingMarker()) { 137 mUnusedElementProcessor.process(entry.getValue()); 138 } 139 } 140 } 141 142 @Override update(@onnull T element)143 public void update(@Nonnull T element) { 144 boolean closed = false; 145 synchronized (mLock) { 146 closed = mClosed.get(); 147 if (!closed) { 148 mQueue.add(makeEntry(element)); 149 } 150 } 151 if (closed) { 152 mUnusedElementProcessor.process(element); 153 } 154 } 155 doWithNextEntry(Entry<T> nextEntry)156 private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException { 157 if (nextEntry.isClosingMarker()) { 158 // Always keep a poison-pill in the queue to avoid a race condition 159 // in which a thread reaches the mQueue.take() call after close(). 160 mQueue.add(nextEntry); 161 throw new BufferQueueClosedException(); 162 } else { 163 return nextEntry.getValue(); 164 } 165 } 166 167 @Override getNext()168 public T getNext() throws InterruptedException, BufferQueueClosedException { 169 Entry<T> nextEntry = mQueue.take(); 170 return doWithNextEntry(nextEntry); 171 } 172 173 @Override getNext(long timeout, TimeUnit unit)174 public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, 175 BufferQueueClosedException { 176 Entry<T> nextEntry = mQueue.poll(timeout, unit); 177 if (nextEntry == null) { 178 throw new TimeoutException(); 179 } 180 return doWithNextEntry(nextEntry); 181 } 182 183 @Override peekNext()184 public T peekNext() { 185 Entry<T> nextEntry = mQueue.peek(); 186 if (nextEntry == null) { 187 return null; 188 } else if (nextEntry.isClosingMarker()) { 189 return null; 190 } else { 191 return nextEntry.getValue(); 192 } 193 } 194 195 @Override discardNext()196 public void discardNext() { 197 try { 198 Entry<T> nextEntry = mQueue.remove(); 199 if (nextEntry.isClosingMarker()) { 200 // Always keep a poison-pill in the queue to avoid a race 201 // condition in which a thread reaches the mQueue.take() call 202 // after close(). 203 mQueue.add(nextEntry); 204 } else { 205 mUnusedElementProcessor.process(nextEntry.getValue()); 206 } 207 } catch (NoSuchElementException e) { 208 // If the queue is already empty, do nothing. 209 return; 210 } 211 } 212 213 @Override isClosed()214 public boolean isClosed() { 215 return mClosed.get(); 216 } 217 makeEntry(T value)218 private Entry makeEntry(T value) { 219 return new Entry(value, false); 220 } 221 makeClosingMarker()222 private Entry makeClosingMarker() { 223 return new Entry(null, true); 224 } 225 } 226