1 /* 2 * Copyright 2021 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static java.util.concurrent.TimeUnit.SECONDS; 21 import static org.mockito.ArgumentMatchers.any; 22 import static org.mockito.Mockito.never; 23 import static org.mockito.Mockito.timeout; 24 import static org.mockito.Mockito.verify; 25 26 import com.google.common.collect.ImmutableMap; 27 import io.grpc.Attributes; 28 import io.grpc.CallOptions; 29 import io.grpc.ClientCall; 30 import io.grpc.ClientInterceptor; 31 import io.grpc.ClientStreamTracer; 32 import io.grpc.ClientStreamTracer.StreamInfo; 33 import io.grpc.Deadline; 34 import io.grpc.Deadline.Ticker; 35 import io.grpc.IntegerMarshaller; 36 import io.grpc.ManagedChannel; 37 import io.grpc.Metadata; 38 import io.grpc.MethodDescriptor; 39 import io.grpc.MethodDescriptor.MethodType; 40 import io.grpc.Server; 41 import io.grpc.ServerCall; 42 import io.grpc.ServerCall.Listener; 43 import io.grpc.ServerCallHandler; 44 import io.grpc.ServerMethodDefinition; 45 import io.grpc.ServerServiceDefinition; 46 import io.grpc.Status; 47 import io.grpc.Status.Code; 48 import io.grpc.StringMarshaller; 49 import io.grpc.census.InternalCensusStatsAccessor; 50 import io.grpc.census.internal.DeprecatedCensusConstants; 51 import io.grpc.internal.FakeClock; 52 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; 53 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; 54 import io.grpc.internal.testing.StatsTestUtils.FakeTagger; 55 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; 56 import io.grpc.netty.NettyChannelBuilder; 57 import io.grpc.netty.NettyServerBuilder; 58 import io.grpc.testing.GrpcCleanupRule; 59 import io.netty.channel.DefaultEventLoopGroup; 60 import io.netty.channel.EventLoopGroup; 61 import io.netty.channel.local.LocalAddress; 62 import io.netty.channel.local.LocalChannel; 63 import io.netty.channel.local.LocalServerChannel; 64 import io.netty.util.concurrent.ScheduledFuture; 65 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; 66 import io.opencensus.stats.Measure; 67 import io.opencensus.stats.Measure.MeasureDouble; 68 import io.opencensus.stats.Measure.MeasureLong; 69 import io.opencensus.tags.TagValue; 70 import java.util.Arrays; 71 import java.util.HashMap; 72 import java.util.Map; 73 import java.util.concurrent.CountDownLatch; 74 import java.util.concurrent.LinkedBlockingQueue; 75 import java.util.concurrent.TimeUnit; 76 import java.util.concurrent.atomic.AtomicBoolean; 77 import org.junit.Rule; 78 import org.junit.Test; 79 import org.junit.runner.RunWith; 80 import org.junit.runners.JUnit4; 81 import org.mockito.ArgumentCaptor; 82 import org.mockito.Mock; 83 import org.mockito.junit.MockitoJUnit; 84 import org.mockito.junit.MockitoRule; 85 86 @RunWith(JUnit4.class) 87 public class RetryTest { 88 private static final FakeTagger tagger = new FakeTagger(); 89 private static final FakeTagContextBinarySerializer tagContextBinarySerializer = 90 new FakeTagContextBinarySerializer(); 91 private static final MeasureLong RETRIES_PER_CALL = 92 Measure.MeasureLong.create( 93 "grpc.io/client/retries_per_call", "Number of retries per call", "1"); 94 private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = 95 Measure.MeasureLong.create( 96 "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); 97 private static final MeasureDouble RETRY_DELAY_PER_CALL = 98 Measure.MeasureDouble.create( 99 "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); 100 101 @Rule 102 public final MockitoRule mocks = MockitoJUnit.rule(); 103 @Rule 104 public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); 105 private final FakeClock fakeClock = new FakeClock(); 106 @Mock 107 private ClientCall.Listener<Integer> mockCallListener; 108 private CountDownLatch backoffLatch = new CountDownLatch(1); 109 private final EventLoopGroup group = new DefaultEventLoopGroup() { 110 @SuppressWarnings("FutureReturnValueIgnored") 111 @Override 112 public ScheduledFuture<?> schedule( 113 final Runnable command, final long delay, final TimeUnit unit) { 114 if (!command.getClass().getName().contains("RetryBackoffRunnable")) { 115 return super.schedule(command, delay, unit); 116 } 117 fakeClock.getScheduledExecutorService().schedule( 118 new Runnable() { 119 @Override 120 public void run() { 121 group.execute(command); 122 } 123 }, 124 delay, 125 unit); 126 backoffLatch.countDown(); 127 return super.schedule( 128 new Runnable() { 129 @Override 130 public void run() {} // no-op 131 }, 132 0, 133 TimeUnit.NANOSECONDS); 134 } 135 }; 136 private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); 137 private final ClientInterceptor statsInterceptor = 138 InternalCensusStatsAccessor.getClientInterceptor( 139 tagger, tagContextBinarySerializer, clientStatsRecorder, 140 fakeClock.getStopwatchSupplier(), true, true, true, 141 /* recordRealTimeMetrics= */ true, /* recordRetryMetrics= */ true); 142 private final MethodDescriptor<String, Integer> clientStreamingMethod = 143 MethodDescriptor.<String, Integer>newBuilder() 144 .setType(MethodType.CLIENT_STREAMING) 145 .setFullMethodName("service/method") 146 .setRequestMarshaller(new StringMarshaller()) 147 .setResponseMarshaller(new IntegerMarshaller()) 148 .build(); 149 private final LinkedBlockingQueue<ServerCall<String, Integer>> serverCalls = 150 new LinkedBlockingQueue<>(); 151 private final ServerMethodDefinition<String, Integer> methodDefinition = 152 ServerMethodDefinition.create( 153 clientStreamingMethod, 154 new ServerCallHandler<String, Integer>() { 155 @Override 156 public Listener<String> startCall(ServerCall<String, Integer> call, Metadata headers) { 157 serverCalls.offer(call); 158 return new Listener<String>() {}; 159 } 160 } 161 ); 162 private final ServerServiceDefinition serviceDefinition = 163 ServerServiceDefinition.builder(clientStreamingMethod.getServiceName()) 164 .addMethod(methodDefinition) 165 .build(); 166 private final LocalAddress localAddress = new LocalAddress(this.getClass().getName()); 167 private Server localServer; 168 private ManagedChannel channel; 169 private Map<String, Object> retryPolicy = null; 170 private long bufferLimit = 1L << 20; // 1M 171 startNewServer()172 private void startNewServer() throws Exception { 173 localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress) 174 .channelType(LocalServerChannel.class) 175 .bossEventLoopGroup(group) 176 .workerEventLoopGroup(group) 177 .addService(serviceDefinition) 178 .build()); 179 localServer.start(); 180 } 181 createNewChannel()182 private void createNewChannel() { 183 Map<String, Object> methodConfig = new HashMap<>(); 184 Map<String, Object> name = new HashMap<>(); 185 name.put("service", "service"); 186 methodConfig.put("name", Arrays.<Object>asList(name)); 187 if (retryPolicy != null) { 188 methodConfig.put("retryPolicy", retryPolicy); 189 } 190 Map<String, Object> rawServiceConfig = new HashMap<>(); 191 rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig)); 192 channel = cleanupRule.register( 193 NettyChannelBuilder.forAddress(localAddress) 194 .channelType(LocalChannel.class) 195 .eventLoopGroup(group) 196 .usePlaintext() 197 .enableRetry() 198 .perRpcBufferLimit(bufferLimit) 199 .defaultServiceConfig(rawServiceConfig) 200 .intercept(statsInterceptor) 201 .build()); 202 } 203 elapseBackoff(long time, TimeUnit unit)204 private void elapseBackoff(long time, TimeUnit unit) throws Exception { 205 assertThat(backoffLatch.await(5, SECONDS)).isTrue(); 206 backoffLatch = new CountDownLatch(1); 207 fakeClock.forwardTime(time, unit); 208 } 209 assertRpcStartedRecorded()210 private void assertRpcStartedRecorded() throws Exception { 211 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 212 assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_STARTED_RPCS)) 213 .isEqualTo(1); 214 } 215 assertOutboundMessageRecorded()216 private void assertOutboundMessageRecorded() throws Exception { 217 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 218 assertThat( 219 record.getMetricAsLongOrFail( 220 RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD)) 221 .isEqualTo(1); 222 } 223 assertInboundMessageRecorded()224 private void assertInboundMessageRecorded() throws Exception { 225 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 226 assertThat( 227 record.getMetricAsLongOrFail( 228 RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD)) 229 .isEqualTo(1); 230 } 231 assertOutboundWireSizeRecorded(long length)232 private void assertOutboundWireSizeRecorded(long length) throws Exception { 233 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 234 assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD)) 235 .isEqualTo(length); 236 } 237 assertInboundWireSizeRecorded(long length)238 private void assertInboundWireSizeRecorded(long length) throws Exception { 239 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 240 assertThat( 241 record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD)) 242 .isEqualTo(length); 243 } 244 assertRpcStatusRecorded( Status.Code code, long roundtripLatencyMs, long outboundMessages)245 private void assertRpcStatusRecorded( 246 Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception { 247 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 248 TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); 249 assertThat(statusTag.asString()).isEqualTo(code.toString()); 250 assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) 251 .isEqualTo(1); 252 assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)) 253 .isEqualTo(roundtripLatencyMs); 254 assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_RPC)) 255 .isEqualTo(outboundMessages); 256 } 257 assertRetryStatsRecorded( int numRetries, int numTransparentRetries, long retryDelayMs)258 private void assertRetryStatsRecorded( 259 int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception { 260 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 261 assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries); 262 assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL)) 263 .isEqualTo(numTransparentRetries); 264 assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs); 265 } 266 267 @Test retryUntilBufferLimitExceeded()268 public void retryUntilBufferLimitExceeded() throws Exception { 269 String message = "String of length 20."; 270 271 startNewServer(); 272 bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message. 273 retryPolicy = ImmutableMap.<String, Object>builder() 274 .put("maxAttempts", 4D) 275 .put("initialBackoff", "10s") 276 .put("maxBackoff", "10s") 277 .put("backoffMultiplier", 1D) 278 .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE")) 279 .buildOrThrow(); 280 createNewChannel(); 281 ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); 282 call.start(mockCallListener, new Metadata()); 283 call.sendMessage(message); 284 285 ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS); 286 serverCall.request(2); 287 // trigger retry 288 serverCall.close( 289 Status.UNAVAILABLE.withDescription("original attempt failed"), 290 new Metadata()); 291 elapseBackoff(10, SECONDS); 292 // 2nd attempt received 293 serverCall = serverCalls.poll(5, SECONDS); 294 serverCall.request(2); 295 verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class)); 296 // send one more message, should exceed buffer limit 297 call.sendMessage(message); 298 // let attempt fail 299 serverCall.close( 300 Status.UNAVAILABLE.withDescription("2nd attempt failed"), 301 new Metadata()); 302 // no more retry 303 ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class); 304 verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class)); 305 assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed"); 306 } 307 308 @Test statsRecorded()309 public void statsRecorded() throws Exception { 310 startNewServer(); 311 retryPolicy = ImmutableMap.<String, Object>builder() 312 .put("maxAttempts", 4D) 313 .put("initialBackoff", "10s") 314 .put("maxBackoff", "10s") 315 .put("backoffMultiplier", 1D) 316 .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE")) 317 .buildOrThrow(); 318 createNewChannel(); 319 320 ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT); 321 call.start(mockCallListener, new Metadata()); 322 assertRpcStartedRecorded(); 323 String message = "String of length 20."; 324 call.sendMessage(message); 325 assertOutboundMessageRecorded(); 326 ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS); 327 serverCall.request(2); 328 assertOutboundWireSizeRecorded(message.length()); 329 // original attempt latency 330 fakeClock.forwardTime(1, SECONDS); 331 // trigger retry 332 serverCall.close( 333 Status.UNAVAILABLE.withDescription("original attempt failed"), 334 new Metadata()); 335 assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1); 336 elapseBackoff(10, SECONDS); 337 assertRpcStartedRecorded(); 338 assertOutboundMessageRecorded(); 339 serverCall = serverCalls.poll(5, SECONDS); 340 serverCall.request(2); 341 assertOutboundWireSizeRecorded(message.length()); 342 message = "new message"; 343 call.sendMessage(message); 344 assertOutboundMessageRecorded(); 345 assertOutboundWireSizeRecorded(message.length()); 346 // retry attempt latency 347 fakeClock.forwardTime(2, SECONDS); 348 serverCall.sendHeaders(new Metadata()); 349 serverCall.sendMessage(3); 350 serverCall.close(Status.OK, new Metadata()); 351 call.request(1); 352 assertInboundMessageRecorded(); 353 assertInboundWireSizeRecorded(1); 354 assertRpcStatusRecorded(Status.Code.OK, 12000, 2); 355 assertRetryStatsRecorded(1, 0, 0); 356 } 357 358 @Test statsRecorde_callCancelledBeforeCommit()359 public void statsRecorde_callCancelledBeforeCommit() throws Exception { 360 startNewServer(); 361 retryPolicy = ImmutableMap.<String, Object>builder() 362 .put("maxAttempts", 4D) 363 .put("initialBackoff", "10s") 364 .put("maxBackoff", "10s") 365 .put("backoffMultiplier", 1D) 366 .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE")) 367 .buildOrThrow(); 368 createNewChannel(); 369 370 // We will have streamClosed return at a particular moment that we want. 371 final CountDownLatch streamClosedLatch = new CountDownLatch(1); 372 ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() { 373 @Override 374 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 375 return new ClientStreamTracer() { 376 @Override 377 public void streamClosed(Status status) { 378 if (status.getCode().equals(Code.CANCELLED)) { 379 try { 380 streamClosedLatch.await(); 381 } catch (InterruptedException e) { 382 Thread.currentThread().interrupt(); 383 throw new AssertionError("streamClosedLatch interrupted", e); 384 } 385 } 386 } 387 }; 388 } 389 }; 390 ClientCall<String, Integer> call = channel.newCall( 391 clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory)); 392 call.start(mockCallListener, new Metadata()); 393 assertRpcStartedRecorded(); 394 fakeClock.forwardTime(5, SECONDS); 395 String message = "String of length 20."; 396 call.sendMessage(message); 397 assertOutboundMessageRecorded(); 398 ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS); 399 serverCall.request(2); 400 assertOutboundWireSizeRecorded(message.length()); 401 // trigger retry 402 serverCall.close( 403 Status.UNAVAILABLE.withDescription("original attempt failed"), 404 new Metadata()); 405 assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1); 406 elapseBackoff(10, SECONDS); 407 assertRpcStartedRecorded(); 408 assertOutboundMessageRecorded(); 409 serverCall = serverCalls.poll(5, SECONDS); 410 serverCall.request(2); 411 assertOutboundWireSizeRecorded(message.length()); 412 fakeClock.forwardTime(7, SECONDS); 413 // A noop substream will commit. But call is not yet closed. 414 call.cancel("Cancelled before commit", null); 415 // Let the netty substream listener be closed. 416 streamClosedLatch.countDown(); 417 // The call listener is closed. 418 verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class)); 419 assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1); 420 assertRetryStatsRecorded(1, 0, 0); 421 } 422 423 @Test serverCancelledAndClientDeadlineExceeded()424 public void serverCancelledAndClientDeadlineExceeded() throws Exception { 425 startNewServer(); 426 createNewChannel(); 427 428 class CloseDelayedTracer extends ClientStreamTracer { 429 @Override 430 public void streamClosed(Status status) { 431 fakeClock.forwardTime(10, SECONDS); 432 } 433 } 434 435 class CloseDelayedTracerFactory extends ClientStreamTracer.Factory { 436 @Override 437 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 438 return new CloseDelayedTracer(); 439 } 440 } 441 442 CallOptions callOptions = CallOptions.DEFAULT 443 .withDeadline(Deadline.after( 444 10, 445 SECONDS, 446 new Ticker() { 447 @Override 448 public long nanoTime() { 449 return fakeClock.getTicker().read(); 450 } 451 })) 452 .withStreamTracerFactory(new CloseDelayedTracerFactory()); 453 ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions); 454 call.start(mockCallListener, new Metadata()); 455 assertRpcStartedRecorded(); 456 ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS); 457 serverCall.close(Status.CANCELLED, new Metadata()); 458 assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0); 459 assertRetryStatsRecorded(0, 0, 0); 460 } 461 462 @Test transparentRetryStatsRecorded()463 public void transparentRetryStatsRecorded() throws Exception { 464 startNewServer(); 465 createNewChannel(); 466 467 final AtomicBoolean originalAttemptFailed = new AtomicBoolean(); 468 class TransparentRetryTriggeringTracer extends ClientStreamTracer { 469 470 @Override 471 public void streamCreated(Attributes transportAttrs, Metadata metadata) { 472 if (originalAttemptFailed.get()) { 473 return; 474 } 475 // Send GOAWAY from server. The client may either receive GOAWAY or create the underlying 476 // netty stream and write headers first, even we await server termination as below. 477 // In the latter case, we rerun the test. We can also call localServer.shutdown() to trigger 478 // GOAWAY, but it takes a lot longer time to gracefully shut down. 479 localServer.shutdownNow(); 480 try { 481 localServer.awaitTermination(); 482 } catch (InterruptedException e) { 483 Thread.currentThread().interrupt(); 484 throw new AssertionError(e); 485 } 486 } 487 488 @Override 489 public void streamClosed(Status status) { 490 if (originalAttemptFailed.get()) { 491 return; 492 } 493 originalAttemptFailed.set(true); 494 try { 495 startNewServer(); 496 channel.resetConnectBackoff(); 497 } catch (Exception e) { 498 throw new AssertionError("local server can not be restarted", e); 499 } 500 } 501 } 502 503 class TransparentRetryTracerFactory extends ClientStreamTracer.Factory { 504 @Override 505 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 506 return new TransparentRetryTriggeringTracer(); 507 } 508 } 509 510 CallOptions callOptions = CallOptions.DEFAULT 511 .withWaitForReady() 512 .withStreamTracerFactory(new TransparentRetryTracerFactory()); 513 while (true) { 514 ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions); 515 call.start(mockCallListener, new Metadata()); 516 assertRpcStartedRecorded(); // original attempt 517 MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS); 518 assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT)) 519 .isEqualTo(1); 520 TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS); 521 if (statusTag.asString().equals(Code.UNAVAILABLE.toString())) { 522 break; 523 } else { 524 // Due to race condition, GOAWAY is not received/processed before the stream is closed due 525 // to connection error. Rerun the test. 526 assertThat(statusTag.asString()).isEqualTo(Code.UNKNOWN.toString()); 527 assertRetryStatsRecorded(0, 0, 0); 528 originalAttemptFailed.set(false); 529 } 530 } 531 assertRpcStartedRecorded(); // retry attempt 532 ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS); 533 serverCall.close(Status.INVALID_ARGUMENT, new Metadata()); 534 assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0); 535 assertRetryStatsRecorded(0, 1, 0); 536 } 537 } 538