/*
 * Copyright 2017 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.internal.testing;

import io.grpc.Status;
import io.grpc.StreamTracer;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
 * Test-facing accessors for the data captured by a {@link StreamTracer} suitable for testing.
 *
 * <p>See {@link TestBaseStreamTracer} for the implementation that records the callbacks.
 */
public interface TestStreamTracer {

  /**
   * Waits for the stream to be done.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  void await() throws InterruptedException;

  /**
   * Waits for the stream to be done, giving up once the timeout has elapsed.
   *
   * @param timeout the maximum time to wait
   * @param timeUnit the unit of {@code timeout}
   * @return {@code true} if the stream was done before the timeout elapsed, {@code false} if the
   *     wait timed out
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;

  /**
   * Returns the status passed to {@link StreamTracer#streamClosed}. In {@link
   * TestBaseStreamTracer} this is {@code null} until {@code streamClosed} has been called.
   */
  Status getStatus();

  /**
   * Returns the sum of all sizes passed to {@link StreamTracer#inboundWireSize}.
   */
  long getInboundWireSize();

  /**
   * Returns the sum of all sizes passed to {@link StreamTracer#inboundUncompressedSize}.
   */
  long getInboundUncompressedSize();

  /**
   * Returns the sum of all sizes passed to {@link StreamTracer#outboundWireSize}.
   */
  long getOutboundWireSize();

  /**
   * Returns the sum of all sizes passed to {@link StreamTracer#outboundUncompressedSize}.
   */
  long getOutboundUncompressedSize();

  /**
   * Sets whether to fail on unexpected duplicate calls to callback methods.
   */
  void setFailDuplicateCallbacks(boolean fail);

  /**
   * Returns the next captured outbound message event, or {@code null} if none is pending.
   */
  @Nullable
  String nextOutboundEvent();

  /**
   * Returns the next captured inbound message event, or {@code null} if none is pending.
   */
  @Nullable
  String nextInboundEvent();

  /**
   * A {@link StreamTracer} suitable for testing. It accumulates the reported byte counts, records
   * every message callback as a formatted string, and remembers the status of the first
   * {@link #streamClosed} call.
   */
  public static class TestBaseStreamTracer extends StreamTracer implements TestStreamTracer {

    // Running totals of the byte counts reported through the size callbacks.
    protected final AtomicLong outboundWireSize = new AtomicLong();
    protected final AtomicLong inboundWireSize = new AtomicLong();
    protected final AtomicLong outboundUncompressedSize = new AtomicLong();
    protected final AtomicLong inboundUncompressedSize = new AtomicLong();
    // Message callbacks rendered as strings, in the order they were received.
    protected final LinkedBlockingQueue<String> outboundEvents = new LinkedBlockingQueue<>();
    protected final LinkedBlockingQueue<String> inboundEvents = new LinkedBlockingQueue<>();
    // Status of the first streamClosed() call; null until that call happens.
    protected final AtomicReference<Status> streamClosedStatus = new AtomicReference<>();
    // Stack of the first streamClosed() call, attached as the cause when a duplicate is reported.
    protected final AtomicReference<Throwable> streamClosedStack = new AtomicReference<>();
    // Released by the first streamClosed() call; await() blocks on it.
    protected final CountDownLatch streamClosed = new CountDownLatch(1);
    // Duplicate streamClosed() calls fail by default.
    protected final AtomicBoolean failDuplicateCallbacks = new AtomicBoolean(true);

    @Override
    public void await() throws InterruptedException {
      streamClosed.await();
    }

    @Override
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
      return streamClosed.await(timeout, timeUnit);
    }

    @Override
    public Status getStatus() {
      return streamClosedStatus.get();
    }

    @Override
    public long getInboundWireSize() {
      return inboundWireSize.get();
    }

    @Override
    public long getInboundUncompressedSize() {
      return inboundUncompressedSize.get();
    }

    @Override
    public long getOutboundWireSize() {
      return outboundWireSize.get();
    }

    @Override
    public long getOutboundUncompressedSize() {
      return outboundUncompressedSize.get();
    }

    @Override
    public void outboundWireSize(long bytes) {
      outboundWireSize.addAndGet(bytes);
    }

    @Override
    public void inboundWireSize(long bytes) {
      inboundWireSize.addAndGet(bytes);
    }

    @Override
    public void outboundUncompressedSize(long bytes) {
      outboundUncompressedSize.addAndGet(bytes);
    }

    @Override
    public void inboundUncompressedSize(long bytes) {
      inboundUncompressedSize.addAndGet(bytes);
    }

    /**
     * Records the closing status and releases threads blocked in {@link #await}.
     *
     * <p>Only the first call stores its status. A later call throws an {@link AssertionError},
     * whose cause is the stack of the first call, unless duplicate failures were disabled via
     * {@link #setFailDuplicateCallbacks}.
     */
    @Override
    public void streamClosed(Status status) {
      // Remember where the first call came from so a duplicate report can point back to it.
      streamClosedStack.compareAndSet(null, new Throwable("first call"));
      if (streamClosedStatus.compareAndSet(null, status)) {
        streamClosed.countDown();
        return;
      }
      if (failDuplicateCallbacks.get()) {
        throw new AssertionError("streamClosed called more than once", streamClosedStack.get());
      }
    }

    @Override
    public void inboundMessage(int seqNo) {
      inboundEvents.add("inboundMessage(" + seqNo + ")");
    }

    @Override
    public void outboundMessage(int seqNo) {
      outboundEvents.add("outboundMessage(" + seqNo + ")");
    }

    @Override
    public void outboundMessageSent(
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
      outboundEvents.add(
          String.format(
              Locale.US,
              "outboundMessageSent(%d, %d, %d)",
              seqNo, optionalWireSize, optionalUncompressedSize));
    }

    @Override
    public void inboundMessageRead(
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
      inboundEvents.add(
          String.format(
              Locale.US,
              "inboundMessageRead(%d, %d, %d)",
              seqNo, optionalWireSize, optionalUncompressedSize));
    }

    @Override
    public void setFailDuplicateCallbacks(boolean fail) {
      failDuplicateCallbacks.set(fail);
    }

    /** Removes and returns the oldest recorded outbound event, or {@code null} if none is left. */
    @Override
    @Nullable
    public String nextOutboundEvent() {
      return outboundEvents.poll();
    }

    /** Removes and returns the oldest recorded inbound event, or {@code null} if none is left. */
    @Override
    @Nullable
    public String nextInboundEvent() {
      return inboundEvents.poll();
    }
  }
}