1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.async; 18 19 import java.util.concurrent.TimeUnit; 20 import java.util.concurrent.TimeoutException; 21 22 import javax.annotation.Nonnull; 23 24 /** 25 * Like {@link ConcurrentBufferQueue}, but also tracks the number of objects 26 * currently in the queue. 27 */ 28 public class CountableBufferQueue<T> implements BufferQueueController<T>, BufferQueue<T> { 29 private class DecrementingProcessor<T> implements 30 ConcurrentBufferQueue.UnusedElementProcessor<T> { 31 private final ConcurrentBufferQueue.UnusedElementProcessor mProcessor; 32 DecrementingProcessor(ConcurrentBufferQueue.UnusedElementProcessor<T> processor)33 private DecrementingProcessor(ConcurrentBufferQueue.UnusedElementProcessor<T> processor) { 34 mProcessor = processor; 35 } 36 37 @Override process(T element)38 public void process(T element) { 39 mProcessor.process(element); 40 decrementSize(); 41 } 42 } 43 44 private final ConcurrentBufferQueue<T> mBufferQueue; 45 private final Object mCountLock; 46 private final Updatable<Integer> mSizeCallback; 47 private int mCount; 48 49 /** 50 * @param sizeCallback A thread-safe callback to be updated with the size 51 * of the queue. 52 * @param processor The callback for processing elements discarded from the 53 * queue. 54 */ CountableBufferQueue(Updatable<Integer> sizeCallback, ConcurrentBufferQueue .UnusedElementProcessor<T> processor)55 public CountableBufferQueue(Updatable<Integer> sizeCallback, ConcurrentBufferQueue 56 .UnusedElementProcessor<T> processor) { 57 mBufferQueue = new ConcurrentBufferQueue<T>(new DecrementingProcessor<T>(processor)); 58 mCountLock = new Object(); 59 mCount = 0; 60 mSizeCallback = sizeCallback; 61 } 62 CountableBufferQueue(ConcurrentState<Integer> sizeCallback)63 public CountableBufferQueue(ConcurrentState<Integer> sizeCallback) { 64 this(sizeCallback, new ConcurrentBufferQueue.UnusedElementProcessor<T>() { 65 @Override 66 public void process(T element) { 67 // Do nothing by default. 68 } 69 }); 70 } 71 decrementSize()72 private void decrementSize() { 73 int count; 74 synchronized (mCountLock) { 75 mCount--; 76 count = mCount; 77 } 78 mSizeCallback.update(count); 79 } 80 81 @Override getNext()82 public T getNext() throws InterruptedException, BufferQueueClosedException { 83 T result = mBufferQueue.getNext(); 84 decrementSize(); 85 return result; 86 } 87 88 @Override getNext(long timeout, TimeUnit unit)89 public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, 90 BufferQueueClosedException { 91 T result = mBufferQueue.getNext(timeout, unit); 92 decrementSize(); 93 return result; 94 } 95 96 @Override peekNext()97 public T peekNext() { 98 return mBufferQueue.peekNext(); 99 } 100 101 @Override discardNext()102 public void discardNext() { 103 mBufferQueue.discardNext(); 104 } 105 106 @Override update(@onnull T element)107 public void update(@Nonnull T element) { 108 // This is tricky since mBufferQueue.update() may immediately discard 109 // the element if the queue is closed. Sending redundant updates for 0 110 // size is acceptable, but sending updates indicating that the size has 111 // increased and then decreased, even after the queue is closed, would 112 // be bad. Thus, the following will filter these out. 113 int preCount; 114 int postCount; 115 synchronized (mCountLock) { 116 preCount = mCount; 117 mCount++; 118 mBufferQueue.update(element); 119 postCount = mCount; 120 } 121 if (preCount != postCount) { 122 mSizeCallback.update(postCount); 123 } 124 } 125 126 @Override close()127 public void close() { 128 mBufferQueue.close(); 129 } 130 131 @Override isClosed()132 public boolean isClosed() { 133 return mBufferQueue.isClosed(); 134 } 135 } 136