• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.async;
18 
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.NoSuchElementException;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 
28 import javax.annotation.Nonnull;
29 
30 /**
31  * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
32  * interactions.<br>
33  * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
34  * closing the queue from either the producer or consumer side and enables
35  * precise accounting of objects which are never read by the consumer. Notably,
36  * this enables cleanly shutting down producer-consumer interactions without
37  * leaking managed resources which might otherwise be left dangling in the
38  * queue.
39  */
40 public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
41         SafeCloseable {
42     /**
43      * A callback to be invoked with all of the elements of the sequence which
44      * are added but never retrieved via {@link #getNext}.
45      */
46     public static interface UnusedElementProcessor<T> {
47         /**
48          * Implementations should properly close the discarded element, if
49          * necessary.
50          */
process(T element)51         public void process(T element);
52     }
53 
54     /**
55      * An entry can either be a {@link T} or a special "poison-pill" marker
56      * indicating that the sequence has been closed.
57      */
58     private static class Entry<T> {
59         private final T mValue;
60         private final boolean mClosing;
61 
Entry(T value, boolean closing)62         private Entry(T value, boolean closing) {
63             mValue = value;
64             mClosing = closing;
65         }
66 
isClosingMarker()67         public boolean isClosingMarker() {
68             return mClosing;
69         }
70 
getValue()71         public T getValue() {
72             return mValue;
73         }
74     }
75     /**
76      * Lock used for mQueue modification and mClosed.
77      */
78     private final Object mLock;
79     /**
80      * The queue in which to store elements of the sequence as they arrive.
81      */
82     private final BlockingQueue<Entry<T>> mQueue;
83     /**
84      * Whether this sequence is closed.
85      */
86     private final AtomicBoolean mClosed;
87     /**
88      * The callback to use to process all elements which are discarded by the
89      * queue.
90      */
91     private final UnusedElementProcessor<T> mUnusedElementProcessor;
92 
ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor)93     public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
94         mUnusedElementProcessor = unusedElementProcessor;
95         mLock = new Object();
96         mQueue = new LinkedBlockingQueue<>();
97         mClosed = new AtomicBoolean();
98     }
99 
ConcurrentBufferQueue()100     public ConcurrentBufferQueue() {
101         // Instantiate with a DiscardedElementProcessor which does nothing.
102         this(new UnusedElementProcessor<T>() {
103             @Override
104             public void process(T element) {
105             }
106         });
107     }
108 
109     @Override
close()110     public void close() {
111         List<Entry<T>> remainingElements = new ArrayList<>();
112         synchronized (mLock) {
113             // Mark as closed so that no more threads wait in getNext().
114             // Any additional calls to close() will return immediately.
115             boolean alreadyClosed = mClosed.getAndSet(true);
116             if (alreadyClosed) {
117                 return;
118             }
119 
120             mQueue.drainTo(remainingElements);
121 
122             // Keep feeding any currently-waiting consumer threads "poison pill"
123             // {@link Entry}s indicating that the sequence has ended so they
124             // wake up. When no more threads are waiting for another value from
125             // mQueue, the call to peek() from this thread will see a value.
126             // Note that this also ensures that there is a poison pill in the
127             // queue
128             // to keep waking-up any threads which manage to block in getNext()
129             // even after marking mClosed.
130             while (mQueue.peek() == null) {
131                 mQueue.add(makeClosingMarker());
132             }
133         }
134 
135         for (Entry<T> entry : remainingElements) {
136             if (!entry.isClosingMarker()) {
137                 mUnusedElementProcessor.process(entry.getValue());
138             }
139         }
140     }
141 
142     @Override
update(@onnull T element)143     public void update(@Nonnull T element) {
144         boolean closed = false;
145         synchronized (mLock) {
146             closed = mClosed.get();
147             if (!closed) {
148                 mQueue.add(makeEntry(element));
149             }
150         }
151         if (closed) {
152             mUnusedElementProcessor.process(element);
153         }
154     }
155 
doWithNextEntry(Entry<T> nextEntry)156     private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException {
157         if (nextEntry.isClosingMarker()) {
158             // Always keep a poison-pill in the queue to avoid a race condition
159             // in which a thread reaches the mQueue.take() call after close().
160             mQueue.add(nextEntry);
161             throw new BufferQueueClosedException();
162         } else {
163             return nextEntry.getValue();
164         }
165     }
166 
167     @Override
getNext()168     public T getNext() throws InterruptedException, BufferQueueClosedException {
169         Entry<T> nextEntry = mQueue.take();
170         return doWithNextEntry(nextEntry);
171     }
172 
173     @Override
getNext(long timeout, TimeUnit unit)174     public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
175             BufferQueueClosedException {
176         Entry<T> nextEntry = mQueue.poll(timeout, unit);
177         if (nextEntry == null) {
178             throw new TimeoutException();
179         }
180         return doWithNextEntry(nextEntry);
181     }
182 
183     @Override
peekNext()184     public T peekNext() {
185         Entry<T> nextEntry = mQueue.peek();
186         if (nextEntry == null) {
187             return null;
188         } else if (nextEntry.isClosingMarker()) {
189             return null;
190         } else {
191             return nextEntry.getValue();
192         }
193     }
194 
195     @Override
discardNext()196     public void discardNext() {
197         try {
198             Entry<T> nextEntry = mQueue.remove();
199             if (nextEntry.isClosingMarker()) {
200                 // Always keep a poison-pill in the queue to avoid a race
201                 // condition in which a thread reaches the mQueue.take() call
202                 // after close().
203                 mQueue.add(nextEntry);
204             } else {
205                 mUnusedElementProcessor.process(nextEntry.getValue());
206             }
207         } catch (NoSuchElementException e) {
208             // If the queue is already empty, do nothing.
209             return;
210         }
211     }
212 
213     @Override
isClosed()214     public boolean isClosed() {
215         return mClosed.get();
216     }
217 
makeEntry(T value)218     private Entry makeEntry(T value) {
219         return new Entry(value, false);
220     }
221 
makeClosingMarker()222     private Entry makeClosingMarker() {
223         return new Entry(null, true);
224     }
225 }
226