• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2021 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static java.util.concurrent.TimeUnit.SECONDS;
21 import static org.mockito.ArgumentMatchers.any;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.timeout;
24 import static org.mockito.Mockito.verify;
25 
26 import com.google.common.collect.ImmutableMap;
27 import io.grpc.Attributes;
28 import io.grpc.CallOptions;
29 import io.grpc.ClientCall;
30 import io.grpc.ClientInterceptor;
31 import io.grpc.ClientStreamTracer;
32 import io.grpc.ClientStreamTracer.StreamInfo;
33 import io.grpc.Deadline;
34 import io.grpc.Deadline.Ticker;
35 import io.grpc.IntegerMarshaller;
36 import io.grpc.ManagedChannel;
37 import io.grpc.Metadata;
38 import io.grpc.MethodDescriptor;
39 import io.grpc.MethodDescriptor.MethodType;
40 import io.grpc.Server;
41 import io.grpc.ServerCall;
42 import io.grpc.ServerCall.Listener;
43 import io.grpc.ServerCallHandler;
44 import io.grpc.ServerMethodDefinition;
45 import io.grpc.ServerServiceDefinition;
46 import io.grpc.Status;
47 import io.grpc.Status.Code;
48 import io.grpc.StringMarshaller;
49 import io.grpc.census.InternalCensusStatsAccessor;
50 import io.grpc.census.internal.DeprecatedCensusConstants;
51 import io.grpc.internal.FakeClock;
52 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
53 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
54 import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
55 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
56 import io.grpc.netty.NettyChannelBuilder;
57 import io.grpc.netty.NettyServerBuilder;
58 import io.grpc.testing.GrpcCleanupRule;
59 import io.netty.channel.DefaultEventLoopGroup;
60 import io.netty.channel.EventLoopGroup;
61 import io.netty.channel.local.LocalAddress;
62 import io.netty.channel.local.LocalChannel;
63 import io.netty.channel.local.LocalServerChannel;
64 import io.netty.util.concurrent.ScheduledFuture;
65 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
66 import io.opencensus.stats.Measure;
67 import io.opencensus.stats.Measure.MeasureDouble;
68 import io.opencensus.stats.Measure.MeasureLong;
69 import io.opencensus.tags.TagValue;
70 import java.util.Arrays;
71 import java.util.HashMap;
72 import java.util.Map;
73 import java.util.concurrent.CountDownLatch;
74 import java.util.concurrent.LinkedBlockingQueue;
75 import java.util.concurrent.TimeUnit;
76 import java.util.concurrent.atomic.AtomicBoolean;
77 import org.junit.Rule;
78 import org.junit.Test;
79 import org.junit.runner.RunWith;
80 import org.junit.runners.JUnit4;
81 import org.mockito.ArgumentCaptor;
82 import org.mockito.Mock;
83 import org.mockito.junit.MockitoJUnit;
84 import org.mockito.junit.MockitoRule;
85 
86 @RunWith(JUnit4.class)
87 public class RetryTest {
88   private static final FakeTagger tagger = new FakeTagger();
89   private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
90       new FakeTagContextBinarySerializer();
91   private static final MeasureLong RETRIES_PER_CALL =
92       Measure.MeasureLong.create(
93           "grpc.io/client/retries_per_call", "Number of retries per call", "1");
94   private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
95       Measure.MeasureLong.create(
96           "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
97   private static final MeasureDouble RETRY_DELAY_PER_CALL =
98       Measure.MeasureDouble.create(
99           "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
100 
101   @Rule
102   public final MockitoRule mocks = MockitoJUnit.rule();
103   @Rule
104   public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
105   private final FakeClock fakeClock = new FakeClock();
106   @Mock
107   private ClientCall.Listener<Integer> mockCallListener;
108   private CountDownLatch backoffLatch = new CountDownLatch(1);
109   private final EventLoopGroup group = new DefaultEventLoopGroup() {
110     @SuppressWarnings("FutureReturnValueIgnored")
111     @Override
112     public ScheduledFuture<?> schedule(
113         final Runnable command, final long delay, final TimeUnit unit) {
114       if (!command.getClass().getName().contains("RetryBackoffRunnable")) {
115         return super.schedule(command, delay, unit);
116       }
117       fakeClock.getScheduledExecutorService().schedule(
118           new Runnable() {
119             @Override
120             public void run() {
121               group.execute(command);
122             }
123           },
124           delay,
125           unit);
126       backoffLatch.countDown();
127       return super.schedule(
128           new Runnable() {
129             @Override
130             public void run() {} // no-op
131           },
132           0,
133           TimeUnit.NANOSECONDS);
134     }
135   };
136   private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
137   private final ClientInterceptor statsInterceptor =
138       InternalCensusStatsAccessor.getClientInterceptor(
139           tagger, tagContextBinarySerializer, clientStatsRecorder,
140           fakeClock.getStopwatchSupplier(), true, true, true,
141           /* recordRealTimeMetrics= */ true, /* recordRetryMetrics= */ true);
142   private final MethodDescriptor<String, Integer> clientStreamingMethod =
143       MethodDescriptor.<String, Integer>newBuilder()
144           .setType(MethodType.CLIENT_STREAMING)
145           .setFullMethodName("service/method")
146           .setRequestMarshaller(new StringMarshaller())
147           .setResponseMarshaller(new IntegerMarshaller())
148           .build();
149   private final LinkedBlockingQueue<ServerCall<String, Integer>> serverCalls =
150       new LinkedBlockingQueue<>();
151   private final ServerMethodDefinition<String, Integer> methodDefinition =
152       ServerMethodDefinition.create(
153           clientStreamingMethod,
154           new ServerCallHandler<String, Integer>() {
155             @Override
156             public Listener<String> startCall(ServerCall<String, Integer> call, Metadata headers) {
157               serverCalls.offer(call);
158               return new Listener<String>() {};
159             }
160           }
161   );
162   private final ServerServiceDefinition serviceDefinition =
163       ServerServiceDefinition.builder(clientStreamingMethod.getServiceName())
164           .addMethod(methodDefinition)
165           .build();
166   private final LocalAddress localAddress = new LocalAddress(this.getClass().getName());
167   private Server localServer;
168   private ManagedChannel channel;
169   private Map<String, Object> retryPolicy = null;
170   private long bufferLimit = 1L << 20; // 1M
171 
startNewServer()172   private void startNewServer() throws Exception {
173     localServer = cleanupRule.register(NettyServerBuilder.forAddress(localAddress)
174         .channelType(LocalServerChannel.class)
175         .bossEventLoopGroup(group)
176         .workerEventLoopGroup(group)
177         .addService(serviceDefinition)
178         .build());
179     localServer.start();
180   }
181 
createNewChannel()182   private void createNewChannel() {
183     Map<String, Object> methodConfig = new HashMap<>();
184     Map<String, Object> name = new HashMap<>();
185     name.put("service", "service");
186     methodConfig.put("name", Arrays.<Object>asList(name));
187     if (retryPolicy != null) {
188       methodConfig.put("retryPolicy", retryPolicy);
189     }
190     Map<String, Object> rawServiceConfig = new HashMap<>();
191     rawServiceConfig.put("methodConfig", Arrays.<Object>asList(methodConfig));
192     channel = cleanupRule.register(
193         NettyChannelBuilder.forAddress(localAddress)
194             .channelType(LocalChannel.class)
195             .eventLoopGroup(group)
196             .usePlaintext()
197             .enableRetry()
198             .perRpcBufferLimit(bufferLimit)
199             .defaultServiceConfig(rawServiceConfig)
200             .intercept(statsInterceptor)
201             .build());
202   }
203 
elapseBackoff(long time, TimeUnit unit)204   private void elapseBackoff(long time, TimeUnit unit) throws Exception {
205     assertThat(backoffLatch.await(5, SECONDS)).isTrue();
206     backoffLatch = new CountDownLatch(1);
207     fakeClock.forwardTime(time, unit);
208   }
209 
assertRpcStartedRecorded()210   private void assertRpcStartedRecorded() throws Exception {
211     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
212     assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_STARTED_RPCS))
213         .isEqualTo(1);
214   }
215 
assertOutboundMessageRecorded()216   private void assertOutboundMessageRecorded() throws Exception {
217     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
218     assertThat(
219             record.getMetricAsLongOrFail(
220                 RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_METHOD))
221         .isEqualTo(1);
222   }
223 
assertInboundMessageRecorded()224   private void assertInboundMessageRecorded() throws Exception {
225     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
226     assertThat(
227             record.getMetricAsLongOrFail(
228                 RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_METHOD))
229         .isEqualTo(1);
230   }
231 
assertOutboundWireSizeRecorded(long length)232   private void assertOutboundWireSizeRecorded(long length) throws Exception {
233     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
234     assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_METHOD))
235         .isEqualTo(length);
236   }
237 
assertInboundWireSizeRecorded(long length)238   private void assertInboundWireSizeRecorded(long length) throws Exception {
239     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
240     assertThat(
241             record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_METHOD))
242         .isEqualTo(length);
243   }
244 
assertRpcStatusRecorded( Status.Code code, long roundtripLatencyMs, long outboundMessages)245   private void assertRpcStatusRecorded(
246       Status.Code code, long roundtripLatencyMs, long outboundMessages) throws Exception {
247     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
248     TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
249     assertThat(statusTag.asString()).isEqualTo(code.toString());
250     assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
251         .isEqualTo(1);
252     assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY))
253         .isEqualTo(roundtripLatencyMs);
254     assertThat(record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_RPC))
255         .isEqualTo(outboundMessages);
256   }
257 
assertRetryStatsRecorded( int numRetries, int numTransparentRetries, long retryDelayMs)258   private void assertRetryStatsRecorded(
259       int numRetries, int numTransparentRetries, long retryDelayMs) throws Exception {
260     MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
261     assertThat(record.getMetricAsLongOrFail(RETRIES_PER_CALL)).isEqualTo(numRetries);
262     assertThat(record.getMetricAsLongOrFail(TRANSPARENT_RETRIES_PER_CALL))
263         .isEqualTo(numTransparentRetries);
264     assertThat(record.getMetricAsLongOrFail(RETRY_DELAY_PER_CALL)).isEqualTo(retryDelayMs);
265   }
266 
267   @Test
retryUntilBufferLimitExceeded()268   public void retryUntilBufferLimitExceeded() throws Exception {
269     String message = "String of length 20.";
270 
271     startNewServer();
272     bufferLimit = message.length() * 2L - 1; // Can buffer no more than 1 message.
273     retryPolicy = ImmutableMap.<String, Object>builder()
274         .put("maxAttempts", 4D)
275         .put("initialBackoff", "10s")
276         .put("maxBackoff", "10s")
277         .put("backoffMultiplier", 1D)
278         .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
279         .buildOrThrow();
280     createNewChannel();
281     ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
282     call.start(mockCallListener, new Metadata());
283     call.sendMessage(message);
284 
285     ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
286     serverCall.request(2);
287     // trigger retry
288     serverCall.close(
289         Status.UNAVAILABLE.withDescription("original attempt failed"),
290         new Metadata());
291     elapseBackoff(10, SECONDS);
292     // 2nd attempt received
293     serverCall = serverCalls.poll(5, SECONDS);
294     serverCall.request(2);
295     verify(mockCallListener, never()).onClose(any(Status.class), any(Metadata.class));
296     // send one more message, should exceed buffer limit
297     call.sendMessage(message);
298     // let attempt fail
299     serverCall.close(
300         Status.UNAVAILABLE.withDescription("2nd attempt failed"),
301         new Metadata());
302     // no more retry
303     ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
304     verify(mockCallListener, timeout(5000)).onClose(statusCaptor.capture(), any(Metadata.class));
305     assertThat(statusCaptor.getValue().getDescription()).contains("2nd attempt failed");
306   }
307 
308   @Test
statsRecorded()309   public void statsRecorded() throws Exception {
310     startNewServer();
311     retryPolicy = ImmutableMap.<String, Object>builder()
312         .put("maxAttempts", 4D)
313         .put("initialBackoff", "10s")
314         .put("maxBackoff", "10s")
315         .put("backoffMultiplier", 1D)
316         .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
317         .buildOrThrow();
318     createNewChannel();
319 
320     ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, CallOptions.DEFAULT);
321     call.start(mockCallListener, new Metadata());
322     assertRpcStartedRecorded();
323     String message = "String of length 20.";
324     call.sendMessage(message);
325     assertOutboundMessageRecorded();
326     ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
327     serverCall.request(2);
328     assertOutboundWireSizeRecorded(message.length());
329     // original attempt latency
330     fakeClock.forwardTime(1, SECONDS);
331     // trigger retry
332     serverCall.close(
333         Status.UNAVAILABLE.withDescription("original attempt failed"),
334         new Metadata());
335     assertRpcStatusRecorded(Status.Code.UNAVAILABLE, 1000, 1);
336     elapseBackoff(10, SECONDS);
337     assertRpcStartedRecorded();
338     assertOutboundMessageRecorded();
339     serverCall = serverCalls.poll(5, SECONDS);
340     serverCall.request(2);
341     assertOutboundWireSizeRecorded(message.length());
342     message = "new message";
343     call.sendMessage(message);
344     assertOutboundMessageRecorded();
345     assertOutboundWireSizeRecorded(message.length());
346     // retry attempt latency
347     fakeClock.forwardTime(2, SECONDS);
348     serverCall.sendHeaders(new Metadata());
349     serverCall.sendMessage(3);
350     serverCall.close(Status.OK, new Metadata());
351     call.request(1);
352     assertInboundMessageRecorded();
353     assertInboundWireSizeRecorded(1);
354     assertRpcStatusRecorded(Status.Code.OK, 12000, 2);
355     assertRetryStatsRecorded(1, 0, 0);
356   }
357 
358   @Test
statsRecorde_callCancelledBeforeCommit()359   public void statsRecorde_callCancelledBeforeCommit() throws Exception {
360     startNewServer();
361     retryPolicy = ImmutableMap.<String, Object>builder()
362         .put("maxAttempts", 4D)
363         .put("initialBackoff", "10s")
364         .put("maxBackoff", "10s")
365         .put("backoffMultiplier", 1D)
366         .put("retryableStatusCodes", Arrays.<Object>asList("UNAVAILABLE"))
367         .buildOrThrow();
368     createNewChannel();
369 
370     // We will have streamClosed return at a particular moment that we want.
371     final CountDownLatch streamClosedLatch = new CountDownLatch(1);
372     ClientStreamTracer.Factory streamTracerFactory = new ClientStreamTracer.Factory() {
373       @Override
374       public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
375         return new ClientStreamTracer() {
376           @Override
377           public void streamClosed(Status status) {
378             if (status.getCode().equals(Code.CANCELLED)) {
379               try {
380                 streamClosedLatch.await();
381               } catch (InterruptedException e) {
382                 Thread.currentThread().interrupt();
383                 throw new AssertionError("streamClosedLatch interrupted", e);
384               }
385             }
386           }
387         };
388       }
389     };
390     ClientCall<String, Integer> call = channel.newCall(
391         clientStreamingMethod, CallOptions.DEFAULT.withStreamTracerFactory(streamTracerFactory));
392     call.start(mockCallListener, new Metadata());
393     assertRpcStartedRecorded();
394     fakeClock.forwardTime(5, SECONDS);
395     String message = "String of length 20.";
396     call.sendMessage(message);
397     assertOutboundMessageRecorded();
398     ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
399     serverCall.request(2);
400     assertOutboundWireSizeRecorded(message.length());
401     // trigger retry
402     serverCall.close(
403         Status.UNAVAILABLE.withDescription("original attempt failed"),
404         new Metadata());
405     assertRpcStatusRecorded(Code.UNAVAILABLE, 5000, 1);
406     elapseBackoff(10, SECONDS);
407     assertRpcStartedRecorded();
408     assertOutboundMessageRecorded();
409     serverCall = serverCalls.poll(5, SECONDS);
410     serverCall.request(2);
411     assertOutboundWireSizeRecorded(message.length());
412     fakeClock.forwardTime(7, SECONDS);
413     // A noop substream will commit. But call is not yet closed.
414     call.cancel("Cancelled before commit", null);
415     // Let the netty substream listener be closed.
416     streamClosedLatch.countDown();
417     // The call listener is closed.
418     verify(mockCallListener, timeout(5000)).onClose(any(Status.class), any(Metadata.class));
419     assertRpcStatusRecorded(Code.CANCELLED, 17_000, 1);
420     assertRetryStatsRecorded(1, 0, 0);
421   }
422 
423   @Test
serverCancelledAndClientDeadlineExceeded()424   public void serverCancelledAndClientDeadlineExceeded() throws Exception {
425     startNewServer();
426     createNewChannel();
427 
428     class CloseDelayedTracer extends ClientStreamTracer {
429       @Override
430       public void streamClosed(Status status) {
431         fakeClock.forwardTime(10, SECONDS);
432       }
433     }
434 
435     class CloseDelayedTracerFactory extends ClientStreamTracer.Factory {
436       @Override
437       public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
438         return new CloseDelayedTracer();
439       }
440     }
441 
442     CallOptions callOptions = CallOptions.DEFAULT
443         .withDeadline(Deadline.after(
444             10,
445             SECONDS,
446             new Ticker() {
447               @Override
448               public long nanoTime() {
449                 return fakeClock.getTicker().read();
450               }
451             }))
452         .withStreamTracerFactory(new CloseDelayedTracerFactory());
453     ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
454     call.start(mockCallListener, new Metadata());
455     assertRpcStartedRecorded();
456     ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
457     serverCall.close(Status.CANCELLED, new Metadata());
458     assertRpcStatusRecorded(Code.DEADLINE_EXCEEDED, 10_000, 0);
459     assertRetryStatsRecorded(0, 0, 0);
460   }
461 
462   @Test
transparentRetryStatsRecorded()463   public void transparentRetryStatsRecorded() throws Exception {
464     startNewServer();
465     createNewChannel();
466 
467     final AtomicBoolean originalAttemptFailed = new AtomicBoolean();
468     class TransparentRetryTriggeringTracer extends ClientStreamTracer {
469 
470       @Override
471       public void streamCreated(Attributes transportAttrs, Metadata metadata) {
472         if (originalAttemptFailed.get()) {
473           return;
474         }
475         // Send GOAWAY from server. The client may either receive GOAWAY or create the underlying
476         // netty stream and write headers first, even we await server termination as below.
477         // In the latter case, we rerun the test. We can also call localServer.shutdown() to trigger
478         // GOAWAY, but it takes a lot longer time to gracefully shut down.
479         localServer.shutdownNow();
480         try {
481           localServer.awaitTermination();
482         } catch (InterruptedException e) {
483           Thread.currentThread().interrupt();
484           throw new AssertionError(e);
485         }
486       }
487 
488       @Override
489       public void streamClosed(Status status) {
490         if (originalAttemptFailed.get()) {
491           return;
492         }
493         originalAttemptFailed.set(true);
494         try {
495           startNewServer();
496           channel.resetConnectBackoff();
497         } catch (Exception e) {
498           throw new AssertionError("local server can not be restarted", e);
499         }
500       }
501     }
502 
503     class TransparentRetryTracerFactory extends ClientStreamTracer.Factory {
504       @Override
505       public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
506         return new TransparentRetryTriggeringTracer();
507       }
508     }
509 
510     CallOptions callOptions = CallOptions.DEFAULT
511         .withWaitForReady()
512         .withStreamTracerFactory(new TransparentRetryTracerFactory());
513     while (true) {
514       ClientCall<String, Integer> call = channel.newCall(clientStreamingMethod, callOptions);
515       call.start(mockCallListener, new Metadata());
516       assertRpcStartedRecorded(); // original attempt
517       MetricsRecord record = clientStatsRecorder.pollRecord(5, SECONDS);
518       assertThat(record.getMetricAsLongOrFail(DeprecatedCensusConstants.RPC_CLIENT_FINISHED_COUNT))
519           .isEqualTo(1);
520       TagValue statusTag = record.tags.get(RpcMeasureConstants.GRPC_CLIENT_STATUS);
521       if (statusTag.asString().equals(Code.UNAVAILABLE.toString())) {
522         break;
523       } else {
524         // Due to race condition, GOAWAY is not received/processed before the stream is closed due
525         // to connection error. Rerun the test.
526         assertThat(statusTag.asString()).isEqualTo(Code.UNKNOWN.toString());
527         assertRetryStatsRecorded(0, 0, 0);
528         originalAttemptFailed.set(false);
529       }
530     }
531     assertRpcStartedRecorded(); // retry attempt
532     ServerCall<String, Integer> serverCall = serverCalls.poll(5, SECONDS);
533     serverCall.close(Status.INVALID_ARGUMENT, new Metadata());
534     assertRpcStatusRecorded(Code.INVALID_ARGUMENT, 0, 0);
535     assertRetryStatsRecorded(0, 1, 0);
536   }
537 }
538