• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.collect;
16 
17 import static com.google.common.base.Preconditions.checkNotNull;
18 import static com.google.common.base.Preconditions.checkState;
19 
20 import com.google.common.base.Equivalence;
21 import com.google.common.base.Function;
22 import com.google.common.base.Throwables;
23 import com.google.common.collect.MapMaker.RemovalCause;
24 import com.google.common.collect.MapMaker.RemovalListener;
25 
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.io.Serializable;
30 import java.lang.ref.ReferenceQueue;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.atomic.AtomicReferenceArray;
34 
35 import javax.annotation.Nullable;
36 import javax.annotation.concurrent.GuardedBy;
37 
38 /**
39  * Adds computing functionality to {@link MapMakerInternalMap}.
40  *
41  * @author Bob Lee
42  * @author Charles Fry
43  */
44 class ComputingConcurrentHashMap<K, V> extends MapMakerInternalMap<K, V> {
45   final Function<? super K, ? extends V> computingFunction;
46 
47   /**
48    * Creates a new, empty map with the specified strategy, initial capacity, load factor and
49    * concurrency level.
50    */
ComputingConcurrentHashMap(MapMaker builder, Function<? super K, ? extends V> computingFunction)51   ComputingConcurrentHashMap(MapMaker builder,
52       Function<? super K, ? extends V> computingFunction) {
53     super(builder);
54     this.computingFunction = checkNotNull(computingFunction);
55   }
56 
57   @Override
createSegment(int initialCapacity, int maxSegmentSize)58   Segment<K, V> createSegment(int initialCapacity, int maxSegmentSize) {
59     return new ComputingSegment<K, V>(this, initialCapacity, maxSegmentSize);
60   }
61 
62   @Override
segmentFor(int hash)63   ComputingSegment<K, V> segmentFor(int hash) {
64     return (ComputingSegment<K, V>) super.segmentFor(hash);
65   }
66 
getOrCompute(K key)67   V getOrCompute(K key) throws ExecutionException {
68     int hash = hash(checkNotNull(key));
69     return segmentFor(hash).getOrCompute(key, hash, computingFunction);
70   }
71 
72   @SuppressWarnings("serial") // This class is never serialized.
73   static final class ComputingSegment<K, V> extends Segment<K, V> {
ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize)74     ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize) {
75       super(map, initialCapacity, maxSegmentSize);
76     }
77 
getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)78     V getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)
79         throws ExecutionException {
80       try {
81         outer: while (true) {
82           // don't call getLiveEntry, which would ignore computing values
83           ReferenceEntry<K, V> e = getEntry(key, hash);
84           if (e != null) {
85             V value = getLiveValue(e);
86             if (value != null) {
87               recordRead(e);
88               return value;
89             }
90           }
91 
92           // at this point e is either null, computing, or expired;
93           // avoid locking if it's already computing
94           if (e == null || !e.getValueReference().isComputingReference()) {
95             boolean createNewEntry = true;
96             ComputingValueReference<K, V> computingValueReference = null;
97             lock();
98             try {
99               preWriteCleanup();
100 
101               int newCount = this.count - 1;
102               AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
103               int index = hash & (table.length() - 1);
104               ReferenceEntry<K, V> first = table.get(index);
105 
106               for (e = first; e != null; e = e.getNext()) {
107                 K entryKey = e.getKey();
108                 if (e.getHash() == hash && entryKey != null
109                     && map.keyEquivalence.equivalent(key, entryKey)) {
110                   ValueReference<K, V> valueReference = e.getValueReference();
111                   if (valueReference.isComputingReference()) {
112                     createNewEntry = false;
113                   } else {
114                     V value = e.getValueReference().get();
115                     if (value == null) {
116                       enqueueNotification(entryKey, hash, value, RemovalCause.COLLECTED);
117                     } else if (map.expires() && map.isExpired(e)) {
118                       // This is a duplicate check, as preWriteCleanup already purged expired
119                       // entries, but let's accomodate an incorrect expiration queue.
120                       enqueueNotification(entryKey, hash, value, RemovalCause.EXPIRED);
121                     } else {
122                       recordLockedRead(e);
123                       return value;
124                     }
125 
126                     // immediately reuse invalid entries
127                     evictionQueue.remove(e);
128                     expirationQueue.remove(e);
129                     this.count = newCount; // write-volatile
130                   }
131                   break;
132                 }
133               }
134 
135               if (createNewEntry) {
136                 computingValueReference = new ComputingValueReference<K, V>(computingFunction);
137 
138                 if (e == null) {
139                   e = newEntry(key, hash, first);
140                   e.setValueReference(computingValueReference);
141                   table.set(index, e);
142                 } else {
143                   e.setValueReference(computingValueReference);
144                 }
145               }
146             } finally {
147               unlock();
148               postWriteCleanup();
149             }
150 
151             if (createNewEntry) {
152               // This thread solely created the entry.
153               return compute(key, hash, e, computingValueReference);
154             }
155           }
156 
157           // The entry already exists. Wait for the computation.
158           checkState(!Thread.holdsLock(e), "Recursive computation");
159           // don't consider expiration as we're concurrent with computation
160           V value = e.getValueReference().waitForValue();
161           if (value != null) {
162             recordRead(e);
163             return value;
164           }
165           // else computing thread will clearValue
166           continue outer;
167         }
168       } finally {
169         postReadCleanup();
170       }
171     }
172 
compute(K key, int hash, ReferenceEntry<K, V> e, ComputingValueReference<K, V> computingValueReference)173     V compute(K key, int hash, ReferenceEntry<K, V> e,
174         ComputingValueReference<K, V> computingValueReference)
175         throws ExecutionException {
176       V value = null;
177       long start = System.nanoTime();
178       long end = 0;
179       try {
180         // Synchronizes on the entry to allow failing fast when a recursive computation is
181         // detected. This is not fool-proof since the entry may be copied when the segment
182         // is written to.
183         synchronized (e) {
184           value = computingValueReference.compute(key, hash);
185           end = System.nanoTime();
186         }
187         if (value != null) {
188           // putIfAbsent
189           V oldValue = put(key, hash, value, true);
190           if (oldValue != null) {
191             // the computed value was already clobbered
192             enqueueNotification(key, hash, value, RemovalCause.REPLACED);
193           }
194         }
195         return value;
196       } finally {
197         if (end == 0) {
198           end = System.nanoTime();
199         }
200         if (value == null) {
201           clearValue(key, hash, computingValueReference);
202         }
203       }
204     }
205   }
206 
207   /**
208    * Used to provide computation exceptions to other threads.
209    */
210   private static final class ComputationExceptionReference<K, V> implements ValueReference<K, V> {
211     final Throwable t;
212 
ComputationExceptionReference(Throwable t)213     ComputationExceptionReference(Throwable t) {
214       this.t = t;
215     }
216 
217     @Override
get()218     public V get() {
219       return null;
220     }
221 
222     @Override
getEntry()223     public ReferenceEntry<K, V> getEntry() {
224       return null;
225     }
226 
227     @Override
copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)228     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
229       return this;
230     }
231 
232     @Override
isComputingReference()233     public boolean isComputingReference() {
234       return false;
235     }
236 
237     @Override
waitForValue()238     public V waitForValue() throws ExecutionException {
239       throw new ExecutionException(t);
240     }
241 
242     @Override
clear(ValueReference<K, V> newValue)243     public void clear(ValueReference<K, V> newValue) {}
244   }
245 
246   /**
247    * Used to provide computation result to other threads.
248    */
249   private static final class ComputedReference<K, V> implements ValueReference<K, V> {
250     final V value;
251 
ComputedReference(@ullable V value)252     ComputedReference(@Nullable V value) {
253       this.value = value;
254     }
255 
256     @Override
get()257     public V get() {
258       return value;
259     }
260 
261     @Override
getEntry()262     public ReferenceEntry<K, V> getEntry() {
263       return null;
264     }
265 
266     @Override
copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)267     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
268       return this;
269     }
270 
271     @Override
isComputingReference()272     public boolean isComputingReference() {
273       return false;
274     }
275 
276     @Override
waitForValue()277     public V waitForValue() {
278       return get();
279     }
280 
281     @Override
clear(ValueReference<K, V> newValue)282     public void clear(ValueReference<K, V> newValue) {}
283   }
284 
285   private static final class ComputingValueReference<K, V> implements ValueReference<K, V> {
286     final Function<? super K, ? extends V> computingFunction;
287 
288     @GuardedBy("ComputingValueReference.this") // writes
289     volatile ValueReference<K, V> computedReference = unset();
290 
ComputingValueReference(Function<? super K, ? extends V> computingFunction)291     public ComputingValueReference(Function<? super K, ? extends V> computingFunction) {
292       this.computingFunction = computingFunction;
293     }
294 
295     @Override
get()296     public V get() {
297       // All computation lookups go through waitForValue. This method thus is
298       // only used by put, to whom we always want to appear absent.
299       return null;
300     }
301 
302     @Override
getEntry()303     public ReferenceEntry<K, V> getEntry() {
304       return null;
305     }
306 
307     @Override
copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry)308     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
309       return this;
310     }
311 
312     @Override
isComputingReference()313     public boolean isComputingReference() {
314       return true;
315     }
316 
317     /**
318      * Waits for a computation to complete. Returns the result of the computation.
319      */
320     @Override
waitForValue()321     public V waitForValue() throws ExecutionException {
322       if (computedReference == UNSET) {
323         boolean interrupted = false;
324         try {
325           synchronized (this) {
326             while (computedReference == UNSET) {
327               try {
328                 wait();
329               } catch (InterruptedException ie) {
330                 interrupted = true;
331               }
332             }
333           }
334         } finally {
335           if (interrupted) {
336             Thread.currentThread().interrupt();
337           }
338         }
339       }
340       return computedReference.waitForValue();
341     }
342 
343     @Override
clear(ValueReference<K, V> newValue)344     public void clear(ValueReference<K, V> newValue) {
345       // The pending computation was clobbered by a manual write. Unblock all
346       // pending gets, and have them return the new value.
347       setValueReference(newValue);
348 
349       // TODO(fry): could also cancel computation if we had a thread handle
350     }
351 
compute(K key, int hash)352     V compute(K key, int hash) throws ExecutionException {
353       V value;
354       try {
355         value = computingFunction.apply(key);
356       } catch (Throwable t) {
357         setValueReference(new ComputationExceptionReference<K, V>(t));
358         throw new ExecutionException(t);
359       }
360 
361       setValueReference(new ComputedReference<K, V>(value));
362       return value;
363     }
364 
setValueReference(ValueReference<K, V> valueReference)365     void setValueReference(ValueReference<K, V> valueReference) {
366       synchronized (this) {
367         if (computedReference == UNSET) {
368           computedReference = valueReference;
369           notifyAll();
370         }
371       }
372     }
373   }
374 
375   /**
376    * Overrides get() to compute on demand. Also throws an exception when {@code null} is returned
377    * from a computation.
378    */
379   static final class ComputingMapAdapter<K, V>
380       extends ComputingConcurrentHashMap<K, V> implements Serializable {
381     private static final long serialVersionUID = 0;
382 
ComputingMapAdapter(MapMaker mapMaker, Function<? super K, ? extends V> computingFunction)383     ComputingMapAdapter(MapMaker mapMaker,
384         Function<? super K, ? extends V> computingFunction) {
385       super(mapMaker, computingFunction);
386     }
387 
388     @SuppressWarnings("unchecked") // unsafe, which is one advantage of Cache over Map
389     @Override
get(Object key)390     public V get(Object key) {
391       V value;
392       try {
393         value = getOrCompute((K) key);
394       } catch (ExecutionException e) {
395         Throwable cause = e.getCause();
396         Throwables.propagateIfInstanceOf(cause, ComputationException.class);
397         throw new ComputationException(cause);
398       }
399 
400       if (value == null) {
401         throw new NullPointerException(computingFunction + " returned null for key " + key + ".");
402       }
403       return value;
404     }
405   }
406 
407   // Serialization Support
408 
409   private static final long serialVersionUID = 4;
410 
411   @Override
writeReplace()412   Object writeReplace() {
413     return new ComputingSerializationProxy<K, V>(keyStrength, valueStrength, keyEquivalence,
414         valueEquivalence, expireAfterWriteNanos, expireAfterAccessNanos, maximumSize,
415         concurrencyLevel, removalListener, this, computingFunction);
416   }
417 
418   static final class ComputingSerializationProxy<K, V> extends AbstractSerializationProxy<K, V> {
419 
420     final Function<? super K, ? extends V> computingFunction;
421 
ComputingSerializationProxy(Strength keyStrength, Strength valueStrength, Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence, long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize, int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener, ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction)422     ComputingSerializationProxy(Strength keyStrength, Strength valueStrength,
423         Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence,
424         long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize,
425         int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener,
426         ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction) {
427       super(keyStrength, valueStrength, keyEquivalence, valueEquivalence, expireAfterWriteNanos,
428           expireAfterAccessNanos, maximumSize, concurrencyLevel, removalListener, delegate);
429       this.computingFunction = computingFunction;
430     }
431 
writeObject(ObjectOutputStream out)432     private void writeObject(ObjectOutputStream out) throws IOException {
433       out.defaultWriteObject();
434       writeMapTo(out);
435     }
436 
437     @SuppressWarnings("deprecation") // self-use
readObject(ObjectInputStream in)438     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
439       in.defaultReadObject();
440       MapMaker mapMaker = readMapMaker(in);
441       delegate = mapMaker.makeComputingMap(computingFunction);
442       readEntries(in);
443     }
444 
readResolve()445     Object readResolve() {
446       return delegate;
447     }
448 
449     private static final long serialVersionUID = 4;
450   }
451 }
452