1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal.testing; 18 19 import io.grpc.ClientStreamTracer; 20 import io.grpc.Metadata; 21 import io.grpc.Status; 22 import java.util.concurrent.CountDownLatch; 23 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.atomic.AtomicReference; 25 import javax.annotation.Nullable; 26 27 /** 28 * A {@link ClientStreamTracer} suitable for testing. 29 */ 30 public class TestClientStreamTracer extends ClientStreamTracer implements TestStreamTracer { 31 private final TestBaseStreamTracer delegate = new TestBaseStreamTracer(); 32 protected final CountDownLatch outboundHeadersLatch = new CountDownLatch(1); 33 protected final AtomicReference<Throwable> outboundHeadersCalled = 34 new AtomicReference<>(); 35 protected final AtomicReference<Throwable> inboundHeadersCalled = 36 new AtomicReference<>(); 37 protected final AtomicReference<Metadata> inboundTrailers = new AtomicReference<>(); 38 39 @Override await()40 public void await() throws InterruptedException { 41 delegate.await(); 42 } 43 44 @Override await(long timeout, TimeUnit timeUnit)45 public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException { 46 return delegate.await(timeout, timeUnit); 47 } 48 49 /** 50 * Returns if {@link ClientStreamTracer#inboundHeaders} has been called. 51 */ getInboundHeaders()52 public boolean getInboundHeaders() { 53 return inboundHeadersCalled.get() != null; 54 } 55 56 /** 57 * Returns the inbound trailers if {@link ClientStreamTracer#inboundTrailers} has been called, or 58 * {@code null}. 59 */ 60 @Nullable getInboundTrailers()61 public Metadata getInboundTrailers() { 62 return inboundTrailers.get(); 63 } 64 65 /** 66 * Returns if {@link ClientStreamTracer#outboundHeaders} has been called. 67 */ getOutboundHeaders()68 public boolean getOutboundHeaders() { 69 return outboundHeadersCalled.get() != null; 70 } 71 72 /** 73 * Allow tests to await the outbound header event, which depending on the test case may be 74 * necessary (e.g., if we test for a Netty client's outbound headers upon receiving the start of 75 * stream on the server side, the tracer won't know that headers were sent until a channel future 76 * executes). 77 */ awaitOutboundHeaders(int timeout, TimeUnit unit)78 public boolean awaitOutboundHeaders(int timeout, TimeUnit unit) throws Exception { 79 return outboundHeadersLatch.await(timeout, unit); 80 } 81 82 @Override getStatus()83 public Status getStatus() { 84 return delegate.getStatus(); 85 } 86 87 @Override getInboundWireSize()88 public long getInboundWireSize() { 89 return delegate.getInboundWireSize(); 90 } 91 92 @Override getInboundUncompressedSize()93 public long getInboundUncompressedSize() { 94 return delegate.getInboundUncompressedSize(); 95 } 96 97 @Override getOutboundWireSize()98 public long getOutboundWireSize() { 99 return delegate.getOutboundWireSize(); 100 } 101 102 @Override getOutboundUncompressedSize()103 public long getOutboundUncompressedSize() { 104 return delegate.getOutboundUncompressedSize(); 105 } 106 107 @Override setFailDuplicateCallbacks(boolean fail)108 public void setFailDuplicateCallbacks(boolean fail) { 109 delegate.setFailDuplicateCallbacks(fail); 110 } 111 112 @Override nextOutboundEvent()113 public String nextOutboundEvent() { 114 return delegate.nextOutboundEvent(); 115 } 116 117 @Override nextInboundEvent()118 public String nextInboundEvent() { 119 return delegate.nextInboundEvent(); 120 } 121 122 @Override outboundWireSize(long bytes)123 public void outboundWireSize(long bytes) { 124 delegate.outboundWireSize(bytes); 125 } 126 127 @Override inboundWireSize(long bytes)128 public void inboundWireSize(long bytes) { 129 delegate.inboundWireSize(bytes); 130 } 131 132 @Override outboundUncompressedSize(long bytes)133 public void outboundUncompressedSize(long bytes) { 134 delegate.outboundUncompressedSize(bytes); 135 } 136 137 @Override inboundUncompressedSize(long bytes)138 public void inboundUncompressedSize(long bytes) { 139 delegate.inboundUncompressedSize(bytes); 140 } 141 142 @Override streamClosed(Status status)143 public void streamClosed(Status status) { 144 delegate.streamClosed(status); 145 } 146 147 @Override inboundMessage(int seqNo)148 public void inboundMessage(int seqNo) { 149 delegate.inboundMessage(seqNo); 150 } 151 152 @Override outboundMessage(int seqNo)153 public void outboundMessage(int seqNo) { 154 delegate.outboundMessage(seqNo); 155 } 156 157 @Override outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize)158 public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { 159 delegate.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize); 160 } 161 162 @Override inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize)163 public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) { 164 delegate.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize); 165 } 166 167 @Override outboundHeaders()168 public void outboundHeaders() { 169 if (!outboundHeadersCalled.compareAndSet(null, new Exception("first stack")) 170 && delegate.failDuplicateCallbacks.get()) { 171 throw new AssertionError( 172 "outboundHeaders called more than once", 173 new Exception("second stack", outboundHeadersCalled.get())); 174 } 175 outboundHeadersLatch.countDown(); 176 } 177 178 @Override inboundHeaders()179 public void inboundHeaders() { 180 if (!inboundHeadersCalled.compareAndSet(null, new Exception("first stack")) 181 && delegate.failDuplicateCallbacks.get()) { 182 throw new AssertionError( 183 "inboundHeaders called more than once", 184 new Exception("second stack", inboundHeadersCalled.get())); 185 } 186 } 187 188 @Override inboundTrailers(Metadata trailers)189 public void inboundTrailers(Metadata trailers) { 190 if (delegate.getStatus() != null) { 191 throw new AssertionError( 192 "stream has already been closed with " + delegate.getStatus(), 193 delegate.streamClosedStack.get()); 194 } 195 if (!inboundTrailers.compareAndSet(null, trailers) && delegate.failDuplicateCallbacks.get()) { 196 throw new AssertionError("inboundTrailers called more than once"); 197 } 198 } 199 } 200