• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.ArrayList;
41 import java.util.Arrays;
42 import java.util.List;
43 import java.util.concurrent.locks.LockSupport;
44 import java.util.function.BiConsumer;
45 import java.util.function.BiPredicate;
46 import java.util.function.Consumer;
47 import static java.util.concurrent.Flow.Publisher;
48 import static java.util.concurrent.Flow.Subscriber;
49 import static java.util.concurrent.Flow.Subscription;
50 
51 /**
52  * A {@link Flow.Publisher} that asynchronously issues submitted
53  * (non-null) items to current subscribers until it is closed.  Each
54  * current subscriber receives newly submitted items in the same order
55  * unless drops or exceptions are encountered.  Using a
56  * SubmissionPublisher allows item generators to act as compliant <a
57  * href="http://www.reactive-streams.org/"> reactive-streams</a>
58  * Publishers relying on drop handling and/or blocking for flow
59  * control.
60  *
61  * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
62  * constructor for delivery to subscribers. The best choice of
63  * Executor depends on expected usage. If the generator(s) of
64  * submitted items run in separate threads, and the number of
65  * subscribers can be estimated, consider using a {@link
66  * Executors#newFixedThreadPool}. Otherwise consider using the
67  * default, normally the {@link ForkJoinPool#commonPool}.
68  *
69  * <p>Buffering allows producers and consumers to transiently operate
70  * at different rates.  Each subscriber uses an independent buffer.
71  * Buffers are created upon first use and expanded as needed up to the
72  * given maximum. (The enforced capacity may be rounded up to the
73  * nearest power of two and/or bounded by the largest value supported
74  * by this implementation.)  Invocations of {@link
75  * Flow.Subscription#request(long) request} do not directly result in
76  * buffer expansion, but risk saturation if unfilled requests exceed
77  * the maximum capacity.  The default value of {@link
78  * Flow#defaultBufferSize()} may provide a useful starting point for
79  * choosing a capacity based on expected rates, resources, and usages.
80  *
81  * <p>A single SubmissionPublisher may be shared among multiple
82  * sources. Actions in a source thread prior to publishing an item or
83  * issuing a signal <a href="package-summary.html#MemoryVisibility">
84  * <i>happen-before</i></a> actions subsequent to the corresponding
85  * access by each subscriber. But reported estimates of lag and demand
86  * are designed for use in monitoring, not for synchronization
87  * control, and may reflect stale or inaccurate views of progress.
88  *
89  * <p>Publication methods support different policies about what to do
90  * when buffers are saturated. Method {@link #submit(Object) submit}
91  * blocks until resources are available. This is simplest, but least
92  * responsive.  The {@code offer} methods may drop items (either
93  * immediately or with bounded timeout), but provide an opportunity to
94  * interpose a handler and then retry.
95  *
96  * <p>If any Subscriber method throws an exception, its subscription
97  * is cancelled.  If a handler is supplied as a constructor argument,
98  * it is invoked before cancellation upon an exception in method
99  * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
100  * {@link Flow.Subscriber#onSubscribe onSubscribe},
101  * {@link Flow.Subscriber#onError(Throwable) onError} and
102  * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
103  * handled before cancellation.  If the supplied Executor throws
104  * {@link RejectedExecutionException} (or any other RuntimeException
105  * or Error) when attempting to execute a task, or a drop handler
106  * throws an exception when processing a dropped item, then the
107  * exception is rethrown. In these cases, not all subscribers will
108  * have been issued the published item. It is usually good practice to
109  * {@link #closeExceptionally closeExceptionally} in these cases.
110  *
111  * <p>Method {@link #consume(Consumer)} simplifies support for a
112  * common case in which the only action of a subscriber is to request
113  * and process all items using a supplied function.
114  *
115  * <p>This class may also serve as a convenient base for subclasses
116  * that generate items, and use the methods in this class to publish
117  * them.  For example here is a class that periodically publishes the
118  * items generated from a supplier. (In practice you might add methods
119  * to independently start and stop generation, to share Executors
120  * among publishers, and so on, or use a SubmissionPublisher as a
121  * component rather than a superclass.)
122  *
123  * <pre> {@code
124  * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
125  *   final ScheduledFuture<?> periodicTask;
126  *   final ScheduledExecutorService scheduler;
127  *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
128  *                     Supplier<? extends T> supplier,
129  *                     long period, TimeUnit unit) {
130  *     super(executor, maxBufferCapacity);
131  *     scheduler = new ScheduledThreadPoolExecutor(1);
132  *     periodicTask = scheduler.scheduleAtFixedRate(
133  *       () -> submit(supplier.get()), 0, period, unit);
134  *   }
135  *   public void close() {
136  *     periodicTask.cancel(false);
137  *     scheduler.shutdown();
138  *     super.close();
139  *   }
140  * }}</pre>
141  *
142  * <p>Here is an example of a {@link Flow.Processor} implementation.
143  * It uses single-step requests to its publisher for simplicity of
144  * illustration. A more adaptive version could monitor flow using the
145  * lag estimate returned from {@code submit}, along with other utility
146  * methods.
147  *
148  * <pre> {@code
149  * class TransformProcessor<S,T> extends SubmissionPublisher<T>
150  *   implements Flow.Processor<S,T> {
151  *   final Function<? super S, ? extends T> function;
152  *   Flow.Subscription subscription;
153  *   TransformProcessor(Executor executor, int maxBufferCapacity,
154  *                      Function<? super S, ? extends T> function) {
155  *     super(executor, maxBufferCapacity);
156  *     this.function = function;
157  *   }
158  *   public void onSubscribe(Flow.Subscription subscription) {
159  *     (this.subscription = subscription).request(1);
160  *   }
161  *   public void onNext(S item) {
162  *     subscription.request(1);
163  *     submit(function.apply(item));
164  *   }
165  *   public void onError(Throwable ex) { closeExceptionally(ex); }
166  *   public void onComplete() { close(); }
167  * }}</pre>
168  *
169  * @param <T> the published item type
170  * @author Doug Lea
171  * @since 9
172  */
173 public class SubmissionPublisher<T> implements Publisher<T>,
174                                                AutoCloseable {
175     /*
176      * Most mechanics are handled by BufferedSubscription. This class
177      * mainly tracks subscribers and ensures sequentiality, by using
178      * built-in synchronization locks across public methods. Using
179      * built-in locks works well in the most typical case in which
180      * only one thread submits items. We extend this idea in
181      * submission methods by detecting single-ownership to reduce
182      * producer-consumer synchronization strength.
183      */
184 
185     /** The largest possible power of two array size. */
186     static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
187 
188     /**
189      * Initial buffer capacity used when maxBufferCapacity is
190      * greater. Must be a power of two.
191      */
192     static final int INITIAL_CAPACITY = 32;
193 
194     /** Round capacity to power of 2, at most limit. */
roundCapacity(int cap)195     static final int roundCapacity(int cap) {
196         int n = cap - 1;
197         n |= n >>> 1;
198         n |= n >>> 2;
199         n |= n >>> 4;
200         n |= n >>> 8;
201         n |= n >>> 16;
202         return (n <= 0) ? 1 : // at least 1
203             (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
204     }
205 
206     // default Executor setup; nearly the same as CompletableFuture
207 
208     /**
209      * Default executor -- ForkJoinPool.commonPool() unless it cannot
210      * support parallelism.
211      */
212     private static final Executor ASYNC_POOL =
213         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
214         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
215 
216     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
217     private static final class ThreadPerTaskExecutor implements Executor {
ThreadPerTaskExecutor()218         ThreadPerTaskExecutor() {}      // prevent access constructor creation
execute(Runnable r)219         public void execute(Runnable r) { new Thread(r).start(); }
220     }
221 
222     /**
223      * Clients (BufferedSubscriptions) are maintained in a linked list
224      * (via their "next" fields). This works well for publish loops.
225      * It requires O(n) traversal to check for duplicate subscribers,
226      * but we expect that subscribing is much less common than
227      * publishing. Unsubscribing occurs only during traversal loops,
228      * when BufferedSubscription methods return negative values
229      * signifying that they have been closed.  To reduce
230      * head-of-line blocking, submit and offer methods first call
231      * BufferedSubscription.offer on each subscriber, and place
232      * saturated ones in retries list (using nextRetry field), and
233      * retry, possibly blocking or dropping.
234      */
235     BufferedSubscription<T> clients;
236 
237     /** Run status, updated only within locks */
238     volatile boolean closed;
239     /** Set true on first call to subscribe, to initialize possible owner */
240     boolean subscribed;
241     /** The first caller thread to subscribe, or null if thread ever changed */
242     Thread owner;
243     /** If non-null, the exception in closeExceptionally */
244     volatile Throwable closedException;
245 
246     // Parameters for constructing BufferedSubscriptions
247     final Executor executor;
248     final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
249     final int maxBufferCapacity;
250 
251     /**
252      * Creates a new SubmissionPublisher using the given Executor for
253      * async delivery to subscribers, with the given maximum buffer size
254      * for each subscriber, and, if non-null, the given handler invoked
255      * when any Subscriber throws an exception in method {@link
256      * Flow.Subscriber#onNext(Object) onNext}.
257      *
258      * @param executor the executor to use for async delivery,
259      * supporting creation of at least one independent thread
260      * @param maxBufferCapacity the maximum capacity for each
261      * subscriber's buffer (the enforced capacity may be rounded up to
262      * the nearest power of two and/or bounded by the largest value
263      * supported by this implementation; method {@link #getMaxBufferCapacity}
264      * returns the actual value)
265      * @param handler if non-null, procedure to invoke upon exception
266      * thrown in method {@code onNext}
267      * @throws NullPointerException if executor is null
268      * @throws IllegalArgumentException if maxBufferCapacity not
269      * positive
270      */
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)271     public SubmissionPublisher(Executor executor, int maxBufferCapacity,
272                                BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) {
273         if (executor == null)
274             throw new NullPointerException();
275         if (maxBufferCapacity <= 0)
276             throw new IllegalArgumentException("capacity must be positive");
277         this.executor = executor;
278         this.onNextHandler = handler;
279         this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
280     }
281 
282     /**
283      * Creates a new SubmissionPublisher using the given Executor for
284      * async delivery to subscribers, with the given maximum buffer size
285      * for each subscriber, and no handler for Subscriber exceptions in
286      * method {@link Flow.Subscriber#onNext(Object) onNext}.
287      *
288      * @param executor the executor to use for async delivery,
289      * supporting creation of at least one independent thread
290      * @param maxBufferCapacity the maximum capacity for each
291      * subscriber's buffer (the enforced capacity may be rounded up to
292      * the nearest power of two and/or bounded by the largest value
293      * supported by this implementation; method {@link #getMaxBufferCapacity}
294      * returns the actual value)
295      * @throws NullPointerException if executor is null
296      * @throws IllegalArgumentException if maxBufferCapacity not
297      * positive
298      */
SubmissionPublisher(Executor executor, int maxBufferCapacity)299     public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
300         this(executor, maxBufferCapacity, null);
301     }
302 
303     /**
304      * Creates a new SubmissionPublisher using the {@link
305      * ForkJoinPool#commonPool()} for async delivery to subscribers
306      * (unless it does not support a parallelism level of at least two,
307      * in which case, a new Thread is created to run each task), with
308      * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
309      * handler for Subscriber exceptions in method {@link
310      * Flow.Subscriber#onNext(Object) onNext}.
311      */
SubmissionPublisher()312     public SubmissionPublisher() {
313         this(ASYNC_POOL, Flow.defaultBufferSize(), null);
314     }
315 
316     /**
317      * Adds the given Subscriber unless already subscribed.  If already
318      * subscribed, the Subscriber's {@link
319      * Flow.Subscriber#onError(Throwable) onError} method is invoked on
320      * the existing subscription with an {@link IllegalStateException}.
321      * Otherwise, upon success, the Subscriber's {@link
322      * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
323      * asynchronously with a new {@link Flow.Subscription}.  If {@link
324      * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
325      * subscription is cancelled. Otherwise, if this SubmissionPublisher
326      * was closed exceptionally, then the subscriber's {@link
327      * Flow.Subscriber#onError onError} method is invoked with the
328      * corresponding exception, or if closed without exception, the
329      * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
330      * method is invoked.  Subscribers may enable receiving items by
331      * invoking the {@link Flow.Subscription#request(long) request}
332      * method of the new Subscription, and may unsubscribe by invoking
333      * its {@link Flow.Subscription#cancel() cancel} method.
334      *
335      * @param subscriber the subscriber
336      * @throws NullPointerException if subscriber is null
337      */
subscribe(Subscriber<? super T> subscriber)338     public void subscribe(Subscriber<? super T> subscriber) {
339         if (subscriber == null) throw new NullPointerException();
340         int max = maxBufferCapacity; // allocate initial array
341         Object[] array = new Object[max < INITIAL_CAPACITY ?
342                                     max : INITIAL_CAPACITY];
343         BufferedSubscription<T> subscription =
344             new BufferedSubscription<T>(subscriber, executor, onNextHandler,
345                                         array, max);
346         synchronized (this) {
347             if (!subscribed) {
348                 subscribed = true;
349                 owner = Thread.currentThread();
350             }
351             for (BufferedSubscription<T> b = clients, pred = null;;) {
352                 if (b == null) {
353                     Throwable ex;
354                     subscription.onSubscribe();
355                     if ((ex = closedException) != null)
356                         subscription.onError(ex);
357                     else if (closed)
358                         subscription.onComplete();
359                     else if (pred == null)
360                         clients = subscription;
361                     else
362                         pred.next = subscription;
363                     break;
364                 }
365                 BufferedSubscription<T> next = b.next;
366                 if (b.isClosed()) {   // remove
367                     b.next = null;    // detach
368                     if (pred == null)
369                         clients = next;
370                     else
371                         pred.next = next;
372                 }
373                 else if (subscriber.equals(b.subscriber)) {
374                     b.onError(new IllegalStateException("Duplicate subscribe"));
375                     break;
376                 }
377                 else
378                     pred = b;
379                 b = next;
380             }
381         }
382     }
383 
384     /**
385      * Common implementation for all three forms of submit and offer.
386      * Acts as submit if nanos == Long.MAX_VALUE, else offer.
387      */
doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop)388     private int doOffer(T item, long nanos,
389                         BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
390         if (item == null) throw new NullPointerException();
391         int lag = 0;
392         boolean complete, unowned;
393         synchronized (this) {
394             Thread t = Thread.currentThread(), o;
395             BufferedSubscription<T> b = clients;
396             if ((unowned = ((o = owner) != t)) && o != null)
397                 owner = null;                     // disable bias
398             if (b == null)
399                 complete = closed;
400             else {
401                 complete = false;
402                 boolean cleanMe = false;
403                 BufferedSubscription<T> retries = null, rtail = null, next;
404                 do {
405                     next = b.next;
406                     int stat = b.offer(item, unowned);
407                     if (stat == 0) {              // saturated; add to retry list
408                         b.nextRetry = null;       // avoid garbage on exceptions
409                         if (rtail == null)
410                             retries = b;
411                         else
412                             rtail.nextRetry = b;
413                         rtail = b;
414                     }
415                     else if (stat < 0)            // closed
416                         cleanMe = true;           // remove later
417                     else if (stat > lag)
418                         lag = stat;
419                 } while ((b = next) != null);
420 
421                 if (retries != null || cleanMe)
422                     lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
423             }
424         }
425         if (complete)
426             throw new IllegalStateException("Closed");
427         else
428             return lag;
429     }
430 
431     /**
432      * Helps, (timed) waits for, and/or drops buffers on list; returns
433      * lag or negative drops (for use in offer).
434      */
retryOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop, BufferedSubscription<T> retries, int lag, boolean cleanMe)435     private int retryOffer(T item, long nanos,
436                            BiPredicate<Subscriber<? super T>, ? super T> onDrop,
437                            BufferedSubscription<T> retries, int lag,
438                            boolean cleanMe) {
439         for (BufferedSubscription<T> r = retries; r != null;) {
440             BufferedSubscription<T> nextRetry = r.nextRetry;
441             r.nextRetry = null;
442             if (nanos > 0L)
443                 r.awaitSpace(nanos);
444             int stat = r.retryOffer(item);
445             if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item))
446                 stat = r.retryOffer(item);
447             if (stat == 0)
448                 lag = (lag >= 0) ? -1 : lag - 1;
449             else if (stat < 0)
450                 cleanMe = true;
451             else if (lag >= 0 && stat > lag)
452                 lag = stat;
453             r = nextRetry;
454         }
455         if (cleanMe)
456             cleanAndCount();
457         return lag;
458     }
459 
460     /**
461      * Returns current list count after removing closed subscribers.
462      * Call only while holding lock.  Used mainly by retryOffer for
463      * cleanup.
464      */
cleanAndCount()465     private int cleanAndCount() {
466         int count = 0;
467         BufferedSubscription<T> pred = null, next;
468         for (BufferedSubscription<T> b = clients; b != null; b = next) {
469             next = b.next;
470             if (b.isClosed()) {
471                 b.next = null;
472                 if (pred == null)
473                     clients = next;
474                 else
475                     pred.next = next;
476             }
477             else {
478                 pred = b;
479                 ++count;
480             }
481         }
482         return count;
483     }
484 
485     /**
486      * Publishes the given item to each current subscriber by
487      * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
488      * onNext} method, blocking uninterruptibly while resources for any
489      * subscriber are unavailable. This method returns an estimate of
490      * the maximum lag (number of items submitted but not yet consumed)
491      * among all current subscribers. This value is at least one
492      * (accounting for this submitted item) if there are any
493      * subscribers, else zero.
494      *
495      * <p>If the Executor for this publisher throws a
496      * RejectedExecutionException (or any other RuntimeException or
497      * Error) when attempting to asynchronously notify subscribers,
498      * then this exception is rethrown, in which case not all
499      * subscribers will have been issued this item.
500      *
501      * @param item the (non-null) item to publish
502      * @return the estimated maximum lag among subscribers
503      * @throws IllegalStateException if closed
504      * @throws NullPointerException if item is null
505      * @throws RejectedExecutionException if thrown by Executor
506      */
submit(T item)507     public int submit(T item) {
508         return doOffer(item, Long.MAX_VALUE, null);
509     }
510 
511     /**
512      * Publishes the given item, if possible, to each current subscriber
513      * by asynchronously invoking its {@link
514      * Flow.Subscriber#onNext(Object) onNext} method. The item may be
515      * dropped by one or more subscribers if resource limits are
516      * exceeded, in which case the given handler (if non-null) is
517      * invoked, and if it returns true, retried once.  Other calls to
518      * methods in this class by other threads are blocked while the
519      * handler is invoked.  Unless recovery is assured, options are
520      * usually limited to logging the error and/or issuing an {@link
521      * Flow.Subscriber#onError(Throwable) onError} signal to the
522      * subscriber.
523      *
524      * <p>This method returns a status indicator: If negative, it
525      * represents the (negative) number of drops (failed attempts to
526      * issue the item to a subscriber). Otherwise it is an estimate of
527      * the maximum lag (number of items submitted but not yet
528      * consumed) among all current subscribers. This value is at least
529      * one (accounting for this submitted item) if there are any
530      * subscribers, else zero.
531      *
532      * <p>If the Executor for this publisher throws a
533      * RejectedExecutionException (or any other RuntimeException or
534      * Error) when attempting to asynchronously notify subscribers, or
535      * the drop handler throws an exception when processing a dropped
536      * item, then this exception is rethrown.
537      *
538      * @param item the (non-null) item to publish
539      * @param onDrop if non-null, the handler invoked upon a drop to a
540      * subscriber, with arguments of the subscriber and item; if it
541      * returns true, an offer is re-attempted (once)
542      * @return if negative, the (negative) number of drops; otherwise
543      * an estimate of maximum lag
544      * @throws IllegalStateException if closed
545      * @throws NullPointerException if item is null
546      * @throws RejectedExecutionException if thrown by Executor
547      */
offer(T item, BiPredicate<Subscriber<? super T>, ? super T> onDrop)548     public int offer(T item,
549                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
550         return doOffer(item, 0L, onDrop);
551     }
552 
553     /**
554      * Publishes the given item, if possible, to each current subscriber
555      * by asynchronously invoking its {@link
556      * Flow.Subscriber#onNext(Object) onNext} method, blocking while
557      * resources for any subscription are unavailable, up to the
558      * specified timeout or until the caller thread is interrupted, at
559      * which point the given handler (if non-null) is invoked, and if it
560      * returns true, retried once. (The drop handler may distinguish
561      * timeouts from interrupts by checking whether the current thread
562      * is interrupted.)  Other calls to methods in this class by other
563      * threads are blocked while the handler is invoked.  Unless
564      * recovery is assured, options are usually limited to logging the
565      * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
566      * onError} signal to the subscriber.
567      *
568      * <p>This method returns a status indicator: If negative, it
569      * represents the (negative) number of drops (failed attempts to
570      * issue the item to a subscriber). Otherwise it is an estimate of
571      * the maximum lag (number of items submitted but not yet
572      * consumed) among all current subscribers. This value is at least
573      * one (accounting for this submitted item) if there are any
574      * subscribers, else zero.
575      *
576      * <p>If the Executor for this publisher throws a
577      * RejectedExecutionException (or any other RuntimeException or
578      * Error) when attempting to asynchronously notify subscribers, or
579      * the drop handler throws an exception when processing a dropped
580      * item, then this exception is rethrown.
581      *
582      * @param item the (non-null) item to publish
583      * @param timeout how long to wait for resources for any subscriber
584      * before giving up, in units of {@code unit}
585      * @param unit a {@code TimeUnit} determining how to interpret the
586      * {@code timeout} parameter
587      * @param onDrop if non-null, the handler invoked upon a drop to a
588      * subscriber, with arguments of the subscriber and item; if it
589      * returns true, an offer is re-attempted (once)
590      * @return if negative, the (negative) number of drops; otherwise
591      * an estimate of maximum lag
592      * @throws IllegalStateException if closed
593      * @throws NullPointerException if item is null
594      * @throws RejectedExecutionException if thrown by Executor
595      */
offer(T item, long timeout, TimeUnit unit, BiPredicate<Subscriber<? super T>, ? super T> onDrop)596     public int offer(T item, long timeout, TimeUnit unit,
597                      BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
598         long nanos = unit.toNanos(timeout);
599         // distinguishes from untimed (only wrt interrupt policy)
600         if (nanos == Long.MAX_VALUE) --nanos;
601         return doOffer(item, nanos, onDrop);
602     }
603 
604     /**
605      * Unless already closed, issues {@link
606      * Flow.Subscriber#onComplete() onComplete} signals to current
607      * subscribers, and disallows subsequent attempts to publish.
608      * Upon return, this method does <em>NOT</em> guarantee that all
609      * subscribers have yet completed.
610      */
close()611     public void close() {
612         if (!closed) {
613             BufferedSubscription<T> b;
614             synchronized (this) {
615                 // no need to re-check closed here
616                 b = clients;
617                 clients = null;
618                 owner = null;
619                 closed = true;
620             }
621             while (b != null) {
622                 BufferedSubscription<T> next = b.next;
623                 b.next = null;
624                 b.onComplete();
625                 b = next;
626             }
627         }
628     }
629 
630     /**
631      * Unless already closed, issues {@link
632      * Flow.Subscriber#onError(Throwable) onError} signals to current
633      * subscribers with the given error, and disallows subsequent
634      * attempts to publish.  Future subscribers also receive the given
635      * error. Upon return, this method does <em>NOT</em> guarantee
636      * that all subscribers have yet completed.
637      *
638      * @param error the {@code onError} argument sent to subscribers
639      * @throws NullPointerException if error is null
640      */
closeExceptionally(Throwable error)641     public void closeExceptionally(Throwable error) {
642         if (error == null)
643             throw new NullPointerException();
644         if (!closed) {
645             BufferedSubscription<T> b;
646             synchronized (this) {
647                 b = clients;
648                 if (!closed) {  // don't clobber racing close
649                     closedException = error;
650                     clients = null;
651                     owner = null;
652                     closed = true;
653                 }
654             }
655             while (b != null) {
656                 BufferedSubscription<T> next = b.next;
657                 b.next = null;
658                 b.onError(error);
659                 b = next;
660             }
661         }
662     }
663 
664     /**
665      * Returns true if this publisher is not accepting submissions.
666      *
667      * @return true if closed
668      */
isClosed()669     public boolean isClosed() {
670         return closed;
671     }
672 
673     /**
674      * Returns the exception associated with {@link
675      * #closeExceptionally(Throwable) closeExceptionally}, or null if
676      * not closed or if closed normally.
677      *
678      * @return the exception, or null if none
679      */
getClosedException()680     public Throwable getClosedException() {
681         return closedException;
682     }
683 
684     /**
685      * Returns true if this publisher has any subscribers.
686      *
687      * @return true if this publisher has any subscribers
688      */
hasSubscribers()689     public boolean hasSubscribers() {
690         boolean nonEmpty = false;
691         synchronized (this) {
692             for (BufferedSubscription<T> b = clients; b != null;) {
693                 BufferedSubscription<T> next = b.next;
694                 if (b.isClosed()) {
695                     b.next = null;
696                     b = clients = next;
697                 }
698                 else {
699                     nonEmpty = true;
700                     break;
701                 }
702             }
703         }
704         return nonEmpty;
705     }
706 
707     /**
708      * Returns the number of current subscribers.
709      *
710      * @return the number of current subscribers
711      */
getNumberOfSubscribers()712     public int getNumberOfSubscribers() {
713         synchronized (this) {
714             return cleanAndCount();
715         }
716     }
717 
718     /**
719      * Returns the Executor used for asynchronous delivery.
720      *
721      * @return the Executor used for asynchronous delivery
722      */
getExecutor()723     public Executor getExecutor() {
724         return executor;
725     }
726 
727     /**
728      * Returns the maximum per-subscriber buffer capacity.
729      *
730      * @return the maximum per-subscriber buffer capacity
731      */
getMaxBufferCapacity()732     public int getMaxBufferCapacity() {
733         return maxBufferCapacity;
734     }
735 
736     /**
737      * Returns a list of current subscribers for monitoring and
738      * tracking purposes, not for invoking {@link Flow.Subscriber}
739      * methods on the subscribers.
740      *
741      * @return list of current subscribers
742      */
getSubscribers()743     public List<Subscriber<? super T>> getSubscribers() {
744         ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
745         synchronized (this) {
746             BufferedSubscription<T> pred = null, next;
747             for (BufferedSubscription<T> b = clients; b != null; b = next) {
748                 next = b.next;
749                 if (b.isClosed()) {
750                     b.next = null;
751                     if (pred == null)
752                         clients = next;
753                     else
754                         pred.next = next;
755                 }
756                 else {
757                     subs.add(b.subscriber);
758                     pred = b;
759                 }
760             }
761         }
762         return subs;
763     }
764 
765     /**
766      * Returns true if the given Subscriber is currently subscribed.
767      *
768      * @param subscriber the subscriber
769      * @return true if currently subscribed
770      * @throws NullPointerException if subscriber is null
771      */
isSubscribed(Subscriber<? super T> subscriber)772     public boolean isSubscribed(Subscriber<? super T> subscriber) {
773         if (subscriber == null) throw new NullPointerException();
774         if (!closed) {
775             synchronized (this) {
776                 BufferedSubscription<T> pred = null, next;
777                 for (BufferedSubscription<T> b = clients; b != null; b = next) {
778                     next = b.next;
779                     if (b.isClosed()) {
780                         b.next = null;
781                         if (pred == null)
782                             clients = next;
783                         else
784                             pred.next = next;
785                     }
786                     else if (subscriber.equals(b.subscriber))
787                         return true;
788                     else
789                         pred = b;
790                 }
791             }
792         }
793         return false;
794     }
795 
796     /**
797      * Returns an estimate of the minimum number of items requested
798      * (via {@link Flow.Subscription#request(long) request}) but not
799      * yet produced, among all current subscribers.
800      *
801      * @return the estimate, or zero if no subscribers
802      */
estimateMinimumDemand()803     public long estimateMinimumDemand() {
804         long min = Long.MAX_VALUE;
805         boolean nonEmpty = false;
806         synchronized (this) {
807             BufferedSubscription<T> pred = null, next;
808             for (BufferedSubscription<T> b = clients; b != null; b = next) {
809                 int n; long d;
810                 next = b.next;
811                 if ((n = b.estimateLag()) < 0) {
812                     b.next = null;
813                     if (pred == null)
814                         clients = next;
815                     else
816                         pred.next = next;
817                 }
818                 else {
819                     if ((d = b.demand - n) < min)
820                         min = d;
821                     nonEmpty = true;
822                     pred = b;
823                 }
824             }
825         }
826         return nonEmpty ? min : 0;
827     }
828 
829     /**
830      * Returns an estimate of the maximum number of items produced but
831      * not yet consumed among all current subscribers.
832      *
833      * @return the estimate
834      */
estimateMaximumLag()835     public int estimateMaximumLag() {
836         int max = 0;
837         synchronized (this) {
838             BufferedSubscription<T> pred = null, next;
839             for (BufferedSubscription<T> b = clients; b != null; b = next) {
840                 int n;
841                 next = b.next;
842                 if ((n = b.estimateLag()) < 0) {
843                     b.next = null;
844                     if (pred == null)
845                         clients = next;
846                     else
847                         pred.next = next;
848                 }
849                 else {
850                     if (n > max)
851                         max = n;
852                     pred = b;
853                 }
854             }
855         }
856         return max;
857     }
858 
859     /**
860      * Processes all published items using the given Consumer function.
861      * Returns a CompletableFuture that is completed normally when this
862      * publisher signals {@link Flow.Subscriber#onComplete()
863      * onComplete}, or completed exceptionally upon any error, or an
864      * exception is thrown by the Consumer, or the returned
865      * CompletableFuture is cancelled, in which case no further items
866      * are processed.
867      *
868      * @param consumer the function applied to each onNext item
869      * @return a CompletableFuture that is completed normally
870      * when the publisher signals onComplete, and exceptionally
871      * upon any error or cancellation
872      * @throws NullPointerException if consumer is null
873      */
consume(Consumer<? super T> consumer)874     public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
875         if (consumer == null)
876             throw new NullPointerException();
877         CompletableFuture<Void> status = new CompletableFuture<>();
878         subscribe(new ConsumerSubscriber<T>(status, consumer));
879         return status;
880     }
881 
882     /** Subscriber for method consume */
883     static final class ConsumerSubscriber<T> implements Subscriber<T> {
884         final CompletableFuture<Void> status;
885         final Consumer<? super T> consumer;
886         Subscription subscription;
ConsumerSubscriber(CompletableFuture<Void> status, Consumer<? super T> consumer)887         ConsumerSubscriber(CompletableFuture<Void> status,
888                            Consumer<? super T> consumer) {
889             this.status = status; this.consumer = consumer;
890         }
onSubscribe(Subscription subscription)891         public final void onSubscribe(Subscription subscription) {
892             this.subscription = subscription;
893             status.whenComplete((v, e) -> subscription.cancel());
894             if (!status.isDone())
895                 subscription.request(Long.MAX_VALUE);
896         }
onError(Throwable ex)897         public final void onError(Throwable ex) {
898             status.completeExceptionally(ex);
899         }
onComplete()900         public final void onComplete() {
901             status.complete(null);
902         }
onNext(T item)903         public final void onNext(T item) {
904             try {
905                 consumer.accept(item);
906             } catch (Throwable ex) {
907                 subscription.cancel();
908                 status.completeExceptionally(ex);
909             }
910         }
911     }
912 
913     /**
914      * A task for consuming buffer items and signals, created and
915      * executed whenever they become available. A task consumes as
916      * many items/signals as possible before terminating, at which
917      * point another task is created when needed. The dual Runnable
918      * and ForkJoinTask declaration saves overhead when executed by
919      * ForkJoinPools, without impacting other kinds of Executors.
920      */
921     @SuppressWarnings("serial")
922     static final class ConsumerTask<T> extends ForkJoinTask<Void>
923         implements Runnable, CompletableFuture.AsynchronousCompletionTask {
924         final BufferedSubscription<T> consumer;
ConsumerTask(BufferedSubscription<T> consumer)925         ConsumerTask(BufferedSubscription<T> consumer) {
926             this.consumer = consumer;
927         }
getRawResult()928         public final Void getRawResult() { return null; }
setRawResult(Void v)929         public final void setRawResult(Void v) {}
exec()930         public final boolean exec() { consumer.consume(); return false; }
run()931         public final void run() { consumer.consume(); }
932     }
933 
934     /**
935      * A resizable array-based ring buffer with integrated control to
936      * start a consumer task whenever items are available.  The buffer
937      * algorithm is specialized for the case of at most one concurrent
938      * producer and consumer, and power of two buffer sizes. It relies
939      * primarily on atomic operations (CAS or getAndSet) at the next
940      * array slot to put or take an element, at the "tail" and "head"
941      * indices written only by the producer and consumer respectively.
942      *
943      * We ensure internally that there is at most one active consumer
944      * task at any given time. The publisher guarantees a single
945      * producer via its lock. Sync among producers and consumers
946      * relies on volatile fields "ctl", "demand", and "waiting" (along
947      * with element access). Other variables are accessed in plain
948      * mode, relying on outer ordering and exclusion, and/or enclosing
949      * them within other volatile accesses. Some atomic operations are
950      * avoided by tracking single threaded ownership by producers (in
951      * the style of biased locking).
952      *
953      * Execution control and protocol state are managed using field
954      * "ctl".  Methods to subscribe, close, request, and cancel set
955      * ctl bits (mostly using atomic boolean method getAndBitwiseOr),
956      * and ensure that a task is running. (The corresponding consumer
957      * side actions are in method consume.)  To avoid starting a new
958      * task on each action, ctl also includes a keep-alive bit
959      * (ACTIVE) that is refreshed if needed on producer actions.
960      * (Maintaining agreement about keep-alives requires most atomic
961      * updates to be full SC/Volatile strength, which is still much
962      * cheaper than using one task per item.)  Error signals
963      * additionally null out items and/or fields to reduce termination
964      * latency.  The cancel() method is supported by treating as ERROR
965      * but suppressing onError signal.
966      *
967      * Support for blocking also exploits the fact that there is only
968      * one possible waiter. ManagedBlocker-compatible control fields
969      * are placed in this class itself rather than in wait-nodes.
970      * Blocking control relies on the "waiting" and "waiter"
971      * fields. Producers set them before trying to block. Signalling
972      * unparks and clears fields. If the producer and/or consumer are
973      * using a ForkJoinPool, the producer attempts to help run
974      * consumer tasks via ForkJoinPool.helpAsyncBlocker before
975      * blocking.
976      *
977      * Usages of this class may encounter any of several forms of
978      * memory contention. We try to ameliorate across them without
979      * unduly impacting footprints in low-contention usages where it
980      * isn't needed. Buffer arrays start out small and grow only as
981      * needed.  The class uses @Contended and heuristic field
982      * declaration ordering to reduce false-sharing memory contention
983      * across instances of BufferedSubscription (as in, multiple
984      * subscribers per publisher).  We additionally segregate some
985      * fields that would otherwise nearly always encounter cache line
986      * contention among producers and consumers. To reduce contention
987      * across time (vs space), consumers only periodically update
988      * other fields (see method takeItems), at the expense of possibly
989      * staler reporting of lags and demand (bounded at 12.5% == 1/8
990      * capacity) and possibly more atomic operations.
991      *
992      * Other forms of imbalance and slowdowns can occur during startup
993      * when producer and consumer methods are compiled and/or memory
994      * is allocated at different rates.  This is ameliorated by
995      * artificially subdividing some consumer methods, including
996      * isolation of all subscriber callbacks.  This code also includes
997      * typical power-of-two array screening idioms to avoid compilers
998      * generating traps, along with the usual SSA-based inline
999      * assignment coding style. Also, all methods and fields have
1000      * default visibility to simplify usage by callers.
1001      */
1002     @SuppressWarnings("serial")
1003     @jdk.internal.vm.annotation.Contended
1004     static final class BufferedSubscription<T>
1005         implements Subscription, ForkJoinPool.ManagedBlocker {
1006         long timeout;                      // Long.MAX_VALUE if untimed wait
1007         int head;                          // next position to take
1008         int tail;                          // next position to put
1009         final int maxCapacity;             // max buffer size
1010         volatile int ctl;                  // atomic run state flags
1011         Object[] array;                    // buffer
1012         final Subscriber<? super T> subscriber;
1013         final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
1014         Executor executor;                 // null on error
1015         Thread waiter;                     // blocked producer thread
1016         Throwable pendingError;            // holds until onError issued
1017         BufferedSubscription<T> next;      // used only by publisher
1018         BufferedSubscription<T> nextRetry; // used only by publisher
1019 
1020         @jdk.internal.vm.annotation.Contended("c") // segregate
1021         volatile long demand;              // # unfilled requests
1022         @jdk.internal.vm.annotation.Contended("c")
1023         volatile int waiting;              // nonzero if producer blocked
1024 
1025         // ctl bit values
1026         static final int CLOSED   = 0x01;  // if set, other bits ignored
1027         static final int ACTIVE   = 0x02;  // keep-alive for consumer task
1028         static final int REQS     = 0x04;  // (possibly) nonzero demand
1029         static final int ERROR    = 0x08;  // issues onError when noticed
1030         static final int COMPLETE = 0x10;  // issues onComplete when done
1031         static final int RUN      = 0x20;  // task is or will be running
1032         static final int OPEN     = 0x40;  // true after subscribe
1033 
1034         static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
1035 
BufferedSubscription(Subscriber<? super T> subscriber, Executor executor, BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler, Object[] array, int maxBufferCapacity)1036         BufferedSubscription(Subscriber<? super T> subscriber,
1037                              Executor executor,
1038                              BiConsumer<? super Subscriber<? super T>,
1039                              ? super Throwable> onNextHandler,
1040                              Object[] array,
1041                              int maxBufferCapacity) {
1042             this.subscriber = subscriber;
1043             this.executor = executor;
1044             this.onNextHandler = onNextHandler;
1045             this.array = array;
1046             this.maxCapacity = maxBufferCapacity;
1047         }
1048 
1049         // Wrappers for some VarHandle methods
1050 
weakCasCtl(int cmp, int val)1051         final boolean weakCasCtl(int cmp, int val) {
1052             return CTL.weakCompareAndSet(this, cmp, val);
1053         }
1054 
getAndBitwiseOrCtl(int bits)1055         final int getAndBitwiseOrCtl(int bits) {
1056             return (int)CTL.getAndBitwiseOr(this, bits);
1057         }
1058 
subtractDemand(int k)1059         final long subtractDemand(int k) {
1060             long n = (long)(-k);
1061             return n + (long)DEMAND.getAndAdd(this, n);
1062         }
1063 
casDemand(long cmp, long val)1064         final boolean casDemand(long cmp, long val) {
1065             return DEMAND.compareAndSet(this, cmp, val);
1066         }
1067 
1068         // Utilities used by SubmissionPublisher
1069 
1070         /**
1071          * Returns true if closed (consumer task may still be running).
1072          */
isClosed()1073         final boolean isClosed() {
1074             return (ctl & CLOSED) != 0;
1075         }
1076 
1077         /**
1078          * Returns estimated number of buffered items, or negative if
1079          * closed.
1080          */
estimateLag()1081         final int estimateLag() {
1082             int c = ctl, n = tail - head;
1083             return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n;
1084         }
1085 
1086         // Methods for submitting items
1087 
1088         /**
1089          * Tries to add item and start consumer task if necessary.
1090          * @return negative if closed, 0 if saturated, else estimated lag
1091          */
offer(T item, boolean unowned)1092         final int offer(T item, boolean unowned) {
1093             Object[] a;
1094             int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
1095             int t = tail, i = t & (cap - 1), n = t + 1 - head;
1096             if (cap > 0) {
1097                 boolean added;
1098                 if (n >= cap && cap < maxCapacity) // resize
1099                     added = growAndOffer(item, a, t);
1100                 else if (n >= cap || unowned)      // need volatile CAS
1101                     added = QA.compareAndSet(a, i, null, item);
1102                 else {                             // can use release mode
1103                     QA.setRelease(a, i, item);
1104                     added = true;
1105                 }
1106                 if (added) {
1107                     tail = t + 1;
1108                     stat = n;
1109                 }
1110             }
1111             return startOnOffer(stat);
1112         }
1113 
1114         /**
1115          * Tries to expand buffer and add item, returning true on
1116          * success. Currently fails only if out of memory.
1117          */
growAndOffer(T item, Object[] a, int t)1118         final boolean growAndOffer(T item, Object[] a, int t) {
1119             int cap = 0, newCap = 0;
1120             Object[] newArray = null;
1121             if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) {
1122                 try {
1123                     newArray = new Object[newCap];
1124                 } catch (OutOfMemoryError ex) {
1125                 }
1126             }
1127             if (newArray == null)
1128                 return false;
1129             else {                                // take and move items
1130                 int newMask = newCap - 1;
1131                 newArray[t-- & newMask] = item;
1132                 for (int mask = cap - 1, k = mask; k >= 0; --k) {
1133                     Object x = QA.getAndSet(a, t & mask, null);
1134                     if (x == null)
1135                         break;                    // already consumed
1136                     else
1137                         newArray[t-- & newMask] = x;
1138                 }
1139                 array = newArray;
1140                 VarHandle.releaseFence();         // release array and slots
1141                 return true;
1142             }
1143         }
1144 
1145         /**
1146          * Version of offer for retries (no resize or bias)
1147          */
retryOffer(T item)1148         final int retryOffer(T item) {
1149             Object[] a;
1150             int stat = 0, t = tail, h = head, cap;
1151             if ((a = array) != null && (cap = a.length) > 0 &&
1152                 QA.compareAndSet(a, (cap - 1) & t, null, item))
1153                 stat = (tail = t + 1) - h;
1154             return startOnOffer(stat);
1155         }
1156 
1157         /**
1158          * Tries to start consumer task after offer.
1159          * @return negative if now closed, else argument
1160          */
startOnOffer(int stat)1161         final int startOnOffer(int stat) {
1162             int c; // start or keep alive if requests exist and not active
1163             if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
1164                 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
1165                 tryStart();
1166             else if ((c & CLOSED) != 0)
1167                 stat = -1;
1168             return stat;
1169         }
1170 
1171         /**
1172          * Tries to start consumer task. Sets error state on failure.
1173          */
tryStart()1174         final void tryStart() {
1175             try {
1176                 Executor e;
1177                 ConsumerTask<T> task = new ConsumerTask<T>(this);
1178                 if ((e = executor) != null)   // skip if disabled on error
1179                     e.execute(task);
1180             } catch (RuntimeException | Error ex) {
1181                 getAndBitwiseOrCtl(ERROR | CLOSED);
1182                 throw ex;
1183             }
1184         }
1185 
1186         // Signals to consumer tasks
1187 
1188         /**
1189          * Sets the given control bits, starting task if not running or closed.
1190          * @param bits state bits, assumed to include RUN but not CLOSED
1191          */
startOnSignal(int bits)1192         final void startOnSignal(int bits) {
1193             if ((ctl & bits) != bits &&
1194                 (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0)
1195                 tryStart();
1196         }
1197 
onSubscribe()1198         final void onSubscribe() {
1199             startOnSignal(RUN | ACTIVE);
1200         }
1201 
onComplete()1202         final void onComplete() {
1203             startOnSignal(RUN | ACTIVE | COMPLETE);
1204         }
1205 
onError(Throwable ex)1206         final void onError(Throwable ex) {
1207             int c; Object[] a;      // to null out buffer on async error
1208             if (ex != null)
1209                 pendingError = ex;  // races are OK
1210             if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) {
1211                 if ((c & RUN) == 0)
1212                     tryStart();
1213                 else if ((a = array) != null)
1214                     Arrays.fill(a, null);
1215             }
1216         }
1217 
cancel()1218         public final void cancel() {
1219             onError(null);
1220         }
1221 
request(long n)1222         public final void request(long n) {
1223             if (n > 0L) {
1224                 for (;;) {
1225                     long p = demand, d = p + n;  // saturate
1226                     if (casDemand(p, d < p ? Long.MAX_VALUE : d))
1227                         break;
1228                 }
1229                 startOnSignal(RUN | ACTIVE | REQS);
1230             }
1231             else
1232                 onError(new IllegalArgumentException(
1233                             "non-positive subscription request"));
1234         }
1235 
1236         // Consumer task actions
1237 
1238         /**
1239          * Consumer loop, called from ConsumerTask, or indirectly when
1240          * helping during submit.
1241          */
consume()1242         final void consume() {
1243             Subscriber<? super T> s;
1244             if ((s = subscriber) != null) {          // hoist checks
1245                 subscribeOnOpen(s);
1246                 long d = demand;
1247                 for (int h = head, t = tail;;) {
1248                     int c, taken; boolean empty;
1249                     if (((c = ctl) & ERROR) != 0) {
1250                         closeOnError(s, null);
1251                         break;
1252                     }
1253                     else if ((taken = takeItems(s, d, h)) > 0) {
1254                         head = h += taken;
1255                         d = subtractDemand(taken);
1256                     }
1257                     else if ((d = demand) == 0L && (c & REQS) != 0)
1258                         weakCasCtl(c, c & ~REQS);    // exhausted demand
1259                     else if (d != 0L && (c & REQS) == 0)
1260                         weakCasCtl(c, c | REQS);     // new demand
1261                     else if (t == (t = tail)) {      // stability check
1262                         if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1263                             closeOnComplete(s);      // end of stream
1264                             break;
1265                         }
1266                         else if (empty || d == 0L) {
1267                             int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1268                             if (weakCasCtl(c, c & ~bit) && bit == RUN)
1269                                 break;               // un-keep-alive or exit
1270                         }
1271                     }
1272                 }
1273             }
1274         }
1275 
1276         /**
1277          * Consumes some items until unavailable or bound or error.
1278          *
1279          * @param s subscriber
1280          * @param d current demand
1281          * @param h current head
1282          * @return number taken
1283          */
takeItems(Subscriber<? super T> s, long d, int h)1284         final int takeItems(Subscriber<? super T> s, long d, int h) {
1285             Object[] a;
1286             int k = 0, cap;
1287             if ((a = array) != null && (cap = a.length) > 0) {
1288                 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1289                 int n = (d < (long)b) ? (int)d : b;
1290                 for (; k < n; ++h, ++k) {
1291                     Object x = QA.getAndSet(a, h & m, null);
1292                     if (waiting != 0)
1293                         signalWaiter();
1294                     if (x == null)
1295                         break;
1296                     else if (!consumeNext(s, x))
1297                         break;
1298                 }
1299             }
1300             return k;
1301         }
1302 
consumeNext(Subscriber<? super T> s, Object x)1303         final boolean consumeNext(Subscriber<? super T> s, Object x) {
1304             try {
1305                 @SuppressWarnings("unchecked") T y = (T) x;
1306                 if (s != null)
1307                     s.onNext(y);
1308                 return true;
1309             } catch (Throwable ex) {
1310                 handleOnNext(s, ex);
1311                 return false;
1312             }
1313         }
1314 
1315         /**
1316          * Processes exception in Subscriber.onNext.
1317          */
handleOnNext(Subscriber<? super T> s, Throwable ex)1318         final void handleOnNext(Subscriber<? super T> s, Throwable ex) {
1319             BiConsumer<? super Subscriber<? super T>, ? super Throwable> h;
1320             try {
1321                 if ((h = onNextHandler) != null)
1322                     h.accept(s, ex);
1323             } catch (Throwable ignore) {
1324             }
1325             closeOnError(s, ex);
1326         }
1327 
1328         /**
1329          * Issues subscriber.onSubscribe if this is first signal.
1330          */
subscribeOnOpen(Subscriber<? super T> s)1331         final void subscribeOnOpen(Subscriber<? super T> s) {
1332             if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0)
1333                 consumeSubscribe(s);
1334         }
1335 
consumeSubscribe(Subscriber<? super T> s)1336         final void consumeSubscribe(Subscriber<? super T> s) {
1337             try {
1338                 if (s != null) // ignore if disabled
1339                     s.onSubscribe(this);
1340             } catch (Throwable ex) {
1341                 closeOnError(s, ex);
1342             }
1343         }
1344 
1345         /**
1346          * Issues subscriber.onComplete unless already closed.
1347          */
closeOnComplete(Subscriber<? super T> s)1348         final void closeOnComplete(Subscriber<? super T> s) {
1349             if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0)
1350                 consumeComplete(s);
1351         }
1352 
consumeComplete(Subscriber<? super T> s)1353         final void consumeComplete(Subscriber<? super T> s) {
1354             try {
1355                 if (s != null)
1356                     s.onComplete();
1357             } catch (Throwable ignore) {
1358             }
1359         }
1360 
1361         /**
1362          * Issues subscriber.onError, and unblocks producer if needed.
1363          */
closeOnError(Subscriber<? super T> s, Throwable ex)1364         final void closeOnError(Subscriber<? super T> s, Throwable ex) {
1365             if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) {
1366                 if (ex == null)
1367                     ex = pendingError;
1368                 pendingError = null;  // detach
1369                 executor = null;      // suppress racing start calls
1370                 signalWaiter();
1371                 consumeError(s, ex);
1372             }
1373         }
1374 
consumeError(Subscriber<? super T> s, Throwable ex)1375         final void consumeError(Subscriber<? super T> s, Throwable ex) {
1376             try {
1377                 if (ex != null && s != null)
1378                     s.onError(ex);
1379             } catch (Throwable ignore) {
1380             }
1381         }
1382 
1383         // Blocking support
1384 
1385         /**
1386          * Unblocks waiting producer.
1387          */
signalWaiter()1388         final void signalWaiter() {
1389             Thread w;
1390             waiting = 0;
1391             if ((w = waiter) != null)
1392                 LockSupport.unpark(w);
1393         }
1394 
1395         /**
1396          * Returns true if closed or space available.
1397          * For ManagedBlocker.
1398          */
isReleasable()1399         public final boolean isReleasable() {
1400             Object[] a; int cap;
1401             return ((ctl & CLOSED) != 0 ||
1402                     ((a = array) != null && (cap = a.length) > 0 &&
1403                      QA.getAcquire(a, (cap - 1) & tail) == null));
1404         }
1405 
1406         /**
1407          * Helps or blocks until timeout, closed, or space available.
1408          */
awaitSpace(long nanos)1409         final void awaitSpace(long nanos) {
1410             if (!isReleasable()) {
1411                 ForkJoinPool.helpAsyncBlocker(executor, this);
1412                 if (!isReleasable()) {
1413                     timeout = nanos;
1414                     try {
1415                         ForkJoinPool.managedBlock(this);
1416                     } catch (InterruptedException ie) {
1417                         timeout = INTERRUPTED;
1418                     }
1419                     if (timeout == INTERRUPTED)
1420                         Thread.currentThread().interrupt();
1421                 }
1422             }
1423         }
1424 
1425         /**
1426          * Blocks until closed, space available or timeout.
1427          * For ManagedBlocker.
1428          */
block()1429         public final boolean block() {
1430             long nanos = timeout;
1431             boolean timed = (nanos < Long.MAX_VALUE);
1432             long deadline = timed ? System.nanoTime() + nanos : 0L;
1433             while (!isReleasable()) {
1434                 if (Thread.interrupted()) {
1435                     timeout = INTERRUPTED;
1436                     if (timed)
1437                         break;
1438                 }
1439                 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L)
1440                     break;
1441                 else if (waiter == null)
1442                     waiter = Thread.currentThread();
1443                 else if (waiting == 0)
1444                     waiting = 1;
1445                 else if (timed)
1446                     LockSupport.parkNanos(this, nanos);
1447                 else
1448                     LockSupport.park(this);
1449             }
1450             waiter = null;
1451             waiting = 0;
1452             return true;
1453         }
1454 
1455         // VarHandle mechanics
1456         static final VarHandle CTL;
1457         static final VarHandle DEMAND;
1458         static final VarHandle QA;
1459 
1460         static {
1461             try {
1462                 MethodHandles.Lookup l = MethodHandles.lookup();
1463                 CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1464                                       int.class);
1465                 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1466                                          long.class);
1467                 QA = MethodHandles.arrayElementVarHandle(Object[].class);
1468             } catch (ReflectiveOperationException e) {
1469                 throw new ExceptionInInitializerError(e);
1470             }
1471 
1472             // Reduce the risk of rare disastrous classloading in first call to
1473             // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1474             Class<?> ensureLoaded = LockSupport.class;
1475         }
1476     }
1477 }
1478