• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal.testing;
18 
19 import io.grpc.Status;
20 import io.grpc.StreamTracer;
21 import java.util.Locale;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.concurrent.atomic.AtomicReference;
28 import javax.annotation.Nullable;
29 
30 /**
31  * A {@link StreamTracer} suitable for testing.
32  */
33 public interface TestStreamTracer {
34 
35   /**
36    * Waits for the stream to be done.
37    */
await()38   void await() throws InterruptedException;
39 
40   /**
41    * Waits for the stream to be done.
42    */
await(long timeout, TimeUnit timeUnit)43   boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;
44 
45   /**
46    * Returns the status passed to {@link StreamTracer#streamClosed}.
47    */
getStatus()48   Status getStatus();
49 
50   /**
51    * Returns to sum of all sizes passed to {@link StreamTracer#inboundWireSize}.
52    */
getInboundWireSize()53   long getInboundWireSize();
54 
55   /**
56    * Returns to sum of all sizes passed to {@link StreamTracer#inboundUncompressedSize}.
57    */
getInboundUncompressedSize()58   long getInboundUncompressedSize();
59 
60   /**
61    * Returns to sum of all sizes passed to {@link StreamTracer#outboundWireSize}.
62    */
getOutboundWireSize()63   long getOutboundWireSize();
64 
65   /**
66    * Returns to sum of al sizes passed to {@link StreamTracer#outboundUncompressedSize}.
67    */
getOutboundUncompressedSize()68   long getOutboundUncompressedSize();
69 
70   /**
71    * Sets whether to fail on unexpected duplicate calls to callback methods.
72    */
setFailDuplicateCallbacks(boolean fail)73   void setFailDuplicateCallbacks(boolean fail);
74 
75   /**
76    * Returns the next captured outbound message event.
77    */
78   @Nullable
nextOutboundEvent()79   String nextOutboundEvent();
80 
81   /**
82    * Returns the next captured outbound message event.
83    */
nextInboundEvent()84   String nextInboundEvent();
85 
86   /**
87    * A {@link StreamTracer} suitable for testing.
88    */
89   public static class TestBaseStreamTracer extends StreamTracer implements TestStreamTracer {
90 
91     protected final AtomicLong outboundWireSize = new AtomicLong();
92     protected final AtomicLong inboundWireSize = new AtomicLong();
93     protected final AtomicLong outboundUncompressedSize = new AtomicLong();
94     protected final AtomicLong inboundUncompressedSize = new AtomicLong();
95     protected final LinkedBlockingQueue<String> outboundEvents = new LinkedBlockingQueue<>();
96     protected final LinkedBlockingQueue<String> inboundEvents = new LinkedBlockingQueue<>();
97     protected final AtomicReference<Status> streamClosedStatus = new AtomicReference<>();
98     protected final AtomicReference<Throwable> streamClosedStack = new AtomicReference<>();
99     protected final CountDownLatch streamClosed = new CountDownLatch(1);
100     protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true);
101 
102     @Override
await()103     public void await() throws InterruptedException {
104       streamClosed.await();
105     }
106 
107     @Override
await(long timeout, TimeUnit timeUnit)108     public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
109       return streamClosed.await(timeout, timeUnit);
110     }
111 
112     @Override
getStatus()113     public Status getStatus() {
114       return streamClosedStatus.get();
115     }
116 
117     @Override
getInboundWireSize()118     public long getInboundWireSize() {
119       return inboundWireSize.get();
120     }
121 
122     @Override
getInboundUncompressedSize()123     public long getInboundUncompressedSize() {
124       return inboundUncompressedSize.get();
125     }
126 
127     @Override
getOutboundWireSize()128     public long getOutboundWireSize() {
129       return outboundWireSize.get();
130     }
131 
132     @Override
getOutboundUncompressedSize()133     public long getOutboundUncompressedSize() {
134       return outboundUncompressedSize.get();
135     }
136 
137     @Override
outboundWireSize(long bytes)138     public void outboundWireSize(long bytes) {
139       outboundWireSize.addAndGet(bytes);
140     }
141 
142     @Override
inboundWireSize(long bytes)143     public void inboundWireSize(long bytes) {
144       inboundWireSize.addAndGet(bytes);
145     }
146 
147     @Override
outboundUncompressedSize(long bytes)148     public void outboundUncompressedSize(long bytes) {
149       outboundUncompressedSize.addAndGet(bytes);
150     }
151 
152     @Override
inboundUncompressedSize(long bytes)153     public void inboundUncompressedSize(long bytes) {
154       inboundUncompressedSize.addAndGet(bytes);
155     }
156 
157     @Override
streamClosed(Status status)158     public void streamClosed(Status status) {
159       streamClosedStack.compareAndSet(null, new Throwable("first call"));
160       if (!streamClosedStatus.compareAndSet(null, status)) {
161         if (failDuplicateCallbacks.get()) {
162           throw new AssertionError("streamClosed called more than once", streamClosedStack.get());
163         }
164       } else {
165         streamClosed.countDown();
166       }
167     }
168 
169     @Override
inboundMessage(int seqNo)170     public void inboundMessage(int seqNo) {
171       inboundEvents.add("inboundMessage(" + seqNo + ")");
172     }
173 
174     @Override
outboundMessage(int seqNo)175     public void outboundMessage(int seqNo) {
176       outboundEvents.add("outboundMessage(" + seqNo + ")");
177     }
178 
179     @Override
outboundMessageSent( int seqNo, long optionalWireSize, long optionalUncompressedSize)180     public void outboundMessageSent(
181         int seqNo, long optionalWireSize, long optionalUncompressedSize) {
182       outboundEvents.add(
183           String.format(
184               Locale.US,
185               "outboundMessageSent(%d, %d, %d)",
186               seqNo, optionalWireSize, optionalUncompressedSize));
187     }
188 
189     @Override
inboundMessageRead( int seqNo, long optionalWireSize, long optionalUncompressedSize)190     public void inboundMessageRead(
191         int seqNo, long optionalWireSize, long optionalUncompressedSize) {
192       inboundEvents.add(
193           String.format(
194               Locale.US,
195               "inboundMessageRead(%d, %d, %d)", seqNo, optionalWireSize, optionalUncompressedSize));
196     }
197 
198     @Override
setFailDuplicateCallbacks(boolean fail)199     public void setFailDuplicateCallbacks(boolean fail) {
200       failDuplicateCallbacks.set(fail);
201     }
202 
203     @Override
nextOutboundEvent()204     public String nextOutboundEvent() {
205       return outboundEvents.poll();
206     }
207 
208     @Override
nextInboundEvent()209     public String nextInboundEvent() {
210       return inboundEvents.poll();
211     }
212   }
213 }
214