• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Charsets.UTF_8;
20 import static com.google.common.truth.Truth.assertThat;
21 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
22 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
23 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
24 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
25 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
26 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
27 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
28 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
29 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
30 import static org.junit.Assert.assertEquals;
31 import static org.junit.Assert.assertFalse;
32 import static org.junit.Assert.assertNotNull;
33 import static org.junit.Assert.assertNull;
34 import static org.junit.Assert.assertSame;
35 import static org.junit.Assert.assertTrue;
36 import static org.junit.Assert.fail;
37 
38 import com.google.common.io.ByteStreams;
39 import com.google.common.util.concurrent.SettableFuture;
40 import io.grpc.Attributes;
41 import io.grpc.CallOptions;
42 import io.grpc.Grpc;
43 import io.grpc.InternalChannelz;
44 import io.grpc.Metadata;
45 import io.grpc.MethodDescriptor;
46 import io.grpc.MethodDescriptor.Marshaller;
47 import io.grpc.ServerStreamTracer;
48 import io.grpc.Status;
49 import io.grpc.Status.Code;
50 import io.grpc.StatusException;
51 import io.grpc.internal.ClientStream;
52 import io.grpc.internal.ClientStreamListener;
53 import io.grpc.internal.ClientTransport;
54 import io.grpc.internal.FakeClock;
55 import io.grpc.internal.GrpcUtil;
56 import io.grpc.internal.ManagedClientTransport;
57 import io.grpc.internal.ServerListener;
58 import io.grpc.internal.ServerStream;
59 import io.grpc.internal.ServerStreamListener;
60 import io.grpc.internal.ServerTransport;
61 import io.grpc.internal.ServerTransportListener;
62 import io.grpc.internal.TransportTracer;
63 import io.grpc.internal.testing.TestUtils;
64 import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
65 import io.netty.channel.ChannelConfig;
66 import io.netty.channel.ChannelOption;
67 import io.netty.channel.nio.NioEventLoopGroup;
68 import io.netty.channel.socket.SocketChannelConfig;
69 import io.netty.channel.socket.nio.NioServerSocketChannel;
70 import io.netty.channel.socket.nio.NioSocketChannel;
71 import io.netty.handler.codec.http2.StreamBufferingEncoder;
72 import io.netty.handler.ssl.ClientAuth;
73 import io.netty.handler.ssl.SslContext;
74 import io.netty.handler.ssl.SupportedCipherSuiteFilter;
75 import io.netty.util.AsciiString;
76 import java.io.ByteArrayInputStream;
77 import java.io.File;
78 import java.io.IOException;
79 import java.io.InputStream;
80 import java.net.InetSocketAddress;
81 import java.net.SocketAddress;
82 import java.util.ArrayList;
83 import java.util.Collections;
84 import java.util.HashMap;
85 import java.util.List;
86 import java.util.Map;
87 import java.util.concurrent.ExecutionException;
88 import java.util.concurrent.LinkedBlockingQueue;
89 import java.util.concurrent.TimeUnit;
90 import java.util.concurrent.TimeoutException;
91 import javax.annotation.Nullable;
92 import javax.net.ssl.SSLHandshakeException;
93 import org.junit.After;
94 import org.junit.Before;
95 import org.junit.Test;
96 import org.junit.runner.RunWith;
97 import org.junit.runners.JUnit4;
98 import org.mockito.Mock;
99 import org.mockito.MockitoAnnotations;
100 
101 /**
102  * Tests for {@link NettyClientTransport}.
103  */
104 @RunWith(JUnit4.class)
105 public class NettyClientTransportTest {
106   private static final SslContext SSL_CONTEXT = createSslContext();
107 
108   @Mock
109   private ManagedClientTransport.Listener clientTransportListener;
110 
111   private final List<NettyClientTransport> transports = new ArrayList<>();
112   private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
113       new LinkedBlockingQueue<>();
114   private final NioEventLoopGroup group = new NioEventLoopGroup(1);
115   private final EchoServerListener serverListener = new EchoServerListener();
116   private final InternalChannelz channelz = new InternalChannelz();
117   private Runnable tooManyPingsRunnable = new Runnable() {
118     // Throwing is useless in this method, because Netty doesn't propagate the exception
119     @Override public void run() {}
120   };
121   private Attributes eagAttributes = Attributes.EMPTY;
122 
123   private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT);
124 
125   private InetSocketAddress address;
126   private String authority;
127   private NettyServer server;
128 
129   @Before
setup()130   public void setup() {
131     MockitoAnnotations.initMocks(this);
132   }
133 
134   @After
teardown()135   public void teardown() throws Exception {
136     for (NettyClientTransport transport : transports) {
137       transport.shutdown(Status.UNAVAILABLE);
138     }
139 
140     if (server != null) {
141       server.shutdown();
142     }
143 
144     group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
145   }
146 
147   @Test
testToString()148   public void testToString() throws Exception {
149     address = TestUtils.testServerAddress(12345);
150     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
151     String s = newTransport(newNegotiator()).toString();
152     transports.clear();
153     assertTrue("Unexpected: " + s, s.contains("NettyClientTransport"));
154     assertTrue("Unexpected: " + s, s.contains(address.toString()));
155   }
156 
157   @Test
addDefaultUserAgent()158   public void addDefaultUserAgent() throws Exception {
159     startServer();
160     NettyClientTransport transport = newTransport(newNegotiator());
161     callMeMaybe(transport.start(clientTransportListener));
162 
163     // Send a single RPC and wait for the response.
164     new Rpc(transport).halfClose().waitForResponse();
165 
166     // Verify that the received headers contained the User-Agent.
167     assertEquals(1, serverListener.streamListeners.size());
168 
169     Metadata headers = serverListener.streamListeners.get(0).headers;
170     assertEquals(GrpcUtil.getGrpcUserAgent("netty", null), headers.get(USER_AGENT_KEY));
171   }
172 
173   @Test
setSoLingerChannelOption()174   public void setSoLingerChannelOption() throws IOException {
175     startServer();
176     Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
177     // set SO_LINGER option
178     int soLinger = 123;
179     channelOptions.put(ChannelOption.SO_LINGER, soLinger);
180     NettyClientTransport transport = new NettyClientTransport(
181         address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
182         DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
183         KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
184         tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
185     transports.add(transport);
186     callMeMaybe(transport.start(clientTransportListener));
187 
188     // verify SO_LINGER has been set
189     ChannelConfig config = transport.channel().config();
190     assertTrue(config instanceof SocketChannelConfig);
191     assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger());
192   }
193 
194   @Test
overrideDefaultUserAgent()195   public void overrideDefaultUserAgent() throws Exception {
196     startServer();
197     NettyClientTransport transport = newTransport(newNegotiator(),
198         DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
199     callMeMaybe(transport.start(clientTransportListener));
200 
201     new Rpc(transport, new Metadata()).halfClose().waitForResponse();
202 
203     // Verify that the received headers contained the User-Agent.
204     assertEquals(1, serverListener.streamListeners.size());
205     Metadata receivedHeaders = serverListener.streamListeners.get(0).headers;
206     assertEquals(GrpcUtil.getGrpcUserAgent("netty", "testUserAgent"),
207         receivedHeaders.get(USER_AGENT_KEY));
208   }
209 
210   @Test
maxMessageSizeShouldBeEnforced()211   public void maxMessageSizeShouldBeEnforced() throws Throwable {
212     startServer();
213     // Allow the response payloads of up to 1 byte.
214     NettyClientTransport transport = newTransport(newNegotiator(),
215         1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
216     callMeMaybe(transport.start(clientTransportListener));
217 
218     try {
219       // Send a single RPC and wait for the response.
220       new Rpc(transport).halfClose().waitForResponse();
221       fail("Expected the stream to fail.");
222     } catch (ExecutionException e) {
223       Status status = Status.fromThrowable(e);
224       assertEquals(Code.RESOURCE_EXHAUSTED, status.getCode());
225       assertTrue("Missing exceeds maximum from: " + status.getDescription(),
226           status.getDescription().contains("exceeds maximum"));
227     }
228   }
229 
230   /**
231    * Verifies that we can create multiple TLS client transports from the same builder.
232    */
233   @Test
creatingMultipleTlsTransportsShouldSucceed()234   public void creatingMultipleTlsTransportsShouldSucceed() throws Exception {
235     startServer();
236 
237     // Create a couple client transports.
238     ProtocolNegotiator negotiator = newNegotiator();
239     for (int index = 0; index < 2; ++index) {
240       NettyClientTransport transport = newTransport(negotiator);
241       callMeMaybe(transport.start(clientTransportListener));
242     }
243 
244     // Send a single RPC on each transport.
245     final List<Rpc> rpcs = new ArrayList<>(transports.size());
246     for (NettyClientTransport transport : transports) {
247       rpcs.add(new Rpc(transport).halfClose());
248     }
249 
250     // Wait for the RPCs to complete.
251     for (Rpc rpc : rpcs) {
252       rpc.waitForResponse();
253     }
254   }
255 
256   @Test
negotiationFailurePropagatesToStatus()257   public void negotiationFailurePropagatesToStatus() throws Exception {
258     negotiator = ProtocolNegotiators.serverPlaintext();
259     startServer();
260 
261     final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
262     final NettyClientTransport transport = newTransport(negotiator);
263     callMeMaybe(transport.start(clientTransportListener));
264     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
265     transport.channel().eventLoop().execute(new Runnable() {
266       @Override
267       public void run() {
268         negotiator.handler.fail(transport.channel().pipeline().context(negotiator.handler),
269             failureStatus.asRuntimeException());
270       }
271     });
272 
273     Rpc rpc = new Rpc(transport).halfClose();
274     try {
275       rpc.waitForClose();
276       fail("expected exception");
277     } catch (ExecutionException ex) {
278       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
279     }
280   }
281 
282   @Test
tlsNegotiationFailurePropagatesToStatus()283   public void tlsNegotiationFailurePropagatesToStatus() throws Exception {
284     File serverCert = TestUtils.loadCert("server1.pem");
285     File serverKey = TestUtils.loadCert("server1.key");
286     // Don't trust ca.pem, so that client auth fails
287     SslContext sslContext = GrpcSslContexts.forServer(serverCert, serverKey)
288         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
289         .clientAuth(ClientAuth.REQUIRE)
290         .build();
291     negotiator = ProtocolNegotiators.serverTls(sslContext);
292     startServer();
293 
294     File caCert = TestUtils.loadCert("ca.pem");
295     File clientCert = TestUtils.loadCert("client.pem");
296     File clientKey = TestUtils.loadCert("client.key");
297     SslContext clientContext = GrpcSslContexts.forClient()
298         .trustManager(caCert)
299         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
300         .keyManager(clientCert, clientKey)
301         .build();
302     ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext);
303     final NettyClientTransport transport = newTransport(negotiator);
304     callMeMaybe(transport.start(clientTransportListener));
305 
306     Rpc rpc = new Rpc(transport).halfClose();
307     try {
308       rpc.waitForClose();
309       fail("expected exception");
310     } catch (ExecutionException ex) {
311       StatusException sre = (StatusException) ex.getCause();
312       assertEquals(Status.Code.UNAVAILABLE, sre.getStatus().getCode());
313       assertThat(sre.getCause()).isInstanceOf(SSLHandshakeException.class);
314       assertThat(sre.getCause().getMessage()).contains("SSLV3_ALERT_HANDSHAKE_FAILURE");
315     }
316   }
317 
318   @Test
channelExceptionDuringNegotiatonPropagatesToStatus()319   public void channelExceptionDuringNegotiatonPropagatesToStatus() throws Exception {
320     negotiator = ProtocolNegotiators.serverPlaintext();
321     startServer();
322 
323     NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
324     NettyClientTransport transport = newTransport(negotiator);
325     callMeMaybe(transport.start(clientTransportListener));
326     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
327     transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException());
328 
329     Rpc rpc = new Rpc(transport).halfClose();
330     try {
331       rpc.waitForClose();
332       fail("expected exception");
333     } catch (ExecutionException ex) {
334       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
335     }
336   }
337 
338   @Test
handlerExceptionDuringNegotiatonPropagatesToStatus()339   public void handlerExceptionDuringNegotiatonPropagatesToStatus() throws Exception {
340     negotiator = ProtocolNegotiators.serverPlaintext();
341     startServer();
342 
343     final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator();
344     final NettyClientTransport transport = newTransport(negotiator);
345     callMeMaybe(transport.start(clientTransportListener));
346     final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!");
347     transport.channel().eventLoop().execute(new Runnable() {
348       @Override
349       public void run() {
350         try {
351           negotiator.handler.exceptionCaught(
352               transport.channel().pipeline().context(negotiator.handler),
353               failureStatus.asRuntimeException());
354         } catch (Exception ex) {
355           throw new RuntimeException(ex);
356         }
357       }
358     });
359 
360     Rpc rpc = new Rpc(transport).halfClose();
361     try {
362       rpc.waitForClose();
363       fail("expected exception");
364     } catch (ExecutionException ex) {
365       assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
366     }
367   }
368 
369   @Test
bufferedStreamsShouldBeClosedWhenConnectionTerminates()370   public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Exception {
371     // Only allow a single stream active at a time.
372     startServer(1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
373 
374     NettyClientTransport transport = newTransport(newNegotiator());
375     callMeMaybe(transport.start(clientTransportListener));
376 
377     // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS
378     // has been received by the remote endpoint.
379     new Rpc(transport).halfClose().waitForResponse();
380 
381     // Create 3 streams, but don't half-close. The transport will buffer the second and third.
382     Rpc[] rpcs = new Rpc[] { new Rpc(transport), new Rpc(transport), new Rpc(transport) };
383 
384     // Wait for the response for the stream that was actually created.
385     rpcs[0].waitForResponse();
386 
387     // Now forcibly terminate the connection from the server side.
388     serverListener.transports.get(0).channel().pipeline().firstContext().close();
389 
390     // Now wait for both listeners to be closed.
391     for (int i = 1; i < rpcs.length; i++) {
392       try {
393         rpcs[i].waitForClose();
394         fail("Expected the RPC to fail");
395       } catch (ExecutionException e) {
396         // Expected.
397         Throwable t = getRootCause(e);
398         // Make sure that the Http2ChannelClosedException got replaced with the real cause of
399         // the shutdown.
400         assertFalse(t instanceof StreamBufferingEncoder.Http2ChannelClosedException);
401       }
402     }
403   }
404 
405   public static class CantConstructChannel extends NioSocketChannel {
406     /** Constructor. It doesn't work. Feel free to try. But it doesn't work. */
CantConstructChannel()407     public CantConstructChannel() {
408       // Use an Error because we've seen cases of channels failing to construct due to classloading
409       // problems (like mixing different versions of Netty), and those involve Errors.
410       throw new CantConstructChannelError();
411     }
412   }
413 
414   private static class CantConstructChannelError extends Error {}
415 
416   @Test
failingToConstructChannelShouldFailGracefully()417   public void failingToConstructChannelShouldFailGracefully() throws Exception {
418     address = TestUtils.testServerAddress(12345);
419     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
420     NettyClientTransport transport = new NettyClientTransport(
421         address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
422         newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
423         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
424         null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
425     transports.add(transport);
426 
427     // Should not throw
428     callMeMaybe(transport.start(clientTransportListener));
429 
430     // And RPCs and PINGs should fail cleanly, reporting the failure
431     Rpc rpc = new Rpc(transport);
432     try {
433       rpc.waitForResponse();
434       fail("Expected exception");
435     } catch (Exception ex) {
436       if (!(getRootCause(ex) instanceof CantConstructChannelError)) {
437         throw new AssertionError("Could not find expected error", ex);
438       }
439     }
440 
441     final SettableFuture<Object> pingResult = SettableFuture.create();
442     FakeClock clock = new FakeClock();
443     ClientTransport.PingCallback pingCallback = new ClientTransport.PingCallback() {
444       @Override
445       public void onSuccess(long roundTripTimeNanos) {
446         pingResult.set(roundTripTimeNanos);
447       }
448 
449       @Override
450       public void onFailure(Throwable cause) {
451         pingResult.setException(cause);
452       }
453     };
454     transport.ping(pingCallback, clock.getScheduledExecutorService());
455     assertFalse(pingResult.isDone());
456     clock.runDueTasks();
457     assertTrue(pingResult.isDone());
458     try {
459       pingResult.get();
460       fail("Expected exception");
461     } catch (Exception ex) {
462       if (!(getRootCause(ex) instanceof CantConstructChannelError)) {
463         throw new AssertionError("Could not find expected error", ex);
464       }
465     }
466   }
467 
468   @Test
maxHeaderListSizeShouldBeEnforcedOnClient()469   public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
470     startServer();
471 
472     NettyClientTransport transport =
473         newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
474     callMeMaybe(transport.start(clientTransportListener));
475 
476     try {
477       // Send a single RPC and wait for the response.
478       new Rpc(transport, new Metadata()).halfClose().waitForResponse();
479       fail("The stream should have been failed due to client received header exceeds header list"
480           + " size limit!");
481     } catch (Exception e) {
482       Throwable rootCause = getRootCause(e);
483       Status status = ((StatusException) rootCause).getStatus();
484       assertEquals(Status.Code.INTERNAL, status.getCode());
485       assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream",
486           status.getDescription());
487     }
488   }
489 
490   @Test
maxHeaderListSizeShouldBeEnforcedOnServer()491   public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception {
492     startServer(100, 1);
493 
494     NettyClientTransport transport = newTransport(newNegotiator());
495     callMeMaybe(transport.start(clientTransportListener));
496 
497     try {
498       // Send a single RPC and wait for the response.
499       new Rpc(transport, new Metadata()).halfClose().waitForResponse();
500       fail("The stream should have been failed due to server received header exceeds header list"
501           + " size limit!");
502     } catch (Exception e) {
503       Status status = Status.fromThrowable(e);
504       assertEquals(status.toString(), Status.Code.INTERNAL, status.getCode());
505     }
506   }
507 
508   @Test
getAttributes_negotiatorHandler()509   public void getAttributes_negotiatorHandler() throws Exception {
510     address = TestUtils.testServerAddress(12345);
511     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
512 
513     NettyClientTransport transport = newTransport(new NoopProtocolNegotiator());
514     callMeMaybe(transport.start(clientTransportListener));
515 
516     assertEquals(Attributes.EMPTY, transport.getAttributes());
517   }
518 
519   @Test
getEagAttributes_negotiatorHandler()520   public void getEagAttributes_negotiatorHandler() throws Exception {
521     address = TestUtils.testServerAddress(12345);
522     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
523 
524     NoopProtocolNegotiator npn = new NoopProtocolNegotiator();
525     eagAttributes = Attributes.newBuilder()
526         .set(Attributes.Key.create("trash"), "value")
527         .build();
528     NettyClientTransport transport = newTransport(npn);
529     callMeMaybe(transport.start(clientTransportListener));
530 
531     // EAG Attributes are available before the negotiation is complete
532     assertSame(eagAttributes, npn.grpcHandler.getEagAttributes());
533   }
534 
535   @Test
clientStreamGetsAttributes()536   public void clientStreamGetsAttributes() throws Exception {
537     startServer();
538     NettyClientTransport transport = newTransport(newNegotiator());
539     callMeMaybe(transport.start(clientTransportListener));
540     Rpc rpc = new Rpc(transport).halfClose();
541     rpc.waitForResponse();
542 
543     assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
544     assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
545     Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
546     assertNotNull(serverTransportAttrs);
547     SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
548     assertNotNull(clientAddr);
549     assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
550   }
551 
552   @Test
keepAliveEnabled()553   public void keepAliveEnabled() throws Exception {
554     startServer();
555     NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
556         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
557     callMeMaybe(transport.start(clientTransportListener));
558     Rpc rpc = new Rpc(transport).halfClose();
559     rpc.waitForResponse();
560 
561     assertNotNull(transport.keepAliveManager());
562   }
563 
564   @Test
keepAliveDisabled()565   public void keepAliveDisabled() throws Exception {
566     startServer();
567     NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
568         GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
569     callMeMaybe(transport.start(clientTransportListener));
570     Rpc rpc = new Rpc(transport).halfClose();
571     rpc.waitForResponse();
572 
573     assertNull(transport.keepAliveManager());
574   }
575 
getRootCause(Throwable t)576   private Throwable getRootCause(Throwable t) {
577     if (t.getCause() == null) {
578       return t;
579     }
580     return getRootCause(t.getCause());
581   }
582 
newNegotiator()583   private ProtocolNegotiator newNegotiator() throws IOException {
584     File caCert = TestUtils.loadCert("ca.pem");
585     SslContext clientContext = GrpcSslContexts.forClient().trustManager(caCert)
586         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
587     return ProtocolNegotiators.tls(clientContext);
588   }
589 
newTransport(ProtocolNegotiator negotiator)590   private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
591     return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
592         null /* user agent */, true /* keep alive */);
593   }
594 
newTransport(ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent, boolean enableKeepAlive)595   private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
596       int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
597     long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
598     long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L);
599     if (enableKeepAlive) {
600       keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L);
601     }
602     NettyClientTransport transport = new NettyClientTransport(
603         address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
604         DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
605         keepAliveTimeNano, keepAliveTimeoutNano,
606         false, authority, userAgent, tooManyPingsRunnable,
607         new TransportTracer(), eagAttributes, new SocketPicker());
608     transports.add(transport);
609     return transport;
610   }
611 
startServer()612   private void startServer() throws IOException {
613     startServer(100, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
614   }
615 
startServer(int maxStreamsPerConnection, int maxHeaderListSize)616   private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
617     server = new NettyServer(
618         TestUtils.testServerAddress(0),
619         NioServerSocketChannel.class,
620         new HashMap<ChannelOption<?>, Object>(),
621         group, group, negotiator,
622         Collections.<ServerStreamTracer.Factory>emptyList(),
623         TransportTracer.getDefaultFactory(),
624         maxStreamsPerConnection,
625         DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
626         DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
627         MAX_CONNECTION_IDLE_NANOS_DISABLED,
628         MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0,
629         channelz);
630     server.start(serverListener);
631     address = TestUtils.testServerAddress(server.getPort());
632     authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
633   }
634 
callMeMaybe(Runnable r)635   private void callMeMaybe(Runnable r) {
636     if (r != null) {
637       r.run();
638     }
639   }
640 
createSslContext()641   private static SslContext createSslContext() {
642     try {
643       File serverCert = TestUtils.loadCert("server1.pem");
644       File key = TestUtils.loadCert("server1.key");
645       return GrpcSslContexts.forServer(serverCert, key)
646           .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
647     } catch (IOException ex) {
648       throw new RuntimeException(ex);
649     }
650   }
651 
652   private static class Rpc {
653     static final String MESSAGE = "hello";
654     static final MethodDescriptor<String, String> METHOD =
655         MethodDescriptor.<String, String>newBuilder()
656             .setType(MethodDescriptor.MethodType.UNARY)
657             .setFullMethodName("testService/test")
658             .setRequestMarshaller(StringMarshaller.INSTANCE)
659             .setResponseMarshaller(StringMarshaller.INSTANCE)
660             .build();
661 
662     final ClientStream stream;
663     final TestClientStreamListener listener = new TestClientStreamListener();
664 
Rpc(NettyClientTransport transport)665     Rpc(NettyClientTransport transport) {
666       this(transport, new Metadata());
667     }
668 
Rpc(NettyClientTransport transport, Metadata headers)669     Rpc(NettyClientTransport transport, Metadata headers) {
670       stream = transport.newStream(METHOD, headers, CallOptions.DEFAULT);
671       stream.start(listener);
672       stream.request(1);
673       stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes(UTF_8)));
674       stream.flush();
675     }
676 
halfClose()677     Rpc halfClose() {
678       stream.halfClose();
679       return this;
680     }
681 
waitForResponse()682     void waitForResponse() throws InterruptedException, ExecutionException, TimeoutException {
683       listener.responseFuture.get(10, TimeUnit.SECONDS);
684     }
685 
waitForClose()686     void waitForClose() throws InterruptedException, ExecutionException, TimeoutException {
687       listener.closedFuture.get(10, TimeUnit.SECONDS);
688     }
689   }
690 
691   private static final class TestClientStreamListener implements ClientStreamListener {
692     final SettableFuture<Void> closedFuture = SettableFuture.create();
693     final SettableFuture<Void> responseFuture = SettableFuture.create();
694 
695     @Override
headersRead(Metadata headers)696     public void headersRead(Metadata headers) {
697     }
698 
699     @Override
closed(Status status, Metadata trailers)700     public void closed(Status status, Metadata trailers) {
701       closed(status, RpcProgress.PROCESSED, trailers);
702     }
703 
704     @Override
closed(Status status, RpcProgress rpcProgress, Metadata trailers)705     public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
706       if (status.isOk()) {
707         closedFuture.set(null);
708       } else {
709         StatusException e = status.asException();
710         closedFuture.setException(e);
711         responseFuture.setException(e);
712       }
713     }
714 
715     @Override
messagesAvailable(MessageProducer producer)716     public void messagesAvailable(MessageProducer producer) {
717       if (producer.next() != null) {
718         responseFuture.set(null);
719       }
720     }
721 
722     @Override
onReady()723     public void onReady() {
724     }
725   }
726 
727   private static final class EchoServerStreamListener implements ServerStreamListener {
728     final ServerStream stream;
729     final String method;
730     final Metadata headers;
731 
EchoServerStreamListener(ServerStream stream, String method, Metadata headers)732     EchoServerStreamListener(ServerStream stream, String method, Metadata headers) {
733       this.stream = stream;
734       this.method = method;
735       this.headers = headers;
736     }
737 
738     @Override
messagesAvailable(MessageProducer producer)739     public void messagesAvailable(MessageProducer producer) {
740       InputStream message;
741       while ((message = producer.next()) != null) {
742         // Just echo back the message.
743         stream.writeMessage(message);
744         stream.flush();
745       }
746     }
747 
748     @Override
onReady()749     public void onReady() {
750     }
751 
752     @Override
halfClosed()753     public void halfClosed() {
754       // Just close when the client closes.
755       stream.close(Status.OK, new Metadata());
756     }
757 
758     @Override
closed(Status status)759     public void closed(Status status) {
760     }
761   }
762 
763   private final class EchoServerListener implements ServerListener {
764     final List<NettyServerTransport> transports = new ArrayList<>();
765     final List<EchoServerStreamListener> streamListeners =
766             Collections.synchronizedList(new ArrayList<EchoServerStreamListener>());
767 
768     @Override
transportCreated(final ServerTransport transport)769     public ServerTransportListener transportCreated(final ServerTransport transport) {
770       transports.add((NettyServerTransport) transport);
771       return new ServerTransportListener() {
772         @Override
773         public void streamCreated(ServerStream stream, String method, Metadata headers) {
774           EchoServerStreamListener listener = new EchoServerStreamListener(stream, method, headers);
775           stream.setListener(listener);
776           stream.writeHeaders(new Metadata());
777           stream.request(1);
778           streamListeners.add(listener);
779         }
780 
781         @Override
782         public Attributes transportReady(Attributes transportAttrs) {
783           serverTransportAttributesList.add(transportAttrs);
784           return transportAttrs;
785         }
786 
787         @Override
788         public void transportTerminated() {}
789       };
790     }
791 
792     @Override
serverShutdown()793     public void serverShutdown() {
794     }
795   }
796 
797   private static final class StringMarshaller implements Marshaller<String> {
798     static final StringMarshaller INSTANCE = new StringMarshaller();
799 
800     @Override
801     public InputStream stream(String value) {
802       return new ByteArrayInputStream(value.getBytes(UTF_8));
803     }
804 
805     @Override
806     public String parse(InputStream stream) {
807       try {
808         return new String(ByteStreams.toByteArray(stream), UTF_8);
809       } catch (IOException ex) {
810         throw new RuntimeException(ex);
811       }
812     }
813   }
814 
815   private static class NoopHandler extends ProtocolNegotiators.AbstractBufferingHandler
816       implements ProtocolNegotiator.Handler {
817     public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) {
818       super(grpcHandler);
819     }
820 
821     @Override
822     public AsciiString scheme() {
823       return Utils.HTTP;
824     }
825   }
826 
827   private static class NoopProtocolNegotiator implements ProtocolNegotiator {
828     GrpcHttp2ConnectionHandler grpcHandler;
829     NoopHandler handler;
830 
831     @Override
832     public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) {
833       this.grpcHandler = grpcHandler;
834       return handler = new NoopHandler(grpcHandler);
835     }
836 
837     @Override
838     public void close() {}
839   }
840 
841   private static final class SocketPicker extends LocalSocketPicker {
842 
843     @Nullable
844     @Override
845     public SocketAddress createSocketAddress(SocketAddress remoteAddress, Attributes attrs) {
846       return null;
847     }
848   }
849 }
850