• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal.testing;
18 
19 import io.grpc.ClientStreamTracer;
20 import io.grpc.Metadata;
21 import io.grpc.Status;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nullable;
26 
27 /**
28  * A {@link ClientStreamTracer} suitable for testing.
29  */
30 public class TestClientStreamTracer extends ClientStreamTracer implements TestStreamTracer {
31   private final TestBaseStreamTracer delegate = new TestBaseStreamTracer();
32   protected final CountDownLatch outboundHeadersLatch = new CountDownLatch(1);
33   protected final AtomicReference<Throwable> outboundHeadersCalled =
34       new AtomicReference<>();
35   protected final AtomicReference<Throwable> inboundHeadersCalled =
36       new AtomicReference<>();
37   protected final AtomicReference<Metadata> inboundTrailers = new AtomicReference<>();
38 
39   @Override
await()40   public void await() throws InterruptedException {
41     delegate.await();
42   }
43 
44   @Override
await(long timeout, TimeUnit timeUnit)45   public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
46     return delegate.await(timeout, timeUnit);
47   }
48 
49   /**
50    * Returns if {@link ClientStreamTracer#inboundHeaders} has been called.
51    */
getInboundHeaders()52   public boolean getInboundHeaders() {
53     return inboundHeadersCalled.get() != null;
54   }
55 
56   /**
57    * Returns the inbound trailers if {@link ClientStreamTracer#inboundTrailers} has been called, or
58    * {@code null}.
59    */
60   @Nullable
getInboundTrailers()61   public Metadata getInboundTrailers() {
62     return inboundTrailers.get();
63   }
64 
65   /**
66    * Returns if {@link ClientStreamTracer#outboundHeaders} has been called.
67    */
getOutboundHeaders()68   public boolean getOutboundHeaders() {
69     return outboundHeadersCalled.get() != null;
70   }
71 
72   /**
73    * Allow tests to await the outbound header event, which depending on the test case may be
74    * necessary (e.g., if we test for a Netty client's outbound headers upon receiving the start of
75    * stream on the server side, the tracer won't know that headers were sent until a channel future
76    * executes).
77    */
awaitOutboundHeaders(int timeout, TimeUnit unit)78   public boolean awaitOutboundHeaders(int timeout, TimeUnit unit) throws Exception {
79     return outboundHeadersLatch.await(timeout, unit);
80   }
81 
82   @Override
getStatus()83   public Status getStatus() {
84     return delegate.getStatus();
85   }
86 
87   @Override
getInboundWireSize()88   public long getInboundWireSize() {
89     return delegate.getInboundWireSize();
90   }
91 
92   @Override
getInboundUncompressedSize()93   public long getInboundUncompressedSize() {
94     return delegate.getInboundUncompressedSize();
95   }
96 
97   @Override
getOutboundWireSize()98   public long getOutboundWireSize() {
99     return delegate.getOutboundWireSize();
100   }
101 
102   @Override
getOutboundUncompressedSize()103   public long getOutboundUncompressedSize() {
104     return delegate.getOutboundUncompressedSize();
105   }
106 
107   @Override
setFailDuplicateCallbacks(boolean fail)108   public void setFailDuplicateCallbacks(boolean fail) {
109     delegate.setFailDuplicateCallbacks(fail);
110   }
111 
112   @Override
nextOutboundEvent()113   public String nextOutboundEvent() {
114     return delegate.nextOutboundEvent();
115   }
116 
117   @Override
nextInboundEvent()118   public String nextInboundEvent() {
119     return delegate.nextInboundEvent();
120   }
121 
122   @Override
outboundWireSize(long bytes)123   public void outboundWireSize(long bytes) {
124     delegate.outboundWireSize(bytes);
125   }
126 
127   @Override
inboundWireSize(long bytes)128   public void inboundWireSize(long bytes) {
129     delegate.inboundWireSize(bytes);
130   }
131 
132   @Override
outboundUncompressedSize(long bytes)133   public void outboundUncompressedSize(long bytes) {
134     delegate.outboundUncompressedSize(bytes);
135   }
136 
137   @Override
inboundUncompressedSize(long bytes)138   public void inboundUncompressedSize(long bytes) {
139     delegate.inboundUncompressedSize(bytes);
140   }
141 
142   @Override
streamClosed(Status status)143   public void streamClosed(Status status) {
144     delegate.streamClosed(status);
145   }
146 
147   @Override
inboundMessage(int seqNo)148   public void inboundMessage(int seqNo) {
149     delegate.inboundMessage(seqNo);
150   }
151 
152   @Override
outboundMessage(int seqNo)153   public void outboundMessage(int seqNo) {
154     delegate.outboundMessage(seqNo);
155   }
156 
157   @Override
outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize)158   public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
159     delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
160   }
161 
162   @Override
inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize)163   public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
164     delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
165   }
166 
167   @Override
outboundHeaders()168   public void outboundHeaders() {
169     if (!outboundHeadersCalled.compareAndSet(null, new Exception("first stack"))
170         && delegate.failDuplicateCallbacks.get()) {
171       throw new AssertionError(
172           "outboundHeaders called more than once",
173           new Exception("second stack", outboundHeadersCalled.get()));
174     }
175     outboundHeadersLatch.countDown();
176   }
177 
178   @Override
inboundHeaders()179   public void inboundHeaders() {
180     if (!inboundHeadersCalled.compareAndSet(null, new Exception("first stack"))
181         && delegate.failDuplicateCallbacks.get()) {
182       throw new AssertionError(
183           "inboundHeaders called more than once",
184           new Exception("second stack", inboundHeadersCalled.get()));
185     }
186   }
187 
188   @Override
inboundTrailers(Metadata trailers)189   public void inboundTrailers(Metadata trailers) {
190     if (delegate.getStatus() != null) {
191       throw new AssertionError(
192           "stream has already been closed with " + delegate.getStatus(),
193           delegate.streamClosedStack.get());
194     }
195     if (!inboundTrailers.compareAndSet(null, trailers) && delegate.failDuplicateCallbacks.get()) {
196       throw new AssertionError("inboundTrailers called more than once");
197     }
198   }
199 }
200