1 /* 2 * Copyright (C) 2015 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.util.concurrent; 16 17 import static com.google.common.collect.Sets.newConcurrentHashSet; 18 import static java.util.Objects.requireNonNull; 19 import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater; 20 import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 21 22 import com.google.common.annotations.GwtCompatible; 23 import com.google.j2objc.annotations.ReflectionSupport; 24 import java.util.Set; 25 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 26 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 27 import java.util.logging.Level; 28 import javax.annotation.CheckForNull; 29 import org.checkerframework.checker.nullness.qual.Nullable; 30 31 /** 32 * A helper which does some thread-safe operations for aggregate futures, which must be implemented 33 * differently in GWT. Namely: 34 * 35 * <ul> 36 * <li>Lazily initializes a set of seen exceptions 37 * <li>Decrements a counter atomically 38 * </ul> 39 */ 40 @GwtCompatible(emulated = true) 41 @ReflectionSupport(value = ReflectionSupport.Level.FULL) 42 @ElementTypesAreNonnullByDefault 43 abstract class AggregateFutureState<OutputT extends @Nullable Object> 44 extends AbstractFuture.TrustedFuture<OutputT> { 45 // Lazily initialized the first time we see an exception; not released until all the input futures 46 // have completed and we have processed them all. 47 @CheckForNull private volatile Set<Throwable> seenExceptions = null; 48 49 private volatile int remaining; 50 51 private static final AtomicHelper ATOMIC_HELPER; 52 53 private static final LazyLogger log = new LazyLogger(AggregateFutureState.class); 54 55 static { 56 AtomicHelper helper; 57 Throwable thrownReflectionFailure = null; 58 try { 59 helper = 60 new SafeAtomicHelper( 61 newUpdater(AggregateFutureState.class, Set.class, "seenExceptions"), 62 newUpdater(AggregateFutureState.class, "remaining")); 63 } catch (Throwable reflectionFailure) { // sneaky checked exception 64 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 65 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 66 // For these users fallback to a suboptimal implementation, based on synchronized. This will 67 // be a definite performance hit to those users. 68 thrownReflectionFailure = reflectionFailure; 69 helper = new SynchronizedAtomicHelper(); 70 } 71 ATOMIC_HELPER = helper; 72 // Log after all static init is finished; if an installed logger uses any Futures methods, it 73 // shouldn't break in cases where reflection is missing/broken. 74 if (thrownReflectionFailure != null) { 75 log.get().log(Level.SEVERE, "SafeAtomicHelper is broken!", thrownReflectionFailure); 76 } 77 } 78 AggregateFutureState(int remainingFutures)79 AggregateFutureState(int remainingFutures) { 80 this.remaining = remainingFutures; 81 } 82 getOrInitSeenExceptions()83 final Set<Throwable> getOrInitSeenExceptions() { 84 /* 85 * The initialization of seenExceptions has to be more complicated than we'd like. The simple 86 * approach would be for each caller CAS it from null to a Set populated with its exception. But 87 * there's another race: If the first thread fails with an exception and a second thread 88 * immediately fails with the same exception: 89 * 90 * Thread1: calls setException(), which returns true, context switch before it can CAS 91 * seenExceptions to its exception 92 * 93 * Thread2: calls setException(), which returns false, CASes seenExceptions to its exception, 94 * and wrongly believes that its exception is new (leading it to logging it when it shouldn't) 95 * 96 * Our solution is for threads to CAS seenExceptions from null to a Set populated with _the 97 * initial exception_, no matter which thread does the work. This ensures that seenExceptions 98 * always contains not just the current thread's exception but also the initial thread's. 99 */ 100 Set<Throwable> seenExceptionsLocal = seenExceptions; 101 if (seenExceptionsLocal == null) { 102 // TODO(cpovirk): Should we use a simpler (presumably cheaper) data structure? 103 /* 104 * Using weak references here could let us release exceptions earlier, but: 105 * 106 * 1. On Android, querying a WeakReference blocks if the GC is doing an otherwise-concurrent 107 * pass. 108 * 109 * 2. We would probably choose to compare exceptions using == instead of equals() (for 110 * consistency with how weak references are cleared). That's a behavior change -- arguably the 111 * removal of a feature. 112 * 113 * Fortunately, exceptions rarely contain references to expensive resources. 114 */ 115 116 // 117 seenExceptionsLocal = newConcurrentHashSet(); 118 /* 119 * Other handleException() callers may see this as soon as we publish it. We need to populate 120 * it with the initial failure before we do, or else they may think that the initial failure 121 * has never been seen before. 122 */ 123 addInitialException(seenExceptionsLocal); 124 125 ATOMIC_HELPER.compareAndSetSeenExceptions(this, null, seenExceptionsLocal); 126 /* 127 * If another handleException() caller created the set, we need to use that copy in case yet 128 * other callers have added to it. 129 * 130 * This read is guaranteed to get us the right value because we only set this once (here). 131 * 132 * requireNonNull is safe because either our compareAndSet succeeded or it failed because 133 * another thread did it for us. 134 */ 135 seenExceptionsLocal = requireNonNull(seenExceptions); 136 } 137 return seenExceptionsLocal; 138 } 139 140 /** Populates {@code seen} with the exception that was passed to {@code setException}. */ addInitialException(Set<Throwable> seen)141 abstract void addInitialException(Set<Throwable> seen); 142 decrementRemainingAndGet()143 final int decrementRemainingAndGet() { 144 return ATOMIC_HELPER.decrementAndGetRemainingCount(this); 145 } 146 clearSeenExceptions()147 final void clearSeenExceptions() { 148 seenExceptions = null; 149 } 150 151 private abstract static class AtomicHelper { 152 /** Atomic compare-and-set of the {@link AggregateFutureState#seenExceptions} field. */ compareAndSetSeenExceptions( AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update)153 abstract void compareAndSetSeenExceptions( 154 AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update); 155 156 /** Atomic decrement-and-get of the {@link AggregateFutureState#remaining} field. */ decrementAndGetRemainingCount(AggregateFutureState<?> state)157 abstract int decrementAndGetRemainingCount(AggregateFutureState<?> state); 158 } 159 160 private static final class SafeAtomicHelper extends AtomicHelper { 161 final AtomicReferenceFieldUpdater<AggregateFutureState<?>, @Nullable Set<Throwable>> 162 seenExceptionsUpdater; 163 164 final AtomicIntegerFieldUpdater<AggregateFutureState<?>> remainingCountUpdater; 165 166 @SuppressWarnings({"rawtypes", "unchecked"}) // Unavoidable with reflection API SafeAtomicHelper( AtomicReferenceFieldUpdater seenExceptionsUpdater, AtomicIntegerFieldUpdater remainingCountUpdater)167 SafeAtomicHelper( 168 AtomicReferenceFieldUpdater seenExceptionsUpdater, 169 AtomicIntegerFieldUpdater remainingCountUpdater) { 170 this.seenExceptionsUpdater = 171 (AtomicReferenceFieldUpdater<AggregateFutureState<?>, @Nullable Set<Throwable>>) 172 seenExceptionsUpdater; 173 this.remainingCountUpdater = 174 (AtomicIntegerFieldUpdater<AggregateFutureState<?>>) remainingCountUpdater; 175 } 176 177 @Override compareAndSetSeenExceptions( AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update)178 void compareAndSetSeenExceptions( 179 AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update) { 180 seenExceptionsUpdater.compareAndSet(state, expect, update); 181 } 182 183 @Override decrementAndGetRemainingCount(AggregateFutureState<?> state)184 int decrementAndGetRemainingCount(AggregateFutureState<?> state) { 185 return remainingCountUpdater.decrementAndGet(state); 186 } 187 } 188 189 private static final class SynchronizedAtomicHelper extends AtomicHelper { 190 @Override compareAndSetSeenExceptions( AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update)191 void compareAndSetSeenExceptions( 192 AggregateFutureState<?> state, @CheckForNull Set<Throwable> expect, Set<Throwable> update) { 193 synchronized (state) { 194 if (state.seenExceptions == expect) { 195 state.seenExceptions = update; 196 } 197 } 198 } 199 200 @Override decrementAndGetRemainingCount(AggregateFutureState<?> state)201 int decrementAndGetRemainingCount(AggregateFutureState<?> state) { 202 synchronized (state) { 203 return --state.remaining; 204 } 205 } 206 } 207 } 208