• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.jimfs;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.MoreObjects;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.collect.ImmutableSet;
27 import java.io.IOException;
28 import java.nio.file.ClosedWatchServiceException;
29 import java.nio.file.WatchEvent;
30 import java.nio.file.WatchKey;
31 import java.nio.file.WatchService;
32 import java.nio.file.Watchable;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Objects;
37 import java.util.concurrent.ArrayBlockingQueue;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.checkerframework.checker.nullness.compatqual.NullableDecl;
45 
46 /**
47  * Abstract implementation of {@link WatchService}. Provides the means for registering and managing
48  * keys but does not handle actually watching. Subclasses should implement the means of watching
49  * watchables, posting events to registered keys and queueing keys with the service by signalling
50  * them.
51  *
52  * @author Colin Decker
53  */
54 abstract class AbstractWatchService implements WatchService {
55 
56   private final BlockingQueue<WatchKey> queue = new LinkedBlockingQueue<>();
57   private final WatchKey poison = new Key(this, null, ImmutableSet.<WatchEvent.Kind<?>>of());
58 
59   private final AtomicBoolean open = new AtomicBoolean(true);
60 
61   /**
62    * Registers the given watchable with this service, returning a new watch key for it. This
63    * implementation just checks that the service is open and creates a key; subclasses may override
64    * it to do other things as well.
65    */
register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes)66   public Key register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes)
67       throws IOException {
68     checkOpen();
69     return new Key(this, watchable, eventTypes);
70   }
71 
72   /** Returns whether or not this watch service is open. */
73   @VisibleForTesting
isOpen()74   public boolean isOpen() {
75     return open.get();
76   }
77 
78   /** Enqueues the given key if the watch service is open; does nothing otherwise. */
enqueue(Key key)79   final void enqueue(Key key) {
80     if (isOpen()) {
81       queue.add(key);
82     }
83   }
84 
85   /** Called when the given key is cancelled. Does nothing by default. */
cancelled(Key key)86   public void cancelled(Key key) {}
87 
88   @VisibleForTesting
queuedKeys()89   ImmutableList<WatchKey> queuedKeys() {
90     return ImmutableList.copyOf(queue);
91   }
92 
93   @NullableDecl
94   @Override
poll()95   public WatchKey poll() {
96     checkOpen();
97     return check(queue.poll());
98   }
99 
100   @NullableDecl
101   @Override
poll(long timeout, TimeUnit unit)102   public WatchKey poll(long timeout, TimeUnit unit) throws InterruptedException {
103     checkOpen();
104     return check(queue.poll(timeout, unit));
105   }
106 
107   @Override
take()108   public WatchKey take() throws InterruptedException {
109     checkOpen();
110     return check(queue.take());
111   }
112 
113   /** Returns the given key, throwing an exception if it's the poison. */
114   @NullableDecl
check(@ullableDecl WatchKey key)115   private WatchKey check(@NullableDecl WatchKey key) {
116     if (key == poison) {
117       // ensure other blocking threads get the poison
118       queue.offer(poison);
119       throw new ClosedWatchServiceException();
120     }
121     return key;
122   }
123 
124   /** Checks that the watch service is open, throwing {@link ClosedWatchServiceException} if not. */
checkOpen()125   protected final void checkOpen() {
126     if (!open.get()) {
127       throw new ClosedWatchServiceException();
128     }
129   }
130 
131   @Override
close()132   public void close() {
133     if (open.compareAndSet(true, false)) {
134       queue.clear();
135       queue.offer(poison);
136     }
137   }
138 
139   /** A basic implementation of {@link WatchEvent}. */
140   static final class Event<T> implements WatchEvent<T> {
141 
142     private final Kind<T> kind;
143     private final int count;
144 
145     @NullableDecl private final T context;
146 
Event(Kind<T> kind, int count, @NullableDecl T context)147     public Event(Kind<T> kind, int count, @NullableDecl T context) {
148       this.kind = checkNotNull(kind);
149       checkArgument(count >= 0, "count (%s) must be non-negative", count);
150       this.count = count;
151       this.context = context;
152     }
153 
154     @Override
kind()155     public Kind<T> kind() {
156       return kind;
157     }
158 
159     @Override
count()160     public int count() {
161       return count;
162     }
163 
164     @NullableDecl
165     @Override
context()166     public T context() {
167       return context;
168     }
169 
170     @Override
equals(Object obj)171     public boolean equals(Object obj) {
172       if (obj instanceof Event) {
173         Event<?> other = (Event<?>) obj;
174         return kind().equals(other.kind())
175             && count() == other.count()
176             && Objects.equals(context(), other.context());
177       }
178       return false;
179     }
180 
181     @Override
hashCode()182     public int hashCode() {
183       return Objects.hash(kind(), count(), context());
184     }
185 
186     @Override
toString()187     public String toString() {
188       return MoreObjects.toStringHelper(this)
189           .add("kind", kind())
190           .add("count", count())
191           .add("context", context())
192           .toString();
193     }
194   }
195 
196   /** Implementation of {@link WatchKey} for an {@link AbstractWatchService}. */
197   static final class Key implements WatchKey {
198 
199     @VisibleForTesting static final int MAX_QUEUE_SIZE = 256;
200 
overflowEvent(int count)201     private static WatchEvent<Object> overflowEvent(int count) {
202       return new Event<>(OVERFLOW, count, null);
203     }
204 
205     private final AbstractWatchService watcher;
206     private final Watchable watchable;
207     private final ImmutableSet<WatchEvent.Kind<?>> subscribedTypes;
208 
209     private final AtomicReference<State> state = new AtomicReference<>(State.READY);
210     private final AtomicBoolean valid = new AtomicBoolean(true);
211     private final AtomicInteger overflow = new AtomicInteger();
212 
213     private final BlockingQueue<WatchEvent<?>> events = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
214 
Key( AbstractWatchService watcher, @NullableDecl Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> subscribedTypes)215     public Key(
216         AbstractWatchService watcher,
217         @NullableDecl Watchable watchable,
218         Iterable<? extends WatchEvent.Kind<?>> subscribedTypes) {
219       this.watcher = checkNotNull(watcher);
220       this.watchable = watchable; // nullable for Watcher poison
221       this.subscribedTypes = ImmutableSet.copyOf(subscribedTypes);
222     }
223 
224     /** Gets the current state of this key, State.READY or SIGNALLED. */
225     @VisibleForTesting
state()226     State state() {
227       return state.get();
228     }
229 
230     /** Gets whether or not this key is subscribed to the given type of event. */
subscribesTo(WatchEvent.Kind<?> eventType)231     public boolean subscribesTo(WatchEvent.Kind<?> eventType) {
232       return subscribedTypes.contains(eventType);
233     }
234 
235     /**
236      * Posts the given event to this key. After posting one or more events, {@link #signal()} must
237      * be called to cause the key to be enqueued with the watch service.
238      */
post(WatchEvent<?> event)239     public void post(WatchEvent<?> event) {
240       if (!events.offer(event)) {
241         overflow.incrementAndGet();
242       }
243     }
244 
245     /**
246      * Sets the state to SIGNALLED and enqueues this key with the watcher if it was previously in
247      * the READY state.
248      */
signal()249     public void signal() {
250       if (state.getAndSet(State.SIGNALLED) == State.READY) {
251         watcher.enqueue(this);
252       }
253     }
254 
255     @Override
isValid()256     public boolean isValid() {
257       return watcher.isOpen() && valid.get();
258     }
259 
260     @Override
pollEvents()261     public List<WatchEvent<?>> pollEvents() {
262       // note: it's correct to be able to retrieve more events from a key without calling reset()
263       // reset() is ONLY for "returning" the key to the watch service to potentially be retrieved by
264       // another thread when you're finished with it
265       List<WatchEvent<?>> result = new ArrayList<>(events.size());
266       events.drainTo(result);
267       int overflowCount = overflow.getAndSet(0);
268       if (overflowCount != 0) {
269         result.add(overflowEvent(overflowCount));
270       }
271       return Collections.unmodifiableList(result);
272     }
273 
274     @Override
reset()275     public boolean reset() {
276       // calling reset() multiple times without polling events would cause key to be placed in
277       // watcher queue multiple times, but not much that can be done about that
278       if (isValid() && state.compareAndSet(State.SIGNALLED, State.READY)) {
279         // requeue if events are pending
280         if (!events.isEmpty()) {
281           signal();
282         }
283       }
284 
285       return isValid();
286     }
287 
288     @Override
cancel()289     public void cancel() {
290       valid.set(false);
291       watcher.cancelled(this);
292     }
293 
294     @Override
watchable()295     public Watchable watchable() {
296       return watchable;
297     }
298 
299     @VisibleForTesting
300     enum State {
301       READY,
302       SIGNALLED
303     }
304   }
305 }
306