• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import io.grpc.Attributes;
24 import io.grpc.Compressor;
25 import io.grpc.Deadline;
26 import io.grpc.DecompressorRegistry;
27 import io.grpc.Metadata;
28 import io.grpc.Status;
29 import java.io.InputStream;
30 import java.util.ArrayList;
31 import java.util.List;
32 import javax.annotation.concurrent.GuardedBy;
33 
34 /**
35  * A stream that queues requests before the transport is available, and delegates to a real stream
36  * implementation when the transport is available.
37  *
38  * <p>{@code ClientStream} itself doesn't require thread-safety. However, the state of {@code
39  * DelayedStream} may be internally altered by different threads, thus internal synchronization is
40  * necessary.
41  */
42 class DelayedStream implements ClientStream {
43   /** {@code true} once realStream is valid and all pending calls have been drained. */
44   private volatile boolean passThrough;
45   /**
46    * Non-{@code null} iff start has been called. Used to assert methods are called in appropriate
47    * order, but also used if an error occurrs before {@code realStream} is set.
48    */
49   private ClientStreamListener listener;
50   /** Must hold {@code this} lock when setting. */
51   private ClientStream realStream;
52   @GuardedBy("this")
53   private Status error;
54   @GuardedBy("this")
55   private List<Runnable> pendingCalls = new ArrayList<>();
56   @GuardedBy("this")
57   private DelayedStreamListener delayedListener;
58 
59   @Override
setMaxInboundMessageSize(final int maxSize)60   public void setMaxInboundMessageSize(final int maxSize) {
61     if (passThrough) {
62       realStream.setMaxInboundMessageSize(maxSize);
63     } else {
64       delayOrExecute(new Runnable() {
65         @Override
66         public void run() {
67           realStream.setMaxInboundMessageSize(maxSize);
68         }
69       });
70     }
71   }
72 
73   @Override
setMaxOutboundMessageSize(final int maxSize)74   public void setMaxOutboundMessageSize(final int maxSize) {
75     if (passThrough) {
76       realStream.setMaxOutboundMessageSize(maxSize);
77     } else {
78       delayOrExecute(new Runnable() {
79         @Override
80         public void run() {
81           realStream.setMaxOutboundMessageSize(maxSize);
82         }
83       });
84     }
85   }
86 
87   @Override
setDeadline(final Deadline deadline)88   public void setDeadline(final Deadline deadline) {
89     delayOrExecute(new Runnable() {
90       @Override
91       public void run() {
92         realStream.setDeadline(deadline);
93       }
94     });
95   }
96 
97   /**
98    * Transfers all pending and future requests and mutations to the given stream.
99    *
100    * <p>No-op if either this method or {@link #cancel} have already been called.
101    */
102   // When this method returns, passThrough is guaranteed to be true
setStream(ClientStream stream)103   final void setStream(ClientStream stream) {
104     synchronized (this) {
105       // If realStream != null, then either setStream() or cancel() has been called.
106       if (realStream != null) {
107         return;
108       }
109       realStream = checkNotNull(stream, "stream");
110     }
111 
112     drainPendingCalls();
113   }
114 
115   /**
116    * Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
117    * multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
118    * should not be held when calling this method.
119    */
drainPendingCalls()120   private void drainPendingCalls() {
121     assert realStream != null;
122     assert !passThrough;
123     List<Runnable> toRun = new ArrayList<>();
124     DelayedStreamListener delayedListener = null;
125     while (true) {
126       synchronized (this) {
127         if (pendingCalls.isEmpty()) {
128           pendingCalls = null;
129           passThrough = true;
130           delayedListener = this.delayedListener;
131           break;
132         }
133         // Since there were pendingCalls, we need to process them. To maintain ordering we can't set
134         // passThrough=true until we run all pendingCalls, but new Runnables may be added after we
135         // drop the lock. So we will have to re-check pendingCalls.
136         List<Runnable> tmp = toRun;
137         toRun = pendingCalls;
138         pendingCalls = tmp;
139       }
140       for (Runnable runnable : toRun) {
141         // Must not call transport while lock is held to prevent deadlocks.
142         // TODO(ejona): exception handling
143         runnable.run();
144       }
145       toRun.clear();
146     }
147     if (delayedListener != null) {
148       delayedListener.drainPendingCallbacks();
149     }
150   }
151 
152   /**
153    * Enqueue the runnable or execute it now. Call sites that may be called many times may want avoid
154    * this method if {@code passThrough == true}.
155    *
156    * <p>Note that this method is no more thread-safe than {@code runnable}. It is thread-safe if and
157    * only if {@code runnable} is thread-safe.
158    */
delayOrExecute(Runnable runnable)159   private void delayOrExecute(Runnable runnable) {
160     synchronized (this) {
161       if (!passThrough) {
162         pendingCalls.add(runnable);
163         return;
164       }
165     }
166     runnable.run();
167   }
168 
169   @Override
setAuthority(final String authority)170   public void setAuthority(final String authority) {
171     checkState(listener == null, "May only be called before start");
172     checkNotNull(authority, "authority");
173     delayOrExecute(new Runnable() {
174       @Override
175       public void run() {
176         realStream.setAuthority(authority);
177       }
178     });
179   }
180 
181   @Override
start(ClientStreamListener listener)182   public void start(ClientStreamListener listener) {
183     checkState(this.listener == null, "already started");
184 
185     Status savedError;
186     boolean savedPassThrough;
187     synchronized (this) {
188       this.listener = checkNotNull(listener, "listener");
189       // If error != null, then cancel() has been called and was unable to close the listener
190       savedError = error;
191       savedPassThrough = passThrough;
192       if (!savedPassThrough) {
193         listener = delayedListener = new DelayedStreamListener(listener);
194       }
195     }
196     if (savedError != null) {
197       listener.closed(savedError, new Metadata());
198       return;
199     }
200 
201     if (savedPassThrough) {
202       realStream.start(listener);
203     } else {
204       final ClientStreamListener finalListener = listener;
205       delayOrExecute(new Runnable() {
206         @Override
207         public void run() {
208           realStream.start(finalListener);
209         }
210       });
211     }
212   }
213 
214   @Override
getAttributes()215   public Attributes getAttributes() {
216     checkState(passThrough, "Called getAttributes before attributes are ready");
217     return realStream.getAttributes();
218   }
219 
220   @Override
writeMessage(final InputStream message)221   public void writeMessage(final InputStream message) {
222     checkNotNull(message, "message");
223     if (passThrough) {
224       realStream.writeMessage(message);
225     } else {
226       delayOrExecute(new Runnable() {
227         @Override
228         public void run() {
229           realStream.writeMessage(message);
230         }
231       });
232     }
233   }
234 
235   @Override
flush()236   public void flush() {
237     if (passThrough) {
238       realStream.flush();
239     } else {
240       delayOrExecute(new Runnable() {
241         @Override
242         public void run() {
243           realStream.flush();
244         }
245       });
246     }
247   }
248 
249   // When this method returns, passThrough is guaranteed to be true
250   @Override
cancel(final Status reason)251   public void cancel(final Status reason) {
252     checkNotNull(reason, "reason");
253     boolean delegateToRealStream = true;
254     ClientStreamListener listenerToClose = null;
255     synchronized (this) {
256       // If realStream != null, then either setStream() or cancel() has been called
257       if (realStream == null) {
258         realStream = NoopClientStream.INSTANCE;
259         delegateToRealStream = false;
260 
261         // If listener == null, then start() will later call listener with 'error'
262         listenerToClose = listener;
263         error = reason;
264       }
265     }
266     if (delegateToRealStream) {
267       delayOrExecute(new Runnable() {
268         @Override
269         public void run() {
270           realStream.cancel(reason);
271         }
272       });
273     } else {
274       if (listenerToClose != null) {
275         listenerToClose.closed(reason, new Metadata());
276       }
277       drainPendingCalls();
278     }
279   }
280 
281   @Override
halfClose()282   public void halfClose() {
283     delayOrExecute(new Runnable() {
284       @Override
285       public void run() {
286         realStream.halfClose();
287       }
288     });
289   }
290 
291   @Override
request(final int numMessages)292   public void request(final int numMessages) {
293     if (passThrough) {
294       realStream.request(numMessages);
295     } else {
296       delayOrExecute(new Runnable() {
297         @Override
298         public void run() {
299           realStream.request(numMessages);
300         }
301       });
302     }
303   }
304 
305   @Override
setCompressor(final Compressor compressor)306   public void setCompressor(final Compressor compressor) {
307     checkNotNull(compressor, "compressor");
308     delayOrExecute(new Runnable() {
309       @Override
310       public void run() {
311         realStream.setCompressor(compressor);
312       }
313     });
314   }
315 
316   @Override
setFullStreamDecompression(final boolean fullStreamDecompression)317   public void setFullStreamDecompression(final boolean fullStreamDecompression) {
318     delayOrExecute(
319         new Runnable() {
320           @Override
321           public void run() {
322             realStream.setFullStreamDecompression(fullStreamDecompression);
323           }
324         });
325   }
326 
327   @Override
setDecompressorRegistry(final DecompressorRegistry decompressorRegistry)328   public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
329     checkNotNull(decompressorRegistry, "decompressorRegistry");
330     delayOrExecute(new Runnable() {
331       @Override
332       public void run() {
333         realStream.setDecompressorRegistry(decompressorRegistry);
334       }
335     });
336   }
337 
338   @Override
isReady()339   public boolean isReady() {
340     if (passThrough) {
341       return realStream.isReady();
342     } else {
343       return false;
344     }
345   }
346 
347   @Override
setMessageCompression(final boolean enable)348   public void setMessageCompression(final boolean enable) {
349     if (passThrough) {
350       realStream.setMessageCompression(enable);
351     } else {
352       delayOrExecute(new Runnable() {
353         @Override
354         public void run() {
355           realStream.setMessageCompression(enable);
356         }
357       });
358     }
359   }
360 
361   @VisibleForTesting
getRealStream()362   ClientStream getRealStream() {
363     return realStream;
364   }
365 
366   private static class DelayedStreamListener implements ClientStreamListener {
367     private final ClientStreamListener realListener;
368     private volatile boolean passThrough;
369     @GuardedBy("this")
370     private List<Runnable> pendingCallbacks = new ArrayList<>();
371 
DelayedStreamListener(ClientStreamListener listener)372     public DelayedStreamListener(ClientStreamListener listener) {
373       this.realListener = listener;
374     }
375 
delayOrExecute(Runnable runnable)376     private void delayOrExecute(Runnable runnable) {
377       synchronized (this) {
378         if (!passThrough) {
379           pendingCallbacks.add(runnable);
380           return;
381         }
382       }
383       runnable.run();
384     }
385 
386     @Override
messagesAvailable(final MessageProducer producer)387     public void messagesAvailable(final MessageProducer producer) {
388       if (passThrough) {
389         realListener.messagesAvailable(producer);
390       } else {
391         delayOrExecute(new Runnable() {
392           @Override
393           public void run() {
394             realListener.messagesAvailable(producer);
395           }
396         });
397       }
398     }
399 
400     @Override
onReady()401     public void onReady() {
402       if (passThrough) {
403         realListener.onReady();
404       } else {
405         delayOrExecute(new Runnable() {
406           @Override
407           public void run() {
408             realListener.onReady();
409           }
410         });
411       }
412     }
413 
414     @Override
headersRead(final Metadata headers)415     public void headersRead(final Metadata headers) {
416       delayOrExecute(new Runnable() {
417         @Override
418         public void run() {
419           realListener.headersRead(headers);
420         }
421       });
422     }
423 
424     @Override
closed(final Status status, final Metadata trailers)425     public void closed(final Status status, final Metadata trailers) {
426       delayOrExecute(new Runnable() {
427         @Override
428         public void run() {
429           realListener.closed(status, trailers);
430         }
431       });
432     }
433 
434     @Override
closed( final Status status, final RpcProgress rpcProgress, final Metadata trailers)435     public void closed(
436         final Status status, final RpcProgress rpcProgress,
437         final Metadata trailers) {
438       delayOrExecute(new Runnable() {
439         @Override
440         public void run() {
441           realListener.closed(status, rpcProgress, trailers);
442         }
443       });
444     }
445 
drainPendingCallbacks()446     public void drainPendingCallbacks() {
447       assert !passThrough;
448       List<Runnable> toRun = new ArrayList<>();
449       while (true) {
450         synchronized (this) {
451           if (pendingCallbacks.isEmpty()) {
452             pendingCallbacks = null;
453             passThrough = true;
454             break;
455           }
456           // Since there were pendingCallbacks, we need to process them. To maintain ordering we
457           // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
458           // added after we drop the lock. So we will have to re-check pendingCallbacks.
459           List<Runnable> tmp = toRun;
460           toRun = pendingCallbacks;
461           pendingCallbacks = tmp;
462         }
463         for (Runnable runnable : toRun) {
464           // Avoid calling listener while lock is held to prevent deadlocks.
465           // TODO(ejona): exception handling
466           runnable.run();
467         }
468         toRun.clear();
469       }
470     }
471   }
472 }
473