• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2018 Google LLC
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google LLC nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 package com.google.api.gax.batching;
32 
33 import com.google.common.base.Preconditions;
34 import java.util.concurrent.atomic.AtomicLong;
35 
36 /** A {@link Semaphore64} that immediately returns with failure if permits are not available. */
37 class NonBlockingSemaphore implements Semaphore64 {
38   private AtomicLong acquiredPermits;
39   private AtomicLong limit;
40 
checkNotNegative(long l)41   private static void checkNotNegative(long l) {
42     Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l);
43   }
44 
NonBlockingSemaphore(long permits)45   NonBlockingSemaphore(long permits) {
46     checkNotNegative(permits);
47     this.acquiredPermits = new AtomicLong(0);
48     this.limit = new AtomicLong(permits);
49   }
50 
51   @Override
release(long permits)52   public void release(long permits) {
53     checkNotNegative(permits);
54     while (true) {
55       long old = acquiredPermits.get();
56       // TODO: throw exceptions when the permits overflow
57       long newAcquired = Math.max(0, old - permits);
58       if (acquiredPermits.compareAndSet(old, newAcquired)) {
59         return;
60       }
61     }
62   }
63 
64   @Override
acquire(long permits)65   public boolean acquire(long permits) {
66     checkNotNegative(permits);
67     while (true) {
68       long old = acquiredPermits.get();
69       if (old + permits > limit.get()) {
70         return false;
71       }
72       if (acquiredPermits.compareAndSet(old, old + permits)) {
73         return true;
74       }
75     }
76   }
77 
78   @Override
acquirePartial(long permits)79   public boolean acquirePartial(long permits) {
80     checkNotNegative(permits);
81     // To allow individual oversized requests to be sent, clamp the requested permits to the maximum
82     // limit. This will allow individual large requests to be sent. Please note that this behavior
83     // will result in acquiredPermits going over limit.
84     while (true) {
85       long old = acquiredPermits.get();
86       if (old + permits > limit.get() && old > 0) {
87         return false;
88       }
89       if (acquiredPermits.compareAndSet(old, old + permits)) {
90         return true;
91       }
92     }
93   }
94 
95   @Override
increasePermitLimit(long permits)96   public void increasePermitLimit(long permits) {
97     checkNotNegative(permits);
98     limit.addAndGet(permits);
99   }
100 
101   @Override
reducePermitLimit(long reduction)102   public void reducePermitLimit(long reduction) {
103     checkNotNegative(reduction);
104 
105     while (true) {
106       long oldLimit = limit.get();
107       Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow");
108       if (limit.compareAndSet(oldLimit, oldLimit - reduction)) {
109         return;
110       }
111     }
112   }
113 
114   @Override
getPermitLimit()115   public long getPermitLimit() {
116     return limit.get();
117   }
118 }
119