/*
 * Copyright 2018 Google LLC
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google LLC nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package com.google.api.gax.batching;

import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link Semaphore64} that immediately returns with failure if permits are not available.
 *
 * <p>The number of currently held permits and the permit limit are each kept in an {@link
 * AtomicLong}. Every mutation is performed with a compare-and-set retry loop, so no locks are
 * taken and no call ever waits for permits to become available.
 */
class NonBlockingSemaphore implements Semaphore64 {
  /** Number of permits currently held by callers. */
  private final AtomicLong acquiredPermits;

  /** Maximum number of permits that {@link #acquire(long)} will hand out in total. */
  private final AtomicLong limit;

  /**
   * Rejects negative permit counts.
   *
   * @param l the value to validate
   * @throws IllegalArgumentException if {@code l} is negative
   */
  private static void checkNotNegative(long l) {
    Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
  }

  /**
   * Creates a semaphore with no permits held and the given permit limit.
   *
   * @param permits the initial permit limit; must not be negative
   * @throws IllegalArgumentException if {@code permits} is negative
   */
  NonBlockingSemaphore(long permits) {
    checkNotNegative(permits);
    this.acquiredPermits = new AtomicLong(0);
    this.limit = new AtomicLong(permits);
  }

  /**
   * Returns {@code permits} permits to the semaphore.
   *
   * <p>Releasing more permits than are currently held does not fail; the held count is clamped
   * at zero.
   *
   * @param permits the number of permits to release; must not be negative
   * @throws IllegalArgumentException if {@code permits} is negative
   */
  @Override
  public void release(long permits) {
    checkNotNegative(permits);
    while (true) {
      long old = acquiredPermits.get();
      // TODO: throw exceptions when the permits overflow
      long newAcquired = Math.max(0, old - permits);
      if (acquiredPermits.compareAndSet(old, newAcquired)) {
        return;
      }
      // Another thread changed the count between the read and the write; retry.
    }
  }

  /**
   * Attempts to take {@code permits} permits without waiting.
   *
   * @param permits the number of permits requested; must not be negative
   * @return {@code true} if the permits were taken, {@code false} if taking them would push the
   *     held count above the current limit
   * @throws IllegalArgumentException if {@code permits} is negative
   */
  @Override
  public boolean acquire(long permits) {
    checkNotNegative(permits);
    while (true) {
      long old = acquiredPermits.get();
      // Compare against the remaining headroom instead of computing `old + permits`: the sum
      // can overflow long for very large requests and would then wrongly pass the check.
      if (permits > limit.get() - old) {
        return false;
      }
      if (acquiredPermits.compareAndSet(old, old + permits)) {
        return true;
      }
      // Lost the race with a concurrent update; re-read and retry.
    }
  }

  /**
   * Attempts to take {@code permits} permits without waiting, admitting a single oversized
   * request when nothing else is held.
   *
   * @param permits the number of permits requested; must not be negative
   * @return {@code true} if the permits were taken; {@code false} if some permits are already
   *     held and the request does not fit under the current limit
   * @throws IllegalArgumentException if {@code permits} is negative
   */
  @Override
  public boolean acquirePartial(long permits) {
    checkNotNegative(permits);
    // To allow individual oversized requests to be sent, a request that exceeds the limit is
    // still granted in full when no permits are currently held. Please note that this behavior
    // will result in acquiredPermits going over limit.
    while (true) {
      long old = acquiredPermits.get();
      // Headroom comparison avoids the long overflow that `old + permits` could hit.
      if (old > 0 && permits > limit.get() - old) {
        return false;
      }
      if (acquiredPermits.compareAndSet(old, old + permits)) {
        return true;
      }
      // Lost the race with a concurrent update; re-read and retry.
    }
  }

  /**
   * Raises the permit limit by {@code permits}.
   *
   * @param permits the amount to add to the limit; must not be negative
   * @throws IllegalArgumentException if {@code permits} is negative
   * @throws IllegalStateException if the new limit would exceed {@link Long#MAX_VALUE}
   */
  @Override
  public void increasePermitLimit(long permits) {
    checkNotNegative(permits);
    while (true) {
      long oldLimit = limit.get();
      // Guard against wrapping to a negative limit, which would make every acquire fail.
      Preconditions.checkState(permits <= Long.MAX_VALUE - oldLimit, "permit limit overflow");
      if (limit.compareAndSet(oldLimit, oldLimit + permits)) {
        return;
      }
    }
  }

  /**
   * Lowers the permit limit by {@code reduction}. Permits that are already held are not revoked.
   *
   * @param reduction the amount to subtract from the limit; must not be negative
   * @throws IllegalArgumentException if {@code reduction} is negative
   * @throws IllegalStateException if the resulting limit would not be strictly positive
   */
  @Override
  public void reducePermitLimit(long reduction) {
    checkNotNegative(reduction);

    while (true) {
      long oldLimit = limit.get();
      Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
      if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
        return;
      }
    }
  }

  /**
   * Returns the current permit limit.
   *
   * @return the limit as of the moment of the call
   */
  @Override
  public long getPermitLimit() {
    return limit.get();
  }
}