1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; 22 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; 23 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; 24 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; 25 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; 26 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; 27 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; 28 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; 29 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; 30 import static org.junit.Assert.assertEquals; 31 import static org.junit.Assert.assertFalse; 32 import static org.junit.Assert.assertNotNull; 33 import static org.junit.Assert.assertNull; 34 import static org.junit.Assert.assertSame; 35 import static org.junit.Assert.assertTrue; 36 import static org.junit.Assert.fail; 37 38 import com.google.common.io.ByteStreams; 39 import com.google.common.util.concurrent.SettableFuture; 40 import io.grpc.Attributes; 41 import io.grpc.CallOptions; 42 import io.grpc.Grpc; 43 import io.grpc.InternalChannelz; 44 import io.grpc.Metadata; 45 import io.grpc.MethodDescriptor; 46 import io.grpc.MethodDescriptor.Marshaller; 47 import io.grpc.ServerStreamTracer; 48 import io.grpc.Status; 49 import io.grpc.Status.Code; 50 import io.grpc.StatusException; 51 import io.grpc.internal.ClientStream; 52 import io.grpc.internal.ClientStreamListener; 53 import io.grpc.internal.ClientTransport; 54 import io.grpc.internal.FakeClock; 55 import io.grpc.internal.GrpcUtil; 56 import io.grpc.internal.ManagedClientTransport; 57 import io.grpc.internal.ServerListener; 58 import io.grpc.internal.ServerStream; 59 import io.grpc.internal.ServerStreamListener; 60 import io.grpc.internal.ServerTransport; 61 import io.grpc.internal.ServerTransportListener; 62 import io.grpc.internal.TransportTracer; 63 import io.grpc.internal.testing.TestUtils; 64 import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; 65 import io.netty.channel.ChannelConfig; 66 import io.netty.channel.ChannelOption; 67 import io.netty.channel.nio.NioEventLoopGroup; 68 import io.netty.channel.socket.SocketChannelConfig; 69 import io.netty.channel.socket.nio.NioServerSocketChannel; 70 import io.netty.channel.socket.nio.NioSocketChannel; 71 import io.netty.handler.codec.http2.StreamBufferingEncoder; 72 import io.netty.handler.ssl.ClientAuth; 73 import io.netty.handler.ssl.SslContext; 74 import io.netty.handler.ssl.SupportedCipherSuiteFilter; 75 import io.netty.util.AsciiString; 76 import java.io.ByteArrayInputStream; 77 import java.io.File; 78 import java.io.IOException; 79 import java.io.InputStream; 80 import java.net.InetSocketAddress; 81 import java.net.SocketAddress; 82 import java.util.ArrayList; 83 import java.util.Collections; 84 import java.util.HashMap; 85 import java.util.List; 86 import java.util.Map; 87 import java.util.concurrent.ExecutionException; 88 import java.util.concurrent.LinkedBlockingQueue; 89 import java.util.concurrent.TimeUnit; 90 import java.util.concurrent.TimeoutException; 91 import javax.annotation.Nullable; 92 import javax.net.ssl.SSLHandshakeException; 93 import org.junit.After; 94 import org.junit.Before; 95 import org.junit.Test; 96 import org.junit.runner.RunWith; 97 import org.junit.runners.JUnit4; 98 import org.mockito.Mock; 99 import org.mockito.MockitoAnnotations; 100 101 /** 102 * Tests for {@link NettyClientTransport}. 103 */ 104 @RunWith(JUnit4.class) 105 public class NettyClientTransportTest { 106 private static final SslContext SSL_CONTEXT = createSslContext(); 107 108 @Mock 109 private ManagedClientTransport.Listener clientTransportListener; 110 111 private final List<NettyClientTransport> transports = new ArrayList<>(); 112 private final LinkedBlockingQueue<Attributes> serverTransportAttributesList = 113 new LinkedBlockingQueue<>(); 114 private final NioEventLoopGroup group = new NioEventLoopGroup(1); 115 private final EchoServerListener serverListener = new EchoServerListener(); 116 private final InternalChannelz channelz = new InternalChannelz(); 117 private Runnable tooManyPingsRunnable = new Runnable() { 118 // Throwing is useless in this method, because Netty doesn't propagate the exception 119 @Override public void run() {} 120 }; 121 private Attributes eagAttributes = Attributes.EMPTY; 122 123 private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT); 124 125 private InetSocketAddress address; 126 private String authority; 127 private NettyServer server; 128 129 @Before setup()130 public void setup() { 131 MockitoAnnotations.initMocks(this); 132 } 133 134 @After teardown()135 public void teardown() throws Exception { 136 for (NettyClientTransport transport : transports) { 137 transport.shutdown(Status.UNAVAILABLE); 138 } 139 140 if (server != null) { 141 server.shutdown(); 142 } 143 144 group.shutdownGracefully(0, 10, TimeUnit.SECONDS); 145 } 146 147 @Test testToString()148 public void testToString() throws Exception { 149 address = TestUtils.testServerAddress(12345); 150 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 151 String s = newTransport(newNegotiator()).toString(); 152 transports.clear(); 153 assertTrue("Unexpected: " + s, s.contains("NettyClientTransport")); 154 assertTrue("Unexpected: " + s, s.contains(address.toString())); 155 } 156 157 @Test addDefaultUserAgent()158 public void addDefaultUserAgent() throws Exception { 159 startServer(); 160 NettyClientTransport transport = newTransport(newNegotiator()); 161 callMeMaybe(transport.start(clientTransportListener)); 162 163 // Send a single RPC and wait for the response. 164 new Rpc(transport).halfClose().waitForResponse(); 165 166 // Verify that the received headers contained the User-Agent. 167 assertEquals(1, serverListener.streamListeners.size()); 168 169 Metadata headers = serverListener.streamListeners.get(0).headers; 170 assertEquals(GrpcUtil.getGrpcUserAgent("netty", null), headers.get(USER_AGENT_KEY)); 171 } 172 173 @Test setSoLingerChannelOption()174 public void setSoLingerChannelOption() throws IOException { 175 startServer(); 176 Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); 177 // set SO_LINGER option 178 int soLinger = 123; 179 channelOptions.put(ChannelOption.SO_LINGER, soLinger); 180 NettyClientTransport transport = new NettyClientTransport( 181 address, NioSocketChannel.class, channelOptions, group, newNegotiator(), 182 DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, 183 KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, 184 tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); 185 transports.add(transport); 186 callMeMaybe(transport.start(clientTransportListener)); 187 188 // verify SO_LINGER has been set 189 ChannelConfig config = transport.channel().config(); 190 assertTrue(config instanceof SocketChannelConfig); 191 assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger()); 192 } 193 194 @Test overrideDefaultUserAgent()195 public void overrideDefaultUserAgent() throws Exception { 196 startServer(); 197 NettyClientTransport transport = newTransport(newNegotiator(), 198 DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true); 199 callMeMaybe(transport.start(clientTransportListener)); 200 201 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 202 203 // Verify that the received headers contained the User-Agent. 204 assertEquals(1, serverListener.streamListeners.size()); 205 Metadata receivedHeaders = serverListener.streamListeners.get(0).headers; 206 assertEquals(GrpcUtil.getGrpcUserAgent("netty", "testUserAgent"), 207 receivedHeaders.get(USER_AGENT_KEY)); 208 } 209 210 @Test maxMessageSizeShouldBeEnforced()211 public void maxMessageSizeShouldBeEnforced() throws Throwable { 212 startServer(); 213 // Allow the response payloads of up to 1 byte. 214 NettyClientTransport transport = newTransport(newNegotiator(), 215 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); 216 callMeMaybe(transport.start(clientTransportListener)); 217 218 try { 219 // Send a single RPC and wait for the response. 220 new Rpc(transport).halfClose().waitForResponse(); 221 fail("Expected the stream to fail."); 222 } catch (ExecutionException e) { 223 Status status = Status.fromThrowable(e); 224 assertEquals(Code.RESOURCE_EXHAUSTED, status.getCode()); 225 assertTrue("Missing exceeds maximum from: " + status.getDescription(), 226 status.getDescription().contains("exceeds maximum")); 227 } 228 } 229 230 /** 231 * Verifies that we can create multiple TLS client transports from the same builder. 232 */ 233 @Test creatingMultipleTlsTransportsShouldSucceed()234 public void creatingMultipleTlsTransportsShouldSucceed() throws Exception { 235 startServer(); 236 237 // Create a couple client transports. 238 ProtocolNegotiator negotiator = newNegotiator(); 239 for (int index = 0; index < 2; ++index) { 240 NettyClientTransport transport = newTransport(negotiator); 241 callMeMaybe(transport.start(clientTransportListener)); 242 } 243 244 // Send a single RPC on each transport. 245 final List<Rpc> rpcs = new ArrayList<>(transports.size()); 246 for (NettyClientTransport transport : transports) { 247 rpcs.add(new Rpc(transport).halfClose()); 248 } 249 250 // Wait for the RPCs to complete. 251 for (Rpc rpc : rpcs) { 252 rpc.waitForResponse(); 253 } 254 } 255 256 @Test negotiationFailurePropagatesToStatus()257 public void negotiationFailurePropagatesToStatus() throws Exception { 258 negotiator = ProtocolNegotiators.serverPlaintext(); 259 startServer(); 260 261 final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 262 final NettyClientTransport transport = newTransport(negotiator); 263 callMeMaybe(transport.start(clientTransportListener)); 264 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 265 transport.channel().eventLoop().execute(new Runnable() { 266 @Override 267 public void run() { 268 negotiator.handler.fail(transport.channel().pipeline().context(negotiator.handler), 269 failureStatus.asRuntimeException()); 270 } 271 }); 272 273 Rpc rpc = new Rpc(transport).halfClose(); 274 try { 275 rpc.waitForClose(); 276 fail("expected exception"); 277 } catch (ExecutionException ex) { 278 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 279 } 280 } 281 282 @Test tlsNegotiationFailurePropagatesToStatus()283 public void tlsNegotiationFailurePropagatesToStatus() throws Exception { 284 File serverCert = TestUtils.loadCert("server1.pem"); 285 File serverKey = TestUtils.loadCert("server1.key"); 286 // Don't trust ca.pem, so that client auth fails 287 SslContext sslContext = GrpcSslContexts.forServer(serverCert, serverKey) 288 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) 289 .clientAuth(ClientAuth.REQUIRE) 290 .build(); 291 negotiator = ProtocolNegotiators.serverTls(sslContext); 292 startServer(); 293 294 File caCert = TestUtils.loadCert("ca.pem"); 295 File clientCert = TestUtils.loadCert("client.pem"); 296 File clientKey = TestUtils.loadCert("client.key"); 297 SslContext clientContext = GrpcSslContexts.forClient() 298 .trustManager(caCert) 299 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) 300 .keyManager(clientCert, clientKey) 301 .build(); 302 ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext); 303 final NettyClientTransport transport = newTransport(negotiator); 304 callMeMaybe(transport.start(clientTransportListener)); 305 306 Rpc rpc = new Rpc(transport).halfClose(); 307 try { 308 rpc.waitForClose(); 309 fail("expected exception"); 310 } catch (ExecutionException ex) { 311 StatusException sre = (StatusException) ex.getCause(); 312 assertEquals(Status.Code.UNAVAILABLE, sre.getStatus().getCode()); 313 assertThat(sre.getCause()).isInstanceOf(SSLHandshakeException.class); 314 assertThat(sre.getCause().getMessage()).contains("SSLV3_ALERT_HANDSHAKE_FAILURE"); 315 } 316 } 317 318 @Test channelExceptionDuringNegotiatonPropagatesToStatus()319 public void channelExceptionDuringNegotiatonPropagatesToStatus() throws Exception { 320 negotiator = ProtocolNegotiators.serverPlaintext(); 321 startServer(); 322 323 NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 324 NettyClientTransport transport = newTransport(negotiator); 325 callMeMaybe(transport.start(clientTransportListener)); 326 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 327 transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException()); 328 329 Rpc rpc = new Rpc(transport).halfClose(); 330 try { 331 rpc.waitForClose(); 332 fail("expected exception"); 333 } catch (ExecutionException ex) { 334 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 335 } 336 } 337 338 @Test handlerExceptionDuringNegotiatonPropagatesToStatus()339 public void handlerExceptionDuringNegotiatonPropagatesToStatus() throws Exception { 340 negotiator = ProtocolNegotiators.serverPlaintext(); 341 startServer(); 342 343 final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 344 final NettyClientTransport transport = newTransport(negotiator); 345 callMeMaybe(transport.start(clientTransportListener)); 346 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 347 transport.channel().eventLoop().execute(new Runnable() { 348 @Override 349 public void run() { 350 try { 351 negotiator.handler.exceptionCaught( 352 transport.channel().pipeline().context(negotiator.handler), 353 failureStatus.asRuntimeException()); 354 } catch (Exception ex) { 355 throw new RuntimeException(ex); 356 } 357 } 358 }); 359 360 Rpc rpc = new Rpc(transport).halfClose(); 361 try { 362 rpc.waitForClose(); 363 fail("expected exception"); 364 } catch (ExecutionException ex) { 365 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 366 } 367 } 368 369 @Test bufferedStreamsShouldBeClosedWhenConnectionTerminates()370 public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Exception { 371 // Only allow a single stream active at a time. 372 startServer(1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); 373 374 NettyClientTransport transport = newTransport(newNegotiator()); 375 callMeMaybe(transport.start(clientTransportListener)); 376 377 // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS 378 // has been received by the remote endpoint. 379 new Rpc(transport).halfClose().waitForResponse(); 380 381 // Create 3 streams, but don't half-close. The transport will buffer the second and third. 382 Rpc[] rpcs = new Rpc[] { new Rpc(transport), new Rpc(transport), new Rpc(transport) }; 383 384 // Wait for the response for the stream that was actually created. 385 rpcs[0].waitForResponse(); 386 387 // Now forcibly terminate the connection from the server side. 388 serverListener.transports.get(0).channel().pipeline().firstContext().close(); 389 390 // Now wait for both listeners to be closed. 391 for (int i = 1; i < rpcs.length; i++) { 392 try { 393 rpcs[i].waitForClose(); 394 fail("Expected the RPC to fail"); 395 } catch (ExecutionException e) { 396 // Expected. 397 Throwable t = getRootCause(e); 398 // Make sure that the Http2ChannelClosedException got replaced with the real cause of 399 // the shutdown. 400 assertFalse(t instanceof StreamBufferingEncoder.Http2ChannelClosedException); 401 } 402 } 403 } 404 405 public static class CantConstructChannel extends NioSocketChannel { 406 /** Constructor. It doesn't work. Feel free to try. But it doesn't work. */ CantConstructChannel()407 public CantConstructChannel() { 408 // Use an Error because we've seen cases of channels failing to construct due to classloading 409 // problems (like mixing different versions of Netty), and those involve Errors. 410 throw new CantConstructChannelError(); 411 } 412 } 413 414 private static class CantConstructChannelError extends Error {} 415 416 @Test failingToConstructChannelShouldFailGracefully()417 public void failingToConstructChannelShouldFailGracefully() throws Exception { 418 address = TestUtils.testServerAddress(12345); 419 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 420 NettyClientTransport transport = new NettyClientTransport( 421 address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group, 422 newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, 423 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, 424 null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); 425 transports.add(transport); 426 427 // Should not throw 428 callMeMaybe(transport.start(clientTransportListener)); 429 430 // And RPCs and PINGs should fail cleanly, reporting the failure 431 Rpc rpc = new Rpc(transport); 432 try { 433 rpc.waitForResponse(); 434 fail("Expected exception"); 435 } catch (Exception ex) { 436 if (!(getRootCause(ex) instanceof CantConstructChannelError)) { 437 throw new AssertionError("Could not find expected error", ex); 438 } 439 } 440 441 final SettableFuture<Object> pingResult = SettableFuture.create(); 442 FakeClock clock = new FakeClock(); 443 ClientTransport.PingCallback pingCallback = new ClientTransport.PingCallback() { 444 @Override 445 public void onSuccess(long roundTripTimeNanos) { 446 pingResult.set(roundTripTimeNanos); 447 } 448 449 @Override 450 public void onFailure(Throwable cause) { 451 pingResult.setException(cause); 452 } 453 }; 454 transport.ping(pingCallback, clock.getScheduledExecutorService()); 455 assertFalse(pingResult.isDone()); 456 clock.runDueTasks(); 457 assertTrue(pingResult.isDone()); 458 try { 459 pingResult.get(); 460 fail("Expected exception"); 461 } catch (Exception ex) { 462 if (!(getRootCause(ex) instanceof CantConstructChannelError)) { 463 throw new AssertionError("Could not find expected error", ex); 464 } 465 } 466 } 467 468 @Test maxHeaderListSizeShouldBeEnforcedOnClient()469 public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception { 470 startServer(); 471 472 NettyClientTransport transport = 473 newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true); 474 callMeMaybe(transport.start(clientTransportListener)); 475 476 try { 477 // Send a single RPC and wait for the response. 478 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 479 fail("The stream should have been failed due to client received header exceeds header list" 480 + " size limit!"); 481 } catch (Exception e) { 482 Throwable rootCause = getRootCause(e); 483 Status status = ((StatusException) rootCause).getStatus(); 484 assertEquals(Status.Code.INTERNAL, status.getCode()); 485 assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream", 486 status.getDescription()); 487 } 488 } 489 490 @Test maxHeaderListSizeShouldBeEnforcedOnServer()491 public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception { 492 startServer(100, 1); 493 494 NettyClientTransport transport = newTransport(newNegotiator()); 495 callMeMaybe(transport.start(clientTransportListener)); 496 497 try { 498 // Send a single RPC and wait for the response. 499 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 500 fail("The stream should have been failed due to server received header exceeds header list" 501 + " size limit!"); 502 } catch (Exception e) { 503 Status status = Status.fromThrowable(e); 504 assertEquals(status.toString(), Status.Code.INTERNAL, status.getCode()); 505 } 506 } 507 508 @Test getAttributes_negotiatorHandler()509 public void getAttributes_negotiatorHandler() throws Exception { 510 address = TestUtils.testServerAddress(12345); 511 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 512 513 NettyClientTransport transport = newTransport(new NoopProtocolNegotiator()); 514 callMeMaybe(transport.start(clientTransportListener)); 515 516 assertEquals(Attributes.EMPTY, transport.getAttributes()); 517 } 518 519 @Test getEagAttributes_negotiatorHandler()520 public void getEagAttributes_negotiatorHandler() throws Exception { 521 address = TestUtils.testServerAddress(12345); 522 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 523 524 NoopProtocolNegotiator npn = new NoopProtocolNegotiator(); 525 eagAttributes = Attributes.newBuilder() 526 .set(Attributes.Key.create("trash"), "value") 527 .build(); 528 NettyClientTransport transport = newTransport(npn); 529 callMeMaybe(transport.start(clientTransportListener)); 530 531 // EAG Attributes are available before the negotiation is complete 532 assertSame(eagAttributes, npn.grpcHandler.getEagAttributes()); 533 } 534 535 @Test clientStreamGetsAttributes()536 public void clientStreamGetsAttributes() throws Exception { 537 startServer(); 538 NettyClientTransport transport = newTransport(newNegotiator()); 539 callMeMaybe(transport.start(clientTransportListener)); 540 Rpc rpc = new Rpc(transport).halfClose(); 541 rpc.waitForResponse(); 542 543 assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION)); 544 assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); 545 Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS); 546 assertNotNull(serverTransportAttrs); 547 SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 548 assertNotNull(clientAddr); 549 assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR)); 550 } 551 552 @Test keepAliveEnabled()553 public void keepAliveEnabled() throws Exception { 554 startServer(); 555 NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 556 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */); 557 callMeMaybe(transport.start(clientTransportListener)); 558 Rpc rpc = new Rpc(transport).halfClose(); 559 rpc.waitForResponse(); 560 561 assertNotNull(transport.keepAliveManager()); 562 } 563 564 @Test keepAliveDisabled()565 public void keepAliveDisabled() throws Exception { 566 startServer(); 567 NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 568 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */); 569 callMeMaybe(transport.start(clientTransportListener)); 570 Rpc rpc = new Rpc(transport).halfClose(); 571 rpc.waitForResponse(); 572 573 assertNull(transport.keepAliveManager()); 574 } 575 getRootCause(Throwable t)576 private Throwable getRootCause(Throwable t) { 577 if (t.getCause() == null) { 578 return t; 579 } 580 return getRootCause(t.getCause()); 581 } 582 newNegotiator()583 private ProtocolNegotiator newNegotiator() throws IOException { 584 File caCert = TestUtils.loadCert("ca.pem"); 585 SslContext clientContext = GrpcSslContexts.forClient().trustManager(caCert) 586 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); 587 return ProtocolNegotiators.tls(clientContext); 588 } 589 newTransport(ProtocolNegotiator negotiator)590 private NettyClientTransport newTransport(ProtocolNegotiator negotiator) { 591 return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, 592 null /* user agent */, true /* keep alive */); 593 } 594 newTransport(ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent, boolean enableKeepAlive)595 private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize, 596 int maxHeaderListSize, String userAgent, boolean enableKeepAlive) { 597 long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; 598 long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L); 599 if (enableKeepAlive) { 600 keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L); 601 } 602 NettyClientTransport transport = new NettyClientTransport( 603 address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator, 604 DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, 605 keepAliveTimeNano, keepAliveTimeoutNano, 606 false, authority, userAgent, tooManyPingsRunnable, 607 new TransportTracer(), eagAttributes, new SocketPicker()); 608 transports.add(transport); 609 return transport; 610 } 611 startServer()612 private void startServer() throws IOException { 613 startServer(100, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); 614 } 615 startServer(int maxStreamsPerConnection, int maxHeaderListSize)616 private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { 617 server = new NettyServer( 618 TestUtils.testServerAddress(0), 619 NioServerSocketChannel.class, 620 new HashMap<ChannelOption<?>, Object>(), 621 group, group, negotiator, 622 Collections.<ServerStreamTracer.Factory>emptyList(), 623 TransportTracer.getDefaultFactory(), 624 maxStreamsPerConnection, 625 DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, 626 DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, 627 MAX_CONNECTION_IDLE_NANOS_DISABLED, 628 MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0, 629 channelz); 630 server.start(serverListener); 631 address = TestUtils.testServerAddress(server.getPort()); 632 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 633 } 634 callMeMaybe(Runnable r)635 private void callMeMaybe(Runnable r) { 636 if (r != null) { 637 r.run(); 638 } 639 } 640 createSslContext()641 private static SslContext createSslContext() { 642 try { 643 File serverCert = TestUtils.loadCert("server1.pem"); 644 File key = TestUtils.loadCert("server1.key"); 645 return GrpcSslContexts.forServer(serverCert, key) 646 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); 647 } catch (IOException ex) { 648 throw new RuntimeException(ex); 649 } 650 } 651 652 private static class Rpc { 653 static final String MESSAGE = "hello"; 654 static final MethodDescriptor<String, String> METHOD = 655 MethodDescriptor.<String, String>newBuilder() 656 .setType(MethodDescriptor.MethodType.UNARY) 657 .setFullMethodName("testService/test") 658 .setRequestMarshaller(StringMarshaller.INSTANCE) 659 .setResponseMarshaller(StringMarshaller.INSTANCE) 660 .build(); 661 662 final ClientStream stream; 663 final TestClientStreamListener listener = new TestClientStreamListener(); 664 Rpc(NettyClientTransport transport)665 Rpc(NettyClientTransport transport) { 666 this(transport, new Metadata()); 667 } 668 Rpc(NettyClientTransport transport, Metadata headers)669 Rpc(NettyClientTransport transport, Metadata headers) { 670 stream = transport.newStream(METHOD, headers, CallOptions.DEFAULT); 671 stream.start(listener); 672 stream.request(1); 673 stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes(UTF_8))); 674 stream.flush(); 675 } 676 halfClose()677 Rpc halfClose() { 678 stream.halfClose(); 679 return this; 680 } 681 waitForResponse()682 void waitForResponse() throws InterruptedException, ExecutionException, TimeoutException { 683 listener.responseFuture.get(10, TimeUnit.SECONDS); 684 } 685 waitForClose()686 void waitForClose() throws InterruptedException, ExecutionException, TimeoutException { 687 listener.closedFuture.get(10, TimeUnit.SECONDS); 688 } 689 } 690 691 private static final class TestClientStreamListener implements ClientStreamListener { 692 final SettableFuture<Void> closedFuture = SettableFuture.create(); 693 final SettableFuture<Void> responseFuture = SettableFuture.create(); 694 695 @Override headersRead(Metadata headers)696 public void headersRead(Metadata headers) { 697 } 698 699 @Override closed(Status status, Metadata trailers)700 public void closed(Status status, Metadata trailers) { 701 closed(status, RpcProgress.PROCESSED, trailers); 702 } 703 704 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)705 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 706 if (status.isOk()) { 707 closedFuture.set(null); 708 } else { 709 StatusException e = status.asException(); 710 closedFuture.setException(e); 711 responseFuture.setException(e); 712 } 713 } 714 715 @Override messagesAvailable(MessageProducer producer)716 public void messagesAvailable(MessageProducer producer) { 717 if (producer.next() != null) { 718 responseFuture.set(null); 719 } 720 } 721 722 @Override onReady()723 public void onReady() { 724 } 725 } 726 727 private static final class EchoServerStreamListener implements ServerStreamListener { 728 final ServerStream stream; 729 final String method; 730 final Metadata headers; 731 EchoServerStreamListener(ServerStream stream, String method, Metadata headers)732 EchoServerStreamListener(ServerStream stream, String method, Metadata headers) { 733 this.stream = stream; 734 this.method = method; 735 this.headers = headers; 736 } 737 738 @Override messagesAvailable(MessageProducer producer)739 public void messagesAvailable(MessageProducer producer) { 740 InputStream message; 741 while ((message = producer.next()) != null) { 742 // Just echo back the message. 743 stream.writeMessage(message); 744 stream.flush(); 745 } 746 } 747 748 @Override onReady()749 public void onReady() { 750 } 751 752 @Override halfClosed()753 public void halfClosed() { 754 // Just close when the client closes. 755 stream.close(Status.OK, new Metadata()); 756 } 757 758 @Override closed(Status status)759 public void closed(Status status) { 760 } 761 } 762 763 private final class EchoServerListener implements ServerListener { 764 final List<NettyServerTransport> transports = new ArrayList<>(); 765 final List<EchoServerStreamListener> streamListeners = 766 Collections.synchronizedList(new ArrayList<EchoServerStreamListener>()); 767 768 @Override transportCreated(final ServerTransport transport)769 public ServerTransportListener transportCreated(final ServerTransport transport) { 770 transports.add((NettyServerTransport) transport); 771 return new ServerTransportListener() { 772 @Override 773 public void streamCreated(ServerStream stream, String method, Metadata headers) { 774 EchoServerStreamListener listener = new EchoServerStreamListener(stream, method, headers); 775 stream.setListener(listener); 776 stream.writeHeaders(new Metadata()); 777 stream.request(1); 778 streamListeners.add(listener); 779 } 780 781 @Override 782 public Attributes transportReady(Attributes transportAttrs) { 783 serverTransportAttributesList.add(transportAttrs); 784 return transportAttrs; 785 } 786 787 @Override 788 public void transportTerminated() {} 789 }; 790 } 791 792 @Override serverShutdown()793 public void serverShutdown() { 794 } 795 } 796 797 private static final class StringMarshaller implements Marshaller<String> { 798 static final StringMarshaller INSTANCE = new StringMarshaller(); 799 800 @Override 801 public InputStream stream(String value) { 802 return new ByteArrayInputStream(value.getBytes(UTF_8)); 803 } 804 805 @Override 806 public String parse(InputStream stream) { 807 try { 808 return new String(ByteStreams.toByteArray(stream), UTF_8); 809 } catch (IOException ex) { 810 throw new RuntimeException(ex); 811 } 812 } 813 } 814 815 private static class NoopHandler extends ProtocolNegotiators.AbstractBufferingHandler 816 implements ProtocolNegotiator.Handler { 817 public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) { 818 super(grpcHandler); 819 } 820 821 @Override 822 public AsciiString scheme() { 823 return Utils.HTTP; 824 } 825 } 826 827 private static class NoopProtocolNegotiator implements ProtocolNegotiator { 828 GrpcHttp2ConnectionHandler grpcHandler; 829 NoopHandler handler; 830 831 @Override 832 public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) { 833 this.grpcHandler = grpcHandler; 834 return handler = new NoopHandler(grpcHandler); 835 } 836 837 @Override 838 public void close() {} 839 } 840 841 private static final class SocketPicker extends LocalSocketPicker { 842 843 @Nullable 844 @Override 845 public SocketAddress createSocketAddress(SocketAddress remoteAddress, Attributes attrs) { 846 return null; 847 } 848 } 849 } 850