1 /* 2 * Copyright (C) 2010 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15 package com.google.common.collect; 16 17 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkState; 19 20 import com.google.common.base.Equivalence; 21 import com.google.common.base.Function; 22 import com.google.common.base.Throwables; 23 import com.google.common.collect.MapMaker.RemovalCause; 24 import com.google.common.collect.MapMaker.RemovalListener; 25 26 import java.io.IOException; 27 import java.io.ObjectInputStream; 28 import java.io.ObjectOutputStream; 29 import java.io.Serializable; 30 import java.lang.ref.ReferenceQueue; 31 import java.util.concurrent.ConcurrentMap; 32 import java.util.concurrent.ExecutionException; 33 import java.util.concurrent.atomic.AtomicReferenceArray; 34 35 import javax.annotation.Nullable; 36 import javax.annotation.concurrent.GuardedBy; 37 38 /** 39 * Adds computing functionality to {@link MapMakerInternalMap}. 40 * 41 * @author Bob Lee 42 * @author Charles Fry 43 */ 44 class ComputingConcurrentHashMap<K, V> extends MapMakerInternalMap<K, V> { 45 final Function<? super K, ? extends V> computingFunction; 46 47 /** 48 * Creates a new, empty map with the specified strategy, initial capacity, load factor and 49 * concurrency level. 50 */ ComputingConcurrentHashMap(MapMaker builder, Function<? super K, ? extends V> computingFunction)51 ComputingConcurrentHashMap(MapMaker builder, 52 Function<? super K, ? extends V> computingFunction) { 53 super(builder); 54 this.computingFunction = checkNotNull(computingFunction); 55 } 56 57 @Override createSegment(int initialCapacity, int maxSegmentSize)58 Segment<K, V> createSegment(int initialCapacity, int maxSegmentSize) { 59 return new ComputingSegment<K, V>(this, initialCapacity, maxSegmentSize); 60 } 61 62 @Override segmentFor(int hash)63 ComputingSegment<K, V> segmentFor(int hash) { 64 return (ComputingSegment<K, V>) super.segmentFor(hash); 65 } 66 getOrCompute(K key)67 V getOrCompute(K key) throws ExecutionException { 68 int hash = hash(checkNotNull(key)); 69 return segmentFor(hash).getOrCompute(key, hash, computingFunction); 70 } 71 72 @SuppressWarnings("serial") // This class is never serialized. 73 static final class ComputingSegment<K, V> extends Segment<K, V> { ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize)74 ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize) { 75 super(map, initialCapacity, maxSegmentSize); 76 } 77 getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)78 V getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction) 79 throws ExecutionException { 80 try { 81 outer: while (true) { 82 // don't call getLiveEntry, which would ignore computing values 83 ReferenceEntry<K, V> e = getEntry(key, hash); 84 if (e != null) { 85 V value = getLiveValue(e); 86 if (value != null) { 87 recordRead(e); 88 return value; 89 } 90 } 91 92 // at this point e is either null, computing, or expired; 93 // avoid locking if it's already computing 94 if (e == null || !e.getValueReference().isComputingReference()) { 95 boolean createNewEntry = true; 96 ComputingValueReference<K, V> computingValueReference = null; 97 lock(); 98 try { 99 preWriteCleanup(); 100 101 int newCount = this.count - 1; 102 AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; 103 int index = hash & (table.length() - 1); 104 ReferenceEntry<K, V> first = table.get(index); 105 106 for (e = first; e != null; e = e.getNext()) { 107 K entryKey = e.getKey(); 108 if (e.getHash() == hash && entryKey != null 109 && map.keyEquivalence.equivalent(key, entryKey)) { 110 ValueReference<K, V> valueReference = e.getValueReference(); 111 if (valueReference.isComputingReference()) { 112 createNewEntry = false; 113 } else { 114 V value = e.getValueReference().get(); 115 if (value == null) { 116 enqueueNotification(entryKey, hash, value, RemovalCause.COLLECTED); 117 } else if (map.expires() && map.isExpired(e)) { 118 // This is a duplicate check, as preWriteCleanup already purged expired 119 // entries, but let's accomodate an incorrect expiration queue. 120 enqueueNotification(entryKey, hash, value, RemovalCause.EXPIRED); 121 } else { 122 recordLockedRead(e); 123 return value; 124 } 125 126 // immediately reuse invalid entries 127 evictionQueue.remove(e); 128 expirationQueue.remove(e); 129 this.count = newCount; // write-volatile 130 } 131 break; 132 } 133 } 134 135 if (createNewEntry) { 136 computingValueReference = new ComputingValueReference<K, V>(computingFunction); 137 138 if (e == null) { 139 e = newEntry(key, hash, first); 140 e.setValueReference(computingValueReference); 141 table.set(index, e); 142 } else { 143 e.setValueReference(computingValueReference); 144 } 145 } 146 } finally { 147 unlock(); 148 postWriteCleanup(); 149 } 150 151 if (createNewEntry) { 152 // This thread solely created the entry. 153 return compute(key, hash, e, computingValueReference); 154 } 155 } 156 157 // The entry already exists. Wait for the computation. 158 checkState(!Thread.holdsLock(e), "Recursive computation"); 159 // don't consider expiration as we're concurrent with computation 160 V value = e.getValueReference().waitForValue(); 161 if (value != null) { 162 recordRead(e); 163 return value; 164 } 165 // else computing thread will clearValue 166 continue outer; 167 } 168 } finally { 169 postReadCleanup(); 170 } 171 } 172 compute(K key, int hash, ReferenceEntry<K, V> e, ComputingValueReference<K, V> computingValueReference)173 V compute(K key, int hash, ReferenceEntry<K, V> e, 174 ComputingValueReference<K, V> computingValueReference) 175 throws ExecutionException { 176 V value = null; 177 long start = System.nanoTime(); 178 long end = 0; 179 try { 180 // Synchronizes on the entry to allow failing fast when a recursive computation is 181 // detected. This is not fool-proof since the entry may be copied when the segment 182 // is written to. 183 synchronized (e) { 184 value = computingValueReference.compute(key, hash); 185 end = System.nanoTime(); 186 } 187 if (value != null) { 188 // putIfAbsent 189 V oldValue = put(key, hash, value, true); 190 if (oldValue != null) { 191 // the computed value was already clobbered 192 enqueueNotification(key, hash, value, RemovalCause.REPLACED); 193 } 194 } 195 return value; 196 } finally { 197 if (end == 0) { 198 end = System.nanoTime(); 199 } 200 if (value == null) { 201 clearValue(key, hash, computingValueReference); 202 } 203 } 204 } 205 } 206 207 /** 208 * Used to provide computation exceptions to other threads. 209 */ 210 private static final class ComputationExceptionReference<K, V> implements ValueReference<K, V> { 211 final Throwable t; 212 ComputationExceptionReference(Throwable t)213 ComputationExceptionReference(Throwable t) { 214 this.t = t; 215 } 216 217 @Override get()218 public V get() { 219 return null; 220 } 221 222 @Override getEntry()223 public ReferenceEntry<K, V> getEntry() { 224 return null; 225 } 226 227 @Override copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)228 public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) { 229 return this; 230 } 231 232 @Override isComputingReference()233 public boolean isComputingReference() { 234 return false; 235 } 236 237 @Override waitForValue()238 public V waitForValue() throws ExecutionException { 239 throw new ExecutionException(t); 240 } 241 242 @Override clear(ValueReference<K, V> newValue)243 public void clear(ValueReference<K, V> newValue) {} 244 } 245 246 /** 247 * Used to provide computation result to other threads. 248 */ 249 private static final class ComputedReference<K, V> implements ValueReference<K, V> { 250 final V value; 251 ComputedReference(@ullable V value)252 ComputedReference(@Nullable V value) { 253 this.value = value; 254 } 255 256 @Override get()257 public V get() { 258 return value; 259 } 260 261 @Override getEntry()262 public ReferenceEntry<K, V> getEntry() { 263 return null; 264 } 265 266 @Override copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)267 public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) { 268 return this; 269 } 270 271 @Override isComputingReference()272 public boolean isComputingReference() { 273 return false; 274 } 275 276 @Override waitForValue()277 public V waitForValue() { 278 return get(); 279 } 280 281 @Override clear(ValueReference<K, V> newValue)282 public void clear(ValueReference<K, V> newValue) {} 283 } 284 285 private static final class ComputingValueReference<K, V> implements ValueReference<K, V> { 286 final Function<? super K, ? extends V> computingFunction; 287 288 @GuardedBy("ComputingValueReference.this") // writes 289 volatile ValueReference<K, V> computedReference = unset(); 290 ComputingValueReference(Function<? super K, ? extends V> computingFunction)291 public ComputingValueReference(Function<? super K, ? extends V> computingFunction) { 292 this.computingFunction = computingFunction; 293 } 294 295 @Override get()296 public V get() { 297 // All computation lookups go through waitForValue. This method thus is 298 // only used by put, to whom we always want to appear absent. 299 return null; 300 } 301 302 @Override getEntry()303 public ReferenceEntry<K, V> getEntry() { 304 return null; 305 } 306 307 @Override copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)308 public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) { 309 return this; 310 } 311 312 @Override isComputingReference()313 public boolean isComputingReference() { 314 return true; 315 } 316 317 /** 318 * Waits for a computation to complete. Returns the result of the computation. 319 */ 320 @Override waitForValue()321 public V waitForValue() throws ExecutionException { 322 if (computedReference == UNSET) { 323 boolean interrupted = false; 324 try { 325 synchronized (this) { 326 while (computedReference == UNSET) { 327 try { 328 wait(); 329 } catch (InterruptedException ie) { 330 interrupted = true; 331 } 332 } 333 } 334 } finally { 335 if (interrupted) { 336 Thread.currentThread().interrupt(); 337 } 338 } 339 } 340 return computedReference.waitForValue(); 341 } 342 343 @Override clear(ValueReference<K, V> newValue)344 public void clear(ValueReference<K, V> newValue) { 345 // The pending computation was clobbered by a manual write. Unblock all 346 // pending gets, and have them return the new value. 347 setValueReference(newValue); 348 349 // TODO(fry): could also cancel computation if we had a thread handle 350 } 351 compute(K key, int hash)352 V compute(K key, int hash) throws ExecutionException { 353 V value; 354 try { 355 value = computingFunction.apply(key); 356 } catch (Throwable t) { 357 setValueReference(new ComputationExceptionReference<K, V>(t)); 358 throw new ExecutionException(t); 359 } 360 361 setValueReference(new ComputedReference<K, V>(value)); 362 return value; 363 } 364 setValueReference(ValueReference<K, V> valueReference)365 void setValueReference(ValueReference<K, V> valueReference) { 366 synchronized (this) { 367 if (computedReference == UNSET) { 368 computedReference = valueReference; 369 notifyAll(); 370 } 371 } 372 } 373 } 374 375 /** 376 * Overrides get() to compute on demand. Also throws an exception when {@code null} is returned 377 * from a computation. 378 */ 379 static final class ComputingMapAdapter<K, V> 380 extends ComputingConcurrentHashMap<K, V> implements Serializable { 381 private static final long serialVersionUID = 0; 382 ComputingMapAdapter(MapMaker mapMaker, Function<? super K, ? extends V> computingFunction)383 ComputingMapAdapter(MapMaker mapMaker, 384 Function<? super K, ? extends V> computingFunction) { 385 super(mapMaker, computingFunction); 386 } 387 388 @SuppressWarnings("unchecked") // unsafe, which is one advantage of Cache over Map 389 @Override get(Object key)390 public V get(Object key) { 391 V value; 392 try { 393 value = getOrCompute((K) key); 394 } catch (ExecutionException e) { 395 Throwable cause = e.getCause(); 396 Throwables.propagateIfInstanceOf(cause, ComputationException.class); 397 throw new ComputationException(cause); 398 } 399 400 if (value == null) { 401 throw new NullPointerException(computingFunction + " returned null for key " + key + "."); 402 } 403 return value; 404 } 405 } 406 407 // Serialization Support 408 409 private static final long serialVersionUID = 4; 410 411 @Override writeReplace()412 Object writeReplace() { 413 return new ComputingSerializationProxy<K, V>(keyStrength, valueStrength, keyEquivalence, 414 valueEquivalence, expireAfterWriteNanos, expireAfterAccessNanos, maximumSize, 415 concurrencyLevel, removalListener, this, computingFunction); 416 } 417 418 static final class ComputingSerializationProxy<K, V> extends AbstractSerializationProxy<K, V> { 419 420 final Function<? super K, ? extends V> computingFunction; 421 ComputingSerializationProxy(Strength keyStrength, Strength valueStrength, Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence, long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize, int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener, ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction)422 ComputingSerializationProxy(Strength keyStrength, Strength valueStrength, 423 Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence, 424 long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize, 425 int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener, 426 ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction) { 427 super(keyStrength, valueStrength, keyEquivalence, valueEquivalence, expireAfterWriteNanos, 428 expireAfterAccessNanos, maximumSize, concurrencyLevel, removalListener, delegate); 429 this.computingFunction = computingFunction; 430 } 431 writeObject(ObjectOutputStream out)432 private void writeObject(ObjectOutputStream out) throws IOException { 433 out.defaultWriteObject(); 434 writeMapTo(out); 435 } 436 437 @SuppressWarnings("deprecation") // self-use readObject(ObjectInputStream in)438 private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 439 in.defaultReadObject(); 440 MapMaker mapMaker = readMapMaker(in); 441 delegate = mapMaker.makeComputingMap(computingFunction); 442 readEntries(in); 443 } 444 readResolve()445 Object readResolve() { 446 return delegate; 447 } 448 449 private static final long serialVersionUID = 4; 450 } 451 } 452