1 /* 2 * Copyright 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.jimfs; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static java.nio.file.StandardWatchEventKinds.OVERFLOW; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.MoreObjects; 25 import com.google.common.collect.ImmutableList; 26 import com.google.common.collect.ImmutableSet; 27 import java.io.IOException; 28 import java.nio.file.ClosedWatchServiceException; 29 import java.nio.file.WatchEvent; 30 import java.nio.file.WatchKey; 31 import java.nio.file.WatchService; 32 import java.nio.file.Watchable; 33 import java.util.ArrayList; 34 import java.util.Collections; 35 import java.util.List; 36 import java.util.Objects; 37 import java.util.concurrent.ArrayBlockingQueue; 38 import java.util.concurrent.BlockingQueue; 39 import java.util.concurrent.LinkedBlockingQueue; 40 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.atomic.AtomicBoolean; 42 import java.util.concurrent.atomic.AtomicInteger; 43 import java.util.concurrent.atomic.AtomicReference; 44 import org.checkerframework.checker.nullness.compatqual.NullableDecl; 45 46 /** 47 * Abstract implementation of {@link WatchService}. Provides the means for registering and managing 48 * keys but does not handle actually watching. Subclasses should implement the means of watching 49 * watchables, posting events to registered keys and queueing keys with the service by signalling 50 * them. 51 * 52 * @author Colin Decker 53 */ 54 abstract class AbstractWatchService implements WatchService { 55 56 private final BlockingQueue<WatchKey> queue = new LinkedBlockingQueue<>(); 57 private final WatchKey poison = new Key(this, null, ImmutableSet.<WatchEvent.Kind<?>>of()); 58 59 private final AtomicBoolean open = new AtomicBoolean(true); 60 61 /** 62 * Registers the given watchable with this service, returning a new watch key for it. This 63 * implementation just checks that the service is open and creates a key; subclasses may override 64 * it to do other things as well. 65 */ register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes)66 public Key register(Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> eventTypes) 67 throws IOException { 68 checkOpen(); 69 return new Key(this, watchable, eventTypes); 70 } 71 72 /** Returns whether or not this watch service is open. */ 73 @VisibleForTesting isOpen()74 public boolean isOpen() { 75 return open.get(); 76 } 77 78 /** Enqueues the given key if the watch service is open; does nothing otherwise. */ enqueue(Key key)79 final void enqueue(Key key) { 80 if (isOpen()) { 81 queue.add(key); 82 } 83 } 84 85 /** Called when the given key is cancelled. Does nothing by default. */ cancelled(Key key)86 public void cancelled(Key key) {} 87 88 @VisibleForTesting queuedKeys()89 ImmutableList<WatchKey> queuedKeys() { 90 return ImmutableList.copyOf(queue); 91 } 92 93 @NullableDecl 94 @Override poll()95 public WatchKey poll() { 96 checkOpen(); 97 return check(queue.poll()); 98 } 99 100 @NullableDecl 101 @Override poll(long timeout, TimeUnit unit)102 public WatchKey poll(long timeout, TimeUnit unit) throws InterruptedException { 103 checkOpen(); 104 return check(queue.poll(timeout, unit)); 105 } 106 107 @Override take()108 public WatchKey take() throws InterruptedException { 109 checkOpen(); 110 return check(queue.take()); 111 } 112 113 /** Returns the given key, throwing an exception if it's the poison. */ 114 @NullableDecl check(@ullableDecl WatchKey key)115 private WatchKey check(@NullableDecl WatchKey key) { 116 if (key == poison) { 117 // ensure other blocking threads get the poison 118 queue.offer(poison); 119 throw new ClosedWatchServiceException(); 120 } 121 return key; 122 } 123 124 /** Checks that the watch service is open, throwing {@link ClosedWatchServiceException} if not. */ checkOpen()125 protected final void checkOpen() { 126 if (!open.get()) { 127 throw new ClosedWatchServiceException(); 128 } 129 } 130 131 @Override close()132 public void close() { 133 if (open.compareAndSet(true, false)) { 134 queue.clear(); 135 queue.offer(poison); 136 } 137 } 138 139 /** A basic implementation of {@link WatchEvent}. */ 140 static final class Event<T> implements WatchEvent<T> { 141 142 private final Kind<T> kind; 143 private final int count; 144 145 @NullableDecl private final T context; 146 Event(Kind<T> kind, int count, @NullableDecl T context)147 public Event(Kind<T> kind, int count, @NullableDecl T context) { 148 this.kind = checkNotNull(kind); 149 checkArgument(count >= 0, "count (%s) must be non-negative", count); 150 this.count = count; 151 this.context = context; 152 } 153 154 @Override kind()155 public Kind<T> kind() { 156 return kind; 157 } 158 159 @Override count()160 public int count() { 161 return count; 162 } 163 164 @NullableDecl 165 @Override context()166 public T context() { 167 return context; 168 } 169 170 @Override equals(Object obj)171 public boolean equals(Object obj) { 172 if (obj instanceof Event) { 173 Event<?> other = (Event<?>) obj; 174 return kind().equals(other.kind()) 175 && count() == other.count() 176 && Objects.equals(context(), other.context()); 177 } 178 return false; 179 } 180 181 @Override hashCode()182 public int hashCode() { 183 return Objects.hash(kind(), count(), context()); 184 } 185 186 @Override toString()187 public String toString() { 188 return MoreObjects.toStringHelper(this) 189 .add("kind", kind()) 190 .add("count", count()) 191 .add("context", context()) 192 .toString(); 193 } 194 } 195 196 /** Implementation of {@link WatchKey} for an {@link AbstractWatchService}. */ 197 static final class Key implements WatchKey { 198 199 @VisibleForTesting static final int MAX_QUEUE_SIZE = 256; 200 overflowEvent(int count)201 private static WatchEvent<Object> overflowEvent(int count) { 202 return new Event<>(OVERFLOW, count, null); 203 } 204 205 private final AbstractWatchService watcher; 206 private final Watchable watchable; 207 private final ImmutableSet<WatchEvent.Kind<?>> subscribedTypes; 208 209 private final AtomicReference<State> state = new AtomicReference<>(State.READY); 210 private final AtomicBoolean valid = new AtomicBoolean(true); 211 private final AtomicInteger overflow = new AtomicInteger(); 212 213 private final BlockingQueue<WatchEvent<?>> events = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); 214 Key( AbstractWatchService watcher, @NullableDecl Watchable watchable, Iterable<? extends WatchEvent.Kind<?>> subscribedTypes)215 public Key( 216 AbstractWatchService watcher, 217 @NullableDecl Watchable watchable, 218 Iterable<? extends WatchEvent.Kind<?>> subscribedTypes) { 219 this.watcher = checkNotNull(watcher); 220 this.watchable = watchable; // nullable for Watcher poison 221 this.subscribedTypes = ImmutableSet.copyOf(subscribedTypes); 222 } 223 224 /** Gets the current state of this key, State.READY or SIGNALLED. */ 225 @VisibleForTesting state()226 State state() { 227 return state.get(); 228 } 229 230 /** Gets whether or not this key is subscribed to the given type of event. */ subscribesTo(WatchEvent.Kind<?> eventType)231 public boolean subscribesTo(WatchEvent.Kind<?> eventType) { 232 return subscribedTypes.contains(eventType); 233 } 234 235 /** 236 * Posts the given event to this key. After posting one or more events, {@link #signal()} must 237 * be called to cause the key to be enqueued with the watch service. 238 */ post(WatchEvent<?> event)239 public void post(WatchEvent<?> event) { 240 if (!events.offer(event)) { 241 overflow.incrementAndGet(); 242 } 243 } 244 245 /** 246 * Sets the state to SIGNALLED and enqueues this key with the watcher if it was previously in 247 * the READY state. 248 */ signal()249 public void signal() { 250 if (state.getAndSet(State.SIGNALLED) == State.READY) { 251 watcher.enqueue(this); 252 } 253 } 254 255 @Override isValid()256 public boolean isValid() { 257 return watcher.isOpen() && valid.get(); 258 } 259 260 @Override pollEvents()261 public List<WatchEvent<?>> pollEvents() { 262 // note: it's correct to be able to retrieve more events from a key without calling reset() 263 // reset() is ONLY for "returning" the key to the watch service to potentially be retrieved by 264 // another thread when you're finished with it 265 List<WatchEvent<?>> result = new ArrayList<>(events.size()); 266 events.drainTo(result); 267 int overflowCount = overflow.getAndSet(0); 268 if (overflowCount != 0) { 269 result.add(overflowEvent(overflowCount)); 270 } 271 return Collections.unmodifiableList(result); 272 } 273 274 @Override reset()275 public boolean reset() { 276 // calling reset() multiple times without polling events would cause key to be placed in 277 // watcher queue multiple times, but not much that can be done about that 278 if (isValid() && state.compareAndSet(State.SIGNALLED, State.READY)) { 279 // requeue if events are pending 280 if (!events.isEmpty()) { 281 signal(); 282 } 283 } 284 285 return isValid(); 286 } 287 288 @Override cancel()289 public void cancel() { 290 valid.set(false); 291 watcher.cancelled(this); 292 } 293 294 @Override watchable()295 public Watchable watchable() { 296 return watchable; 297 } 298 299 @VisibleForTesting 300 enum State { 301 READY, 302 SIGNALLED 303 } 304 } 305 } 306