/*
 * Copyright (C) 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.camera.util;

import android.os.Handler;
import android.util.Pair;

import com.android.camera.debug.Log.Tag;

import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;

/**
 * Implements a thread-safe fixed-size pool map of integers to objects such that
 * the least element may be swapped out for a new element at any time. Elements
 * may be temporarily "pinned" for processing in separate threads, during which
 * they will not be swapped out. <br>
 * This class enforces the invariant that a new element can always be swapped
 * in. Thus, requests to pin an element for a particular task may be denied if
 * there are not enough unpinned elements which can be removed. <br>
 */
public class ConcurrentSharedRingBuffer<E> {
    private static final Tag TAG = new Tag("CncrrntShrdRingBuf");

    /**
     * Callback interface for swapping elements at the head of the buffer.
     */
    public static interface SwapTask<E> {
        /**
         * Called if the buffer is under-capacity and a new element is being
         * added.
         *
         * @return the new element to add.
         */
        public E create();

        /**
         * Called if the buffer is full and an old element must be swapped out
         * to make room for the new element.
         *
         * @param oldElement the element being removed from the buffer.
         * @return the new element to add.
         */
        public E swap(E oldElement);

        /**
         * Called if the buffer already has an element with the specified key.
         * Note that the element may currently be pinned for processing by other
         * elements. Therefore, implementations must be thread safe with respect
         * to any other operations which may be applied to pinned tasks.
         *
         * @param existingElement the element to be updated.
         */
        public void update(E existingElement);

        /**
         * Returns the key of the element that the ring buffer should prefer
         * when considering a swapping candidate. If the returned key is not an
         * unpinned element then ring buffer will replace the element with least
         * key.
         *
         * @return a key of an existing unpinned element or a negative value.
         */
        public long getSwapKey();
    }

    /**
     * Callback for selecting an element to pin. See
     * {@link #tryPinGreatestSelected}.
     */
    public static interface Selector<E> {
        /**
         * @param element The element to select or not select.
         * @return true if the element should be selected, false otherwise.
         */
        public boolean select(E element);
    }

    /**
     * Callback for observing changes in the ability to pin elements.
     */
    public static interface PinStateListener {
        /**
         * Invoked whenever the ability to pin an element for processing
         * changes.
         *
         * @param pinsAvailable If true, requests to pin elements (e.g. calls to
         *            tryPinGreatest()) are less-likely to fail. If false, they
         *            are more-likely to fail.
         */
        public void onPinStateChange(boolean pinsAvailable);
    }

    /**
     * Wraps E with reference counting.
     */
    private static class Pinnable<E> {
        /** The wrapped element; replaced in place by swapLeast(). */
        private E mElement;

        /** Reference-counting for the number of tasks holding this element. */
        private int mPins;

        public Pinnable(E element) {
            mElement = element;
            mPins = 0;
        }

        public E getElement() {
            return mElement;
        }

        private boolean isPinned() {
            return mPins > 0;
        }
    }

    /**
     * A Semaphore that allows to reduce permits to negative values.
     */
    private static class NegativePermitsSemaphore extends Semaphore {
        public NegativePermitsSemaphore(int permits) {
            super(permits);
        }

        /**
         * Reduces the number of permits by <code>permits</code>.
         * <p/>
         * This method can only be called when number of available permits is
         * zero.
         *
         * @throws IllegalStateException if any permits are currently
         *             available.
         */
        @Override
        public void reducePermits(int permits) {
            if (availablePermits() != 0) {
                throw new IllegalStateException("Called without draining the semaphore.");
            }
            super.reducePermits(permits);
        }
    }

    /** Allow only one swapping operation at a time. */
    private final Object mSwapLock = new Object();
    /**
     * Lock all transactions involving mElements, mUnpinnedElements,
     * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
     * mPinStateListener and the state of Pinnable instances. <br>
     * TODO Replace this with a priority semaphore and allow swapLeast()
     * operations to run faster at the expense of slower tryPin()/release()
     * calls.
     */
    private final Object mLock = new Object();
    /** Stores all elements. */
    private final TreeMap<Long, Pinnable<E>> mElements;
    /** Stores the subset of mElements which is not pinned. */
    private final TreeMap<Long, Pinnable<E>> mUnpinnedElements;
    /** Used to acquire space in mElements. */
    private final Semaphore mCapacitySemaphore;
    /** This must be acquired while an element is pinned. */
    private final NegativePermitsSemaphore mPinSemaphore;
    private boolean mClosed = false;

    private Handler mPinStateHandler = null;
    private PinStateListener mPinStateListener = null;

    /**
     * Constructs a new ring buffer with the specified capacity.
     *
     * @param capacity the maximum number of elements to store.
     * @throws IllegalArgumentException if {@code capacity} is not positive.
     */
    public ConcurrentSharedRingBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Capacity must be positive.");
        }

        mElements = new TreeMap<Long, Pinnable<E>>();
        mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
        mCapacitySemaphore = new Semaphore(capacity);
        // Start with -1 permits to pin elements since we must always have at
        // least one unpinned element available to swap out as the head of the
        // buffer.
        mPinSemaphore = new NegativePermitsSemaphore(-1);
    }

    /**
     * Sets or replaces the listener.
     *
     * @param handler The handler on which to invoke the listener.
     * @param listener The listener to be called whenever the ability to pin an
     *            element changes.
     */
    public void setListener(Handler handler, PinStateListener listener) {
        synchronized (mLock) {
            mPinStateHandler = handler;
            mPinStateListener = listener;
        }
    }

    /**
     * Places a new element in the ring buffer, removing the least (by key)
     * non-pinned element if necessary. If the buffer is under-capacity,
     * {@code swapper.create()} supplies the new element; otherwise the element
     * chosen for removal is passed to {@code swapper.swap()} and the result is
     * saved to the buffer. If an entry with {@code newKey} already exists in
     * the ring-buffer, then {@code swapper.update()} is called and may modify
     * the element in-place. See {@link SwapTask}. <br>
     * Note that this method is the only way to add new elements to the buffer
     * and will never be blocked on pinned tasks.
     *
     * @param newKey the key with which to store the swapped-in element.
     * @param swapper the callback used to perform the swap.
     * @return true if the swap was successful and the new element was saved to
     *         the buffer, false if the swap was not possible and the element
     *         was not saved to the buffer. Note that if the swap failed,
     *         {@code swapper.create()} may or may not have been invoked.
     */
    public boolean swapLeast(long newKey, SwapTask<E> swapper) {
        synchronized (mSwapLock) {
            Pinnable<E> existingElement = null;

            synchronized (mLock) {
                if (mClosed) {
                    return false;
                }
                existingElement = mElements.get(newKey);
            }

            if (existingElement != null) {
                swapper.update(existingElement.getElement());
                return true;
            }

            if (mCapacitySemaphore.tryAcquire()) {
                // If we are under capacity, insert the new element and return.
                boolean added = false;
                try {
                    Pinnable<E> p = new Pinnable<E>(swapper.create());

                    synchronized (mLock) {
                        if (!mClosed) {
                            // Add the new element and release another permit
                            // to allow pinning another element.
                            mElements.put(newKey, p);
                            mUnpinnedElements.put(newKey, p);
                            added = true;
                            mPinSemaphore.release();
                            if (mPinSemaphore.availablePermits() == 1) {
                                notifyPinStateChange(true);
                            }
                        }
                    }
                } finally {
                    // Nothing was inserted (create() threw, or the buffer is
                    // closed): hand the capacity permit back so the slot is
                    // not lost.
                    if (!added) {
                        mCapacitySemaphore.release();
                    }
                }

                return added;
            } else {
                Pinnable<E> toSwap;

                // Note that this method must be synchronized to avoid
                // attempting to remove more than one unpinned element at a
                // time.
                synchronized (mLock) {
                    if (mClosed) {
                        return false;
                    }
                    Pair<Long, Pinnable<E>> toSwapEntry = null;
                    long swapKey = swapper.getSwapKey();
                    // If swapKey is same as the inserted key return early.
                    if (swapKey == newKey) {
                        return false;
                    }

                    if (mUnpinnedElements.containsKey(swapKey)) {
                        toSwapEntry = Pair.create(swapKey, mUnpinnedElements.remove(swapKey));
                    } else {
                        // The returned key from getSwapKey was not found in the
                        // unpinned elements; use the least entry from the
                        // unpinned elements.
                        Map.Entry<Long, Pinnable<E>> swapEntry =
                                mUnpinnedElements.pollFirstEntry();
                        if (swapEntry != null) {
                            toSwapEntry = Pair.create(swapEntry.getKey(), swapEntry.getValue());
                        }
                    }

                    if (toSwapEntry == null) {
                        // We can get here if no unpinned element was found.
                        return false;
                    }

                    toSwap = toSwapEntry.second;

                    // We must remove the element from both mElements and
                    // mUnpinnedElements because it must be re-added after the
                    // swap to be placed in the correct order with newKey.
                    mElements.remove(toSwapEntry.first);
                }

                boolean reinserted = false;
                try {
                    toSwap.mElement = swapper.swap(toSwap.mElement);
                } finally {
                    // Re-add the slot under newKey even if swap() threw, so the
                    // buffer does not lose an entry. No return statement here:
                    // an exception from swap() must propagate to the caller.
                    synchronized (mLock) {
                        if (!mClosed) {
                            mElements.put(newKey, toSwap);
                            mUnpinnedElements.put(newKey, toSwap);
                            reinserted = true;
                        }
                    }
                }
                return reinserted;
            }
        }
    }

    /**
     * Attempts to pin the element with the given key and return it. <br>
     * Note that, if a non-null pair is returned, the caller <em>must</em> call
     * {@link #release} with the key.
     *
     * @param key the key of the element to pin.
     * @return the key and object of the pinned element, if one could be pinned,
     *         or null.
     */
    public Pair<Long, E> tryPin(long key) {

        boolean acquiredLastPin = false;
        Pinnable<E> entry = null;

        synchronized (mLock) {
            if (mClosed) {
                return null;
            }

            if (mElements.isEmpty()) {
                return null;
            }

            entry = mElements.get(key);

            if (entry == null) {
                return null;
            }

            if (entry.isPinned()) {
                // If the element is already pinned by another task, simply
                // increment the pin count.
                entry.mPins++;
            } else {
                // We must ensure that there will still be an unpinned element
                // after we pin this one.
                if (mPinSemaphore.tryAcquire()) {
                    mUnpinnedElements.remove(key);
                    entry.mPins++;

                    acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
                } else {
                    return null;
                }
            }
        }

        // If we just grabbed the last permit, we must notify listeners of the
        // pin state change.
        if (acquiredLastPin) {
            notifyPinStateChange(false);
        }

        return Pair.create(key, entry.getElement());
    }

    /**
     * Releases one pin on the element with the given key. When the last pin
     * is released the element becomes eligible for swapping again and a pin
     * permit is returned. <br>
     * This proceeds even if the buffer has been closed.
     *
     * @param key the key of a currently pinned element.
     * @throws InvalidParameterException if no element is stored under
     *             {@code key}.
     * @throws IllegalArgumentException if the element is not pinned.
     */
    public void release(long key) {
        synchronized (mLock) {
            // Note that this must proceed even if the buffer has been closed.

            Pinnable<E> element = mElements.get(key);

            if (element == null) {
                throw new InvalidParameterException(
                        "No entry found for the given key: " + key + ".");
            }

            if (!element.isPinned()) {
                throw new IllegalArgumentException("Calling release() with unpinned element.");
            }

            // Unpin the element
            element.mPins--;

            if (!element.isPinned()) {
                // If there are now 0 tasks pinning this element...
                mUnpinnedElements.put(key, element);

                // Allow pinning another element.
                mPinSemaphore.release();

                if (mPinSemaphore.availablePermits() == 1) {
                    notifyPinStateChange(true);
                }
            }
        }
    }

    /**
     * Attempts to pin the greatest element and return it. <br>
     * Note that, if a non-null element is returned, the caller <em>must</em>
     * call {@link #release} with the element's key.
     *
     * @return the key and object of the pinned element, if one could be pinned,
     *         or null.
     */
    public Pair<Long, E> tryPinGreatest() {
        synchronized (mLock) {
            if (mClosed) {
                return null;
            }

            if (mElements.isEmpty()) {
                return null;
            }

            return tryPin(mElements.lastKey());
        }
    }

    /**
     * Attempts to pin the greatest element for which {@code selector} returns
     * true. <br>
     * Candidates are pinned one at a time, from greatest key to least;
     * candidates that are not selected are released again.
     *
     * @param selector decides whether a pinned candidate is accepted.
     * @return the key and object of the pinned element, if one was selected,
     *         or null.
     * @see #tryPinGreatest
     */
    public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
        // (Quickly) get the list of elements to search through.
        ArrayList<Long> keys = new ArrayList<Long>();
        synchronized (mLock) {
            if (mClosed) {
                return null;
            }

            if (mElements.isEmpty()) {
                return null;
            }

            keys.addAll(mElements.keySet());
        }

        Collections.sort(keys);

        // Pin each element, from greatest key to least, until we find the one
        // we want (the element with the greatest key for which
        // selector.select() returns true).
        for (int i = keys.size() - 1; i >= 0; i--) {
            Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
            if (pinnedCandidate != null) {
                boolean selected = false;

                try {
                    selected = selector.select(pinnedCandidate.second);
                } finally {
                    // Don't leak pinnedCandidate if it was rejected or if the
                    // above select() threw an exception.
                    if (!selected) {
                        release(pinnedCandidate.first);
                    }
                }

                if (selected) {
                    return pinnedCandidate;
                }
            }
        }

        return null;
    }

    /**
     * Removes all elements from the buffer, running {@code task} on each one,
     * and waiting, if necessary, for all pins to be released.
     *
     * @param task invoked once with every element held by the buffer before
     *            the buffer is emptied.
     * @throws InterruptedException if the calling thread is interrupted while
     *             waiting to acquire pin permits.
     */
    public void close(Task<E> task) throws InterruptedException {
        int numPinnedElements;

        // Ensure that any pending swap tasks complete before closing.
        synchronized (mSwapLock) {
            synchronized (mLock) {
                mClosed = true;
                numPinnedElements = mElements.size() - mUnpinnedElements.size();
            }
        }

        notifyPinStateChange(false);

        // Wait for all pinned tasks to complete.
        if (numPinnedElements > 0) {
            mPinSemaphore.acquire(numPinnedElements);
        }

        for (Pinnable<E> element : mElements.values()) {
            task.run(element.mElement);
            // Release the capacity permits.
            mCapacitySemaphore.release();
        }

        mUnpinnedElements.clear();

        mElements.clear();
    }

    /**
     * Attempts to get a pinned element for the given key.
     *
     * @param key the key of the pinned element.
     * @return (key, value) pair if an element with this key exists and is
     *         pinned; null if the buffer is closed, the key is absent, or the
     *         element is not pinned.
     */
    public Pair<Long, E> tryGetPinned(long key) {
        synchronized (mLock) {
            if (mClosed) {
                return null;
            }

            // Direct lookup; no need to walk every entry of the map.
            Pinnable<E> entry = mElements.get(key);
            if (entry == null || !entry.isPinned()) {
                return null;
            }
            return Pair.create(key, entry.getElement());
        }
    }

    /**
     * Reopens previously closed buffer.
     * <p/>
     * Buffer should be closed before calling this method. If called with an
     * open buffer an {@link IllegalStateException} is thrown.
     *
     * @param unpinnedReservedSlotCount a non-negative integer for number of
     *            slots to reserve for unpinned elements. These slots can never
     *            be pinned and will always be available for swapping.
     * @throws IllegalArgumentException if {@code unpinnedReservedSlotCount} is
     *             negative or not smaller than the number of currently
     *             available capacity permits.
     * @throws IllegalStateException if the buffer is not closed.
     * @throws InterruptedException declared as part of the method signature;
     *             the calls made here do not block.
     */
    public void reopenBuffer(int unpinnedReservedSlotCount)
            throws InterruptedException {
        if (unpinnedReservedSlotCount < 0
                || unpinnedReservedSlotCount >= mCapacitySemaphore.availablePermits()) {
            throw new IllegalArgumentException("Invalid unpinned reserved slot count: " +
                    unpinnedReservedSlotCount);
        }

        // Ensure that any pending swap tasks complete before reopening.
        synchronized (mSwapLock) {
            synchronized (mLock) {
                if (!mClosed) {
                    throw new IllegalStateException(
                            "Attempt to reopen the buffer when it is not closed.");
                }

                // reducePermits() requires exactly zero available permits, so
                // drain first and then go negative by the reserved count.
                mPinSemaphore.drainPermits();
                mPinSemaphore.reducePermits(unpinnedReservedSlotCount);
                mClosed = false;
            }
        }
    }

    /**
     * Releases a pinned element for the given key.
     * <p/>
     * If element is unpinned, it is not released.
     *
     * @param key the key of the element, if the element is not present an
     *            {@link IllegalArgumentException} is thrown.
     */
    public void releaseIfPinned(long key) {
        synchronized (mLock) {
            Pinnable<E> element = mElements.get(key);

            if (element == null) {
                throw new IllegalArgumentException("Invalid key." + key);
            }

            if (element.isPinned()) {
                release(key);
            }
        }
    }

    /**
     * Releases all pinned elements in the buffer.
     * <p/>
     * Note: it calls {@link #release(long)} only once on each pinned element,
     * so an element pinned more than once remains pinned afterwards. Does
     * nothing if the buffer is closed.
     */
    public void releaseAll() {
        synchronized (mSwapLock) {
            synchronized (mLock) {
                if (mClosed || mElements.isEmpty()
                        || mElements.size() == mUnpinnedElements.size()) {
                    return;
                }
                // release() only mutates mUnpinnedElements, so iterating
                // mElements here is safe.
                for (Map.Entry<Long, Pinnable<E>> entry : mElements.entrySet()) {
                    if (entry.getValue().isPinned()) {
                        release(entry.getKey());
                    }
                }
            }
        }
    }

    /**
     * Posts a pin-state notification to the registered listener on the
     * registered handler. Does nothing if either the handler or the listener
     * is not set.
     *
     * @param pinsAvailable the value to pass to
     *            {@link PinStateListener#onPinStateChange}.
     */
    private void notifyPinStateChange(final boolean pinsAvailable) {
        synchronized (mLock) {
            // We must synchronize on mPinStateHandler and mPinStateListener.
            final Handler handler = mPinStateHandler;
            final PinStateListener listener = mPinStateListener;
            if (handler == null || listener == null) {
                // Without this check a null listener would only fail later,
                // inside the posted Runnable on the handler's thread.
                return;
            }
            handler.post(new Runnable() {
                @Override
                public void run() {
                    listener.onPinStateChange(pinsAvailable);
                }
            });
        }
    }
}