/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.annotation.concurrent.GuardedBy;

40 /**
41  * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in
42  * the "running" state need to perform a periodic task.  Subclasses can implement {@link #startUp},
43  * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
44  *
45  * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
46  * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the
47  * {@link #runOneIteration} that will be executed periodically as specified by its
48  * {@link Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the
49  * periodic task (but not interrupt it) and wait for it to stop before running the
50  * {@link #shutDown} method.
51  *
52  * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link
53  * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
54  * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start
55  * late.  Also, all life cycle methods are executed with a lock held, so subclasses can safely
56  * modify shared state without additional synchronization necessary for visibility to later
57  * executions of the life cycle methods.
58  *
59  * <h3>Usage Example</h3>
60  *
61  * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to
62  * rate limit itself. <pre> {@code
63  * class CrawlingService extends AbstractScheduledService {
64  *   private Set<Uri> visited;
65  *   private Queue<Uri> toCrawl;
66  *   protected void startUp() throws Exception {
67  *     toCrawl = readStartingUris();
68  *   }
69  *
70  *   protected void runOneIteration() throws Exception {
71  *     Uri uri = toCrawl.remove();
72  *     Collection<Uri> newUris = crawl(uri);
73  *     visited.add(uri);
74  *     for (Uri newUri : newUris) {
75  *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
76  *     }
77  *   }
78  *
79  *   protected void shutDown() throws Exception {
80  *     saveUris(toCrawl);
81  *   }
82  *
83  *   protected Scheduler scheduler() {
84  *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
85  *   }
86  * }}</pre>
87  *
88  * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of
89  * outstanding URIs when shutting down.  Also, it takes advantage of the scheduling functionality to
90  * rate limit the number of queries we perform.
91  *
92  * @author Luke Sandberg
93  * @since 11.0
94  */
95 @Beta
96 public abstract class AbstractScheduledService implements Service {
97   private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
98 
99   /**
100    * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its
101    * task.
102    *
103    * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
104    * methods, these provide {@link Scheduler} instances for the common use case of running the
105    * service with a fixed schedule.  If more flexibility is needed then consider subclassing
106    * {@link CustomScheduler}.
107    *
108    * @author Luke Sandberg
109    * @since 11.0
110    */
111   public abstract static class Scheduler {
112     /**
113      * Returns a {@link Scheduler} that schedules the task using the
114      * {@link ScheduledExecutorService#scheduleWithFixedDelay} method.
115      *
116      * @param initialDelay the time to delay first execution
117      * @param delay the delay between the termination of one execution and the commencement of the
118      *        next
119      * @param unit the time unit of the initialDelay and delay parameters
120      */
newFixedDelaySchedule(final long initialDelay, final long delay, final TimeUnit unit)121     public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay,
122         final TimeUnit unit) {
123       return new Scheduler() {
124         @Override
125         public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
126             Runnable task) {
127           return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
128         }
129       };
130     }
131 
132     /**
133      * Returns a {@link Scheduler} that schedules the task using the
134      * {@link ScheduledExecutorService#scheduleAtFixedRate} method.
135      *
136      * @param initialDelay the time to delay first execution
137      * @param period the period between successive executions of the task
138      * @param unit the time unit of the initialDelay and period parameters
139      */
newFixedRateSchedule(final long initialDelay, final long period, final TimeUnit unit)140     public static Scheduler newFixedRateSchedule(final long initialDelay, final long period,
141         final TimeUnit unit) {
142       return new Scheduler() {
143         @Override
144         public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
145             Runnable task) {
146           return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
147         }
148       };
149     }
150 
151     /** Schedules the task to run on the provided executor on behalf of the service.  */
152     abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
153         Runnable runnable);
154 
155     private Scheduler() {}
156   }
157 
158   /* use AbstractService for state management */
159   private final AbstractService delegate = new AbstractService() {
160 
161     // A handle to the running task so that we can stop it when a shutdown has been requested.
162     // These two fields are volatile because their values will be accessed from multiple threads.
163     private volatile Future<?> runningTask;
164     private volatile ScheduledExecutorService executorService;
165 
166     // This lock protects the task so we can ensure that none of the template methods (startUp,
167     // shutDown or runOneIteration) run concurrently with one another.
168     private final ReentrantLock lock = new ReentrantLock();
169 
170     private final Runnable task = new Runnable() {
171       @Override public void run() {
172         lock.lock();
173         try {
174           AbstractScheduledService.this.runOneIteration();
175         } catch (Throwable t) {
176           try {
177             shutDown();
178           } catch (Exception ignored) {
179             logger.log(Level.WARNING,
180                 "Error while attempting to shut down the service after failure.", ignored);
181           }
182           notifyFailed(t);
183           throw Throwables.propagate(t);
184         } finally {
185           lock.unlock();
186         }
187       }
188     };
189 
190     @Override protected final void doStart() {
191       executorService = MoreExecutors.renamingDecorator(executor(), new Supplier<String>() {
192         @Override public String get() {
193           return serviceName() + " " + state();
194         }
195       });
196       executorService.execute(new Runnable() {
197         @Override public void run() {
198           lock.lock();
199           try {
200             startUp();
201             runningTask = scheduler().schedule(delegate, executorService, task);
202             notifyStarted();
203           } catch (Throwable t) {
204             notifyFailed(t);
205             throw Throwables.propagate(t);
206           } finally {
207             lock.unlock();
208           }
209         }
210       });
211     }
212 
213     @Override protected final void doStop() {
214       runningTask.cancel(false);
215       executorService.execute(new Runnable() {
216         @Override public void run() {
217           try {
218             lock.lock();
219             try {
220               if (state() != State.STOPPING) {
221                 // This means that the state has changed since we were scheduled.  This implies that
222                 // an execution of runOneIteration has thrown an exception and we have transitioned
223                 // to a failed state, also this means that shutDown has already been called, so we
224                 // do not want to call it again.
225                 return;
226               }
227               shutDown();
228             } finally {
229               lock.unlock();
230             }
231             notifyStopped();
232           } catch (Throwable t) {
233             notifyFailed(t);
234             throw Throwables.propagate(t);
235           }
236         }
237       });
238     }
239   };
240 
241   /** Constructor for use by subclasses. */
242   protected AbstractScheduledService() {}
243 
244   /**
245    * Run one iteration of the scheduled task. If any invocation of this method throws an exception,
246    * the service will transition to the {@link Service.State#FAILED} state and this method will no
247    * longer be called.
248    */
249   protected abstract void runOneIteration() throws Exception;
250 
251   /**
252    * Start the service.
253    *
254    * <p>By default this method does nothing.
255    */
256   protected void startUp() throws Exception {}
257 
258   /**
259    * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
260    *
261    * <p>By default this method does nothing.
262    */
263   protected void shutDown() throws Exception {}
264 
265   /**
266    * Returns the {@link Scheduler} object used to configure this service.  This method will only be
267    * called once.
268    */
269   protected abstract Scheduler scheduler();
270 
271   /**
272    * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
273    * {@link #runOneIteration} and {@link #shutDown} methods.  If this method is overridden the
274    * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this
275    * service {@linkplain Service.State#TERMINATED terminates} or
276    * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a
277    * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called
278    * once.
279    *
280    * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
281    * pool that sets the name of the thread to the {@linkplain #serviceName() service name}.
282    * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the
283    * service {@linkplain Service.State#TERMINATED terminates} or
284    * {@linkplain Service.State#TERMINATED fails}.
285    */
286   protected ScheduledExecutorService executor() {
287     final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
288         new ThreadFactory() {
289           @Override public Thread newThread(Runnable runnable) {
290             return MoreExecutors.newThread(serviceName(), runnable);
291           }
292         });
293     // Add a listener to shutdown the executor after the service is stopped.  This ensures that the
294     // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
295     // Technically this listener is added after start() was called so it is a little gross, but it
296     // is called within doStart() so we know that the service cannot terminate or fail concurrently
297     // with adding this listener so it is impossible to miss an event that we are interested in.
298     addListener(new Listener() {
299       @Override public void terminated(State from) {
300         executor.shutdown();
301       }
302       @Override public void failed(State from, Throwable failure) {
303         executor.shutdown();
304       }
305     }, directExecutor());
306     return executor;
307   }
308 
309   /**
310    * Returns the name of this service. {@link AbstractScheduledService} may include the name in
311    * debugging output.
312    *
313    * @since 14.0
314    */
315   protected String serviceName() {
316     return getClass().getSimpleName();
317   }
318 
319   @Override public String toString() {
320     return serviceName() + " [" + state() + "]";
321   }
322 
323   @Override public final boolean isRunning() {
324     return delegate.isRunning();
325   }
326 
327   @Override public final State state() {
328     return delegate.state();
329   }
330 
331   /**
332    * @since 13.0
333    */
334   @Override public final void addListener(Listener listener, Executor executor) {
335     delegate.addListener(listener, executor);
336   }
337 
338   /**
339    * @since 14.0
340    */
341   @Override public final Throwable failureCause() {
342     return delegate.failureCause();
343   }
344 
345   /**
346    * @since 15.0
347    */
348   @Override public final Service startAsync() {
349     delegate.startAsync();
350     return this;
351   }
352 
353   /**
354    * @since 15.0
355    */
356   @Override public final Service stopAsync() {
357     delegate.stopAsync();
358     return this;
359   }
360 
361   /**
362    * @since 15.0
363    */
364   @Override public final void awaitRunning() {
365     delegate.awaitRunning();
366   }
367 
368   /**
369    * @since 15.0
370    */
371   @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
372     delegate.awaitRunning(timeout, unit);
373   }
374 
375   /**
376    * @since 15.0
377    */
378   @Override public final void awaitTerminated() {
379     delegate.awaitTerminated();
380   }
381 
382   /**
383    * @since 15.0
384    */
385   @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
386     delegate.awaitTerminated(timeout, unit);
387   }
388 
389   /**
390    * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
391    * use a dynamically changing schedule.  After every execution of the task, assuming it hasn't
392    * been cancelled, the {@link #getNextSchedule} method will be called.
393    *
394    * @author Luke Sandberg
395    * @since 11.0
396    */
397   @Beta
398   public abstract static class CustomScheduler extends Scheduler {
399 
400     /**
401      * A callable class that can reschedule itself using a {@link CustomScheduler}.
402      */
403     private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
404 
405       /** The underlying task. */
406       private final Runnable wrappedRunnable;
407 
408       /** The executor on which this Callable will be scheduled. */
409       private final ScheduledExecutorService executor;
410 
411       /**
412        * The service that is managing this callable.  This is used so that failure can be
413        * reported properly.
414        */
415       private final AbstractService service;
416 
417       /**
418        * This lock is used to ensure safe and correct cancellation, it ensures that a new task is
419        * not scheduled while a cancel is ongoing.  Also it protects the currentFuture variable to
420        * ensure that it is assigned atomically with being scheduled.
421        */
422       private final ReentrantLock lock = new ReentrantLock();
423 
424       /** The future that represents the next execution of this task.*/
425       @GuardedBy("lock")
426       private Future<Void> currentFuture;
427 
428       ReschedulableCallable(AbstractService service, ScheduledExecutorService executor,
429           Runnable runnable) {
430         this.wrappedRunnable = runnable;
431         this.executor = executor;
432         this.service = service;
433       }
434 
435       @Override
436       public Void call() throws Exception {
437         wrappedRunnable.run();
438         reschedule();
439         return null;
440       }
441 
442       /**
443        * Atomically reschedules this task and assigns the new future to {@link #currentFuture}.
444        */
445       public void reschedule() {
446         // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
447         // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
448         // to currentFuture doesn't race with itself so that currentFuture is assigned in the
449         // correct order.
450         lock.lock();
451         try {
452           if (currentFuture == null || !currentFuture.isCancelled()) {
453             final Schedule schedule = CustomScheduler.this.getNextSchedule();
454             currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
455           }
456         } catch (Throwable e) {
457           // If an exception is thrown by the subclass then we need to make sure that the service
458           // notices and transitions to the FAILED state.  We do it by calling notifyFailed directly
459           // because the service does not monitor the state of the future so if the exception is not
460           // caught and forwarded to the service the task would stop executing but the service would
461           // have no idea.
462           service.notifyFailed(e);
463         } finally {
464           lock.unlock();
465         }
466       }
467 
468       // N.B. Only protect cancel and isCancelled because those are the only methods that are
469       // invoked by the AbstractScheduledService.
470       @Override
471       public boolean cancel(boolean mayInterruptIfRunning) {
472         // Ensure that a task cannot be rescheduled while a cancel is ongoing.
473         lock.lock();
474         try {
475           return currentFuture.cancel(mayInterruptIfRunning);
476         } finally {
477           lock.unlock();
478         }
479       }
480 
481       @Override
482       protected Future<Void> delegate() {
483         throw new UnsupportedOperationException("Only cancel is supported by this future");
484       }
485     }
486 
487     @Override
488     final Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
489         Runnable runnable) {
490       ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
491       task.reschedule();
492       return task;
493     }
494 
495     /**
496      * A value object that represents an absolute delay until a task should be invoked.
497      *
498      * @author Luke Sandberg
499      * @since 11.0
500      */
501     @Beta
502     protected static final class Schedule {
503 
504       private final long delay;
505       private final TimeUnit unit;
506 
507       /**
508        * @param delay the time from now to delay execution
509        * @param unit the time unit of the delay parameter
510        */
511       public Schedule(long delay, TimeUnit unit) {
512         this.delay = delay;
513         this.unit = Preconditions.checkNotNull(unit);
514       }
515     }
516 
517     /**
518      * Calculates the time at which to next invoke the task.
519      *
520      * <p>This is guaranteed to be called immediately after the task has completed an iteration and
521      * on the same thread as the previous execution of {@link
522      * AbstractScheduledService#runOneIteration}.
523      *
524      * @return a schedule that defines the delay before the next execution.
525      */
526     protected abstract Schedule getNextSchedule() throws Exception;
527   }
528 }
529