1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Charsets.UTF_8; 20 import static com.google.common.truth.Truth.assertThat; 21 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; 22 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; 23 import static io.grpc.internal.GrpcUtil.DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; 24 import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; 25 import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; 26 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; 27 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; 28 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; 29 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; 30 import static org.junit.Assert.assertEquals; 31 import static org.junit.Assert.assertFalse; 32 import static org.junit.Assert.assertNotNull; 33 import static org.junit.Assert.assertNull; 34 import static org.junit.Assert.assertSame; 35 import static org.junit.Assert.assertTrue; 36 import static org.junit.Assert.fail; 37 38 import com.google.common.io.ByteStreams; 39 import com.google.common.util.concurrent.SettableFuture; 40 import io.grpc.Attributes; 41 import io.grpc.CallOptions; 42 import io.grpc.Grpc; 43 import io.grpc.InternalChannelz; 44 import io.grpc.Metadata; 45 import io.grpc.MethodDescriptor; 46 import io.grpc.MethodDescriptor.Marshaller; 47 import io.grpc.ServerStreamTracer; 48 import io.grpc.Status; 49 import io.grpc.Status.Code; 50 import io.grpc.StatusException; 51 import io.grpc.internal.ClientStream; 52 import io.grpc.internal.ClientStreamListener; 53 import io.grpc.internal.ClientTransport; 54 import io.grpc.internal.FakeClock; 55 import io.grpc.internal.GrpcUtil; 56 import io.grpc.internal.ManagedClientTransport; 57 import io.grpc.internal.ServerListener; 58 import io.grpc.internal.ServerStream; 59 import io.grpc.internal.ServerStreamListener; 60 import io.grpc.internal.ServerTransport; 61 import io.grpc.internal.ServerTransportListener; 62 import io.grpc.internal.TransportTracer; 63 import io.grpc.internal.testing.TestUtils; 64 import io.netty.channel.ChannelConfig; 65 import io.netty.channel.ChannelOption; 66 import io.netty.channel.nio.NioEventLoopGroup; 67 import io.netty.channel.socket.SocketChannelConfig; 68 import io.netty.channel.socket.nio.NioServerSocketChannel; 69 import io.netty.channel.socket.nio.NioSocketChannel; 70 import io.netty.handler.codec.http2.StreamBufferingEncoder; 71 import io.netty.handler.ssl.ClientAuth; 72 import io.netty.handler.ssl.SslContext; 73 import io.netty.handler.ssl.SupportedCipherSuiteFilter; 74 import io.netty.util.AsciiString; 75 import java.io.ByteArrayInputStream; 76 import java.io.File; 77 import java.io.IOException; 78 import java.io.InputStream; 79 import java.net.InetSocketAddress; 80 import java.util.ArrayList; 81 import java.util.Collections; 82 import java.util.HashMap; 83 import java.util.List; 84 import java.util.Map; 85 import java.util.concurrent.ExecutionException; 86 import java.util.concurrent.TimeUnit; 87 import java.util.concurrent.TimeoutException; 88 import javax.net.ssl.SSLHandshakeException; 89 import org.junit.After; 90 import org.junit.Before; 91 import org.junit.Test; 92 import org.junit.runner.RunWith; 93 import org.junit.runners.JUnit4; 94 import org.mockito.Mock; 95 import org.mockito.MockitoAnnotations; 96 97 /** 98 * Tests for {@link NettyClientTransport}. 99 */ 100 @RunWith(JUnit4.class) 101 public class NettyClientTransportTest { 102 private static final SslContext SSL_CONTEXT = createSslContext(); 103 104 @Mock 105 private ManagedClientTransport.Listener clientTransportListener; 106 107 private final List<NettyClientTransport> transports = new ArrayList<>(); 108 private final NioEventLoopGroup group = new NioEventLoopGroup(1); 109 private final EchoServerListener serverListener = new EchoServerListener(); 110 private final InternalChannelz channelz = new InternalChannelz(); 111 private Runnable tooManyPingsRunnable = new Runnable() { 112 // Throwing is useless in this method, because Netty doesn't propagate the exception 113 @Override public void run() {} 114 }; 115 private Attributes eagAttributes = Attributes.EMPTY; 116 117 private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT); 118 119 private InetSocketAddress address; 120 private String authority; 121 private NettyServer server; 122 123 @Before setup()124 public void setup() { 125 MockitoAnnotations.initMocks(this); 126 } 127 128 @After teardown()129 public void teardown() throws Exception { 130 for (NettyClientTransport transport : transports) { 131 transport.shutdown(Status.UNAVAILABLE); 132 } 133 134 if (server != null) { 135 server.shutdown(); 136 } 137 138 group.shutdownGracefully(0, 10, TimeUnit.SECONDS); 139 } 140 141 @Test testToString()142 public void testToString() throws Exception { 143 address = TestUtils.testServerAddress(12345); 144 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 145 String s = newTransport(newNegotiator()).toString(); 146 transports.clear(); 147 assertTrue("Unexpected: " + s, s.contains("NettyClientTransport")); 148 assertTrue("Unexpected: " + s, s.contains(address.toString())); 149 } 150 151 @Test addDefaultUserAgent()152 public void addDefaultUserAgent() throws Exception { 153 startServer(); 154 NettyClientTransport transport = newTransport(newNegotiator()); 155 callMeMaybe(transport.start(clientTransportListener)); 156 157 // Send a single RPC and wait for the response. 158 new Rpc(transport).halfClose().waitForResponse(); 159 160 // Verify that the received headers contained the User-Agent. 161 assertEquals(1, serverListener.streamListeners.size()); 162 163 Metadata headers = serverListener.streamListeners.get(0).headers; 164 assertEquals(GrpcUtil.getGrpcUserAgent("netty", null), headers.get(USER_AGENT_KEY)); 165 } 166 167 @Test setSoLingerChannelOption()168 public void setSoLingerChannelOption() throws IOException { 169 startServer(); 170 Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); 171 // set SO_LINGER option 172 int soLinger = 123; 173 channelOptions.put(ChannelOption.SO_LINGER, soLinger); 174 NettyClientTransport transport = new NettyClientTransport( 175 address, NioSocketChannel.class, channelOptions, group, newNegotiator(), 176 DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, 177 KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, 178 tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); 179 transports.add(transport); 180 callMeMaybe(transport.start(clientTransportListener)); 181 182 // verify SO_LINGER has been set 183 ChannelConfig config = transport.channel().config(); 184 assertTrue(config instanceof SocketChannelConfig); 185 assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger()); 186 } 187 188 @Test overrideDefaultUserAgent()189 public void overrideDefaultUserAgent() throws Exception { 190 startServer(); 191 NettyClientTransport transport = newTransport(newNegotiator(), 192 DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true); 193 callMeMaybe(transport.start(clientTransportListener)); 194 195 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 196 197 // Verify that the received headers contained the User-Agent. 198 assertEquals(1, serverListener.streamListeners.size()); 199 Metadata receivedHeaders = serverListener.streamListeners.get(0).headers; 200 assertEquals(GrpcUtil.getGrpcUserAgent("netty", "testUserAgent"), 201 receivedHeaders.get(USER_AGENT_KEY)); 202 } 203 204 @Test maxMessageSizeShouldBeEnforced()205 public void maxMessageSizeShouldBeEnforced() throws Throwable { 206 startServer(); 207 // Allow the response payloads of up to 1 byte. 208 NettyClientTransport transport = newTransport(newNegotiator(), 209 1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true); 210 callMeMaybe(transport.start(clientTransportListener)); 211 212 try { 213 // Send a single RPC and wait for the response. 214 new Rpc(transport).halfClose().waitForResponse(); 215 fail("Expected the stream to fail."); 216 } catch (ExecutionException e) { 217 Status status = Status.fromThrowable(e); 218 assertEquals(Code.RESOURCE_EXHAUSTED, status.getCode()); 219 assertTrue("Missing exceeds maximum from: " + status.getDescription(), 220 status.getDescription().contains("exceeds maximum")); 221 } 222 } 223 224 /** 225 * Verifies that we can create multiple TLS client transports from the same builder. 226 */ 227 @Test creatingMultipleTlsTransportsShouldSucceed()228 public void creatingMultipleTlsTransportsShouldSucceed() throws Exception { 229 startServer(); 230 231 // Create a couple client transports. 232 ProtocolNegotiator negotiator = newNegotiator(); 233 for (int index = 0; index < 2; ++index) { 234 NettyClientTransport transport = newTransport(negotiator); 235 callMeMaybe(transport.start(clientTransportListener)); 236 } 237 238 // Send a single RPC on each transport. 239 final List<Rpc> rpcs = new ArrayList<>(transports.size()); 240 for (NettyClientTransport transport : transports) { 241 rpcs.add(new Rpc(transport).halfClose()); 242 } 243 244 // Wait for the RPCs to complete. 245 for (Rpc rpc : rpcs) { 246 rpc.waitForResponse(); 247 } 248 } 249 250 @Test negotiationFailurePropagatesToStatus()251 public void negotiationFailurePropagatesToStatus() throws Exception { 252 negotiator = ProtocolNegotiators.serverPlaintext(); 253 startServer(); 254 255 final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 256 final NettyClientTransport transport = newTransport(negotiator); 257 callMeMaybe(transport.start(clientTransportListener)); 258 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 259 transport.channel().eventLoop().execute(new Runnable() { 260 @Override 261 public void run() { 262 negotiator.handler.fail(transport.channel().pipeline().context(negotiator.handler), 263 failureStatus.asRuntimeException()); 264 } 265 }); 266 267 Rpc rpc = new Rpc(transport).halfClose(); 268 try { 269 rpc.waitForClose(); 270 fail("expected exception"); 271 } catch (ExecutionException ex) { 272 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 273 } 274 } 275 276 @Test tlsNegotiationFailurePropagatesToStatus()277 public void tlsNegotiationFailurePropagatesToStatus() throws Exception { 278 File serverCert = TestUtils.loadCert("server1.pem"); 279 File serverKey = TestUtils.loadCert("server1.key"); 280 // Don't trust ca.pem, so that client auth fails 281 SslContext sslContext = GrpcSslContexts.forServer(serverCert, serverKey) 282 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) 283 .clientAuth(ClientAuth.REQUIRE) 284 .build(); 285 negotiator = ProtocolNegotiators.serverTls(sslContext); 286 startServer(); 287 288 File caCert = TestUtils.loadCert("ca.pem"); 289 File clientCert = TestUtils.loadCert("client.pem"); 290 File clientKey = TestUtils.loadCert("client.key"); 291 SslContext clientContext = GrpcSslContexts.forClient() 292 .trustManager(caCert) 293 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) 294 .keyManager(clientCert, clientKey) 295 .build(); 296 ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext); 297 final NettyClientTransport transport = newTransport(negotiator); 298 callMeMaybe(transport.start(clientTransportListener)); 299 300 Rpc rpc = new Rpc(transport).halfClose(); 301 try { 302 rpc.waitForClose(); 303 fail("expected exception"); 304 } catch (ExecutionException ex) { 305 StatusException sre = (StatusException) ex.getCause(); 306 assertEquals(Status.Code.UNAVAILABLE, sre.getStatus().getCode()); 307 assertThat(sre.getCause()).isInstanceOf(SSLHandshakeException.class); 308 assertThat(sre.getCause().getMessage()).contains("SSLV3_ALERT_HANDSHAKE_FAILURE"); 309 } 310 } 311 312 @Test channelExceptionDuringNegotiatonPropagatesToStatus()313 public void channelExceptionDuringNegotiatonPropagatesToStatus() throws Exception { 314 negotiator = ProtocolNegotiators.serverPlaintext(); 315 startServer(); 316 317 NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 318 NettyClientTransport transport = newTransport(negotiator); 319 callMeMaybe(transport.start(clientTransportListener)); 320 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 321 transport.channel().pipeline().fireExceptionCaught(failureStatus.asRuntimeException()); 322 323 Rpc rpc = new Rpc(transport).halfClose(); 324 try { 325 rpc.waitForClose(); 326 fail("expected exception"); 327 } catch (ExecutionException ex) { 328 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 329 } 330 } 331 332 @Test handlerExceptionDuringNegotiatonPropagatesToStatus()333 public void handlerExceptionDuringNegotiatonPropagatesToStatus() throws Exception { 334 negotiator = ProtocolNegotiators.serverPlaintext(); 335 startServer(); 336 337 final NoopProtocolNegotiator negotiator = new NoopProtocolNegotiator(); 338 final NettyClientTransport transport = newTransport(negotiator); 339 callMeMaybe(transport.start(clientTransportListener)); 340 final Status failureStatus = Status.UNAVAILABLE.withDescription("oh noes!"); 341 transport.channel().eventLoop().execute(new Runnable() { 342 @Override 343 public void run() { 344 try { 345 negotiator.handler.exceptionCaught( 346 transport.channel().pipeline().context(negotiator.handler), 347 failureStatus.asRuntimeException()); 348 } catch (Exception ex) { 349 throw new RuntimeException(ex); 350 } 351 } 352 }); 353 354 Rpc rpc = new Rpc(transport).halfClose(); 355 try { 356 rpc.waitForClose(); 357 fail("expected exception"); 358 } catch (ExecutionException ex) { 359 assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus()); 360 } 361 } 362 363 @Test bufferedStreamsShouldBeClosedWhenConnectionTerminates()364 public void bufferedStreamsShouldBeClosedWhenConnectionTerminates() throws Exception { 365 // Only allow a single stream active at a time. 366 startServer(1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); 367 368 NettyClientTransport transport = newTransport(newNegotiator()); 369 callMeMaybe(transport.start(clientTransportListener)); 370 371 // Send a dummy RPC in order to ensure that the updated SETTINGS_MAX_CONCURRENT_STREAMS 372 // has been received by the remote endpoint. 373 new Rpc(transport).halfClose().waitForResponse(); 374 375 // Create 3 streams, but don't half-close. The transport will buffer the second and third. 376 Rpc[] rpcs = new Rpc[] { new Rpc(transport), new Rpc(transport), new Rpc(transport) }; 377 378 // Wait for the response for the stream that was actually created. 379 rpcs[0].waitForResponse(); 380 381 // Now forcibly terminate the connection from the server side. 382 serverListener.transports.get(0).channel().pipeline().firstContext().close(); 383 384 // Now wait for both listeners to be closed. 385 for (int i = 1; i < rpcs.length; i++) { 386 try { 387 rpcs[i].waitForClose(); 388 fail("Expected the RPC to fail"); 389 } catch (ExecutionException e) { 390 // Expected. 391 Throwable t = getRootCause(e); 392 // Make sure that the Http2ChannelClosedException got replaced with the real cause of 393 // the shutdown. 394 assertFalse(t instanceof StreamBufferingEncoder.Http2ChannelClosedException); 395 } 396 } 397 } 398 399 public static class CantConstructChannel extends NioSocketChannel { 400 /** Constructor. It doesn't work. Feel free to try. But it doesn't work. */ CantConstructChannel()401 public CantConstructChannel() { 402 // Use an Error because we've seen cases of channels failing to construct due to classloading 403 // problems (like mixing different versions of Netty), and those involve Errors. 404 throw new CantConstructChannelError(); 405 } 406 } 407 408 private static class CantConstructChannelError extends Error {} 409 410 @Test failingToConstructChannelShouldFailGracefully()411 public void failingToConstructChannelShouldFailGracefully() throws Exception { 412 address = TestUtils.testServerAddress(12345); 413 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 414 NettyClientTransport transport = new NettyClientTransport( 415 address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group, 416 newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, 417 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, 418 null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); 419 transports.add(transport); 420 421 // Should not throw 422 callMeMaybe(transport.start(clientTransportListener)); 423 424 // And RPCs and PINGs should fail cleanly, reporting the failure 425 Rpc rpc = new Rpc(transport); 426 try { 427 rpc.waitForResponse(); 428 fail("Expected exception"); 429 } catch (Exception ex) { 430 if (!(getRootCause(ex) instanceof CantConstructChannelError)) { 431 throw new AssertionError("Could not find expected error", ex); 432 } 433 } 434 435 final SettableFuture<Object> pingResult = SettableFuture.create(); 436 FakeClock clock = new FakeClock(); 437 ClientTransport.PingCallback pingCallback = new ClientTransport.PingCallback() { 438 @Override 439 public void onSuccess(long roundTripTimeNanos) { 440 pingResult.set(roundTripTimeNanos); 441 } 442 443 @Override 444 public void onFailure(Throwable cause) { 445 pingResult.setException(cause); 446 } 447 }; 448 transport.ping(pingCallback, clock.getScheduledExecutorService()); 449 assertFalse(pingResult.isDone()); 450 clock.runDueTasks(); 451 assertTrue(pingResult.isDone()); 452 try { 453 pingResult.get(); 454 fail("Expected exception"); 455 } catch (Exception ex) { 456 if (!(getRootCause(ex) instanceof CantConstructChannelError)) { 457 throw new AssertionError("Could not find expected error", ex); 458 } 459 } 460 } 461 462 @Test maxHeaderListSizeShouldBeEnforcedOnClient()463 public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception { 464 startServer(); 465 466 NettyClientTransport transport = 467 newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true); 468 callMeMaybe(transport.start(clientTransportListener)); 469 470 try { 471 // Send a single RPC and wait for the response. 472 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 473 fail("The stream should have been failed due to client received header exceeds header list" 474 + " size limit!"); 475 } catch (Exception e) { 476 Throwable rootCause = getRootCause(e); 477 Status status = ((StatusException) rootCause).getStatus(); 478 assertEquals(Status.Code.INTERNAL, status.getCode()); 479 assertEquals("HTTP/2 error code: PROTOCOL_ERROR\nReceived Rst Stream", 480 status.getDescription()); 481 } 482 } 483 484 @Test maxHeaderListSizeShouldBeEnforcedOnServer()485 public void maxHeaderListSizeShouldBeEnforcedOnServer() throws Exception { 486 startServer(100, 1); 487 488 NettyClientTransport transport = newTransport(newNegotiator()); 489 callMeMaybe(transport.start(clientTransportListener)); 490 491 try { 492 // Send a single RPC and wait for the response. 493 new Rpc(transport, new Metadata()).halfClose().waitForResponse(); 494 fail("The stream should have been failed due to server received header exceeds header list" 495 + " size limit!"); 496 } catch (Exception e) { 497 Status status = Status.fromThrowable(e); 498 assertEquals(status.toString(), Status.Code.INTERNAL, status.getCode()); 499 } 500 } 501 502 @Test getAttributes_negotiatorHandler()503 public void getAttributes_negotiatorHandler() throws Exception { 504 address = TestUtils.testServerAddress(12345); 505 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 506 507 NettyClientTransport transport = newTransport(new NoopProtocolNegotiator()); 508 callMeMaybe(transport.start(clientTransportListener)); 509 510 assertEquals(Attributes.EMPTY, transport.getAttributes()); 511 } 512 513 @Test getEagAttributes_negotiatorHandler()514 public void getEagAttributes_negotiatorHandler() throws Exception { 515 address = TestUtils.testServerAddress(12345); 516 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 517 518 NoopProtocolNegotiator npn = new NoopProtocolNegotiator(); 519 eagAttributes = Attributes.newBuilder() 520 .set(Attributes.Key.create("trash"), "value") 521 .build(); 522 NettyClientTransport transport = newTransport(npn); 523 callMeMaybe(transport.start(clientTransportListener)); 524 525 // EAG Attributes are available before the negotiation is complete 526 assertSame(eagAttributes, npn.grpcHandler.getEagAttributes()); 527 } 528 529 @Test clientStreamGetsAttributes()530 public void clientStreamGetsAttributes() throws Exception { 531 startServer(); 532 NettyClientTransport transport = newTransport(newNegotiator()); 533 callMeMaybe(transport.start(clientTransportListener)); 534 Rpc rpc = new Rpc(transport).halfClose(); 535 rpc.waitForResponse(); 536 537 assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION)); 538 assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); 539 } 540 541 @Test keepAliveEnabled()542 public void keepAliveEnabled() throws Exception { 543 startServer(); 544 NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 545 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */); 546 callMeMaybe(transport.start(clientTransportListener)); 547 Rpc rpc = new Rpc(transport).halfClose(); 548 rpc.waitForResponse(); 549 550 assertNotNull(transport.keepAliveManager()); 551 } 552 553 @Test keepAliveDisabled()554 public void keepAliveDisabled() throws Exception { 555 startServer(); 556 NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 557 GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */); 558 callMeMaybe(transport.start(clientTransportListener)); 559 Rpc rpc = new Rpc(transport).halfClose(); 560 rpc.waitForResponse(); 561 562 assertNull(transport.keepAliveManager()); 563 } 564 getRootCause(Throwable t)565 private Throwable getRootCause(Throwable t) { 566 if (t.getCause() == null) { 567 return t; 568 } 569 return getRootCause(t.getCause()); 570 } 571 newNegotiator()572 private ProtocolNegotiator newNegotiator() throws IOException { 573 File caCert = TestUtils.loadCert("ca.pem"); 574 SslContext clientContext = GrpcSslContexts.forClient().trustManager(caCert) 575 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); 576 return ProtocolNegotiators.tls(clientContext); 577 } 578 newTransport(ProtocolNegotiator negotiator)579 private NettyClientTransport newTransport(ProtocolNegotiator negotiator) { 580 return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, 581 null /* user agent */, true /* keep alive */); 582 } 583 newTransport(ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent, boolean enableKeepAlive)584 private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize, 585 int maxHeaderListSize, String userAgent, boolean enableKeepAlive) { 586 long keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; 587 long keepAliveTimeoutNano = TimeUnit.SECONDS.toNanos(1L); 588 if (enableKeepAlive) { 589 keepAliveTimeNano = TimeUnit.SECONDS.toNanos(10L); 590 } 591 NettyClientTransport transport = new NettyClientTransport( 592 address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator, 593 DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, 594 keepAliveTimeNano, keepAliveTimeoutNano, 595 false, authority, userAgent, tooManyPingsRunnable, 596 new TransportTracer(), eagAttributes); 597 transports.add(transport); 598 return transport; 599 } 600 startServer()601 private void startServer() throws IOException { 602 startServer(100, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); 603 } 604 startServer(int maxStreamsPerConnection, int maxHeaderListSize)605 private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { 606 server = new NettyServer( 607 TestUtils.testServerAddress(0), 608 NioServerSocketChannel.class, 609 new HashMap<ChannelOption<?>, Object>(), 610 group, group, negotiator, 611 Collections.<ServerStreamTracer.Factory>emptyList(), 612 TransportTracer.getDefaultFactory(), 613 maxStreamsPerConnection, 614 DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, 615 DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, 616 MAX_CONNECTION_IDLE_NANOS_DISABLED, 617 MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0, 618 channelz); 619 server.start(serverListener); 620 address = TestUtils.testServerAddress(server.getPort()); 621 authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); 622 } 623 callMeMaybe(Runnable r)624 private void callMeMaybe(Runnable r) { 625 if (r != null) { 626 r.run(); 627 } 628 } 629 createSslContext()630 private static SslContext createSslContext() { 631 try { 632 File serverCert = TestUtils.loadCert("server1.pem"); 633 File key = TestUtils.loadCert("server1.key"); 634 return GrpcSslContexts.forServer(serverCert, key) 635 .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build(); 636 } catch (IOException ex) { 637 throw new RuntimeException(ex); 638 } 639 } 640 641 private static class Rpc { 642 static final String MESSAGE = "hello"; 643 static final MethodDescriptor<String, String> METHOD = 644 MethodDescriptor.<String, String>newBuilder() 645 .setType(MethodDescriptor.MethodType.UNARY) 646 .setFullMethodName("testService/test") 647 .setRequestMarshaller(StringMarshaller.INSTANCE) 648 .setResponseMarshaller(StringMarshaller.INSTANCE) 649 .build(); 650 651 final ClientStream stream; 652 final TestClientStreamListener listener = new TestClientStreamListener(); 653 Rpc(NettyClientTransport transport)654 Rpc(NettyClientTransport transport) { 655 this(transport, new Metadata()); 656 } 657 Rpc(NettyClientTransport transport, Metadata headers)658 Rpc(NettyClientTransport transport, Metadata headers) { 659 stream = transport.newStream(METHOD, headers, CallOptions.DEFAULT); 660 stream.start(listener); 661 stream.request(1); 662 stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes(UTF_8))); 663 stream.flush(); 664 } 665 halfClose()666 Rpc halfClose() { 667 stream.halfClose(); 668 return this; 669 } 670 waitForResponse()671 void waitForResponse() throws InterruptedException, ExecutionException, TimeoutException { 672 listener.responseFuture.get(10, TimeUnit.SECONDS); 673 } 674 waitForClose()675 void waitForClose() throws InterruptedException, ExecutionException, TimeoutException { 676 listener.closedFuture.get(10, TimeUnit.SECONDS); 677 } 678 } 679 680 private static final class TestClientStreamListener implements ClientStreamListener { 681 final SettableFuture<Void> closedFuture = SettableFuture.create(); 682 final SettableFuture<Void> responseFuture = SettableFuture.create(); 683 684 @Override headersRead(Metadata headers)685 public void headersRead(Metadata headers) { 686 } 687 688 @Override closed(Status status, Metadata trailers)689 public void closed(Status status, Metadata trailers) { 690 closed(status, RpcProgress.PROCESSED, trailers); 691 } 692 693 @Override closed(Status status, RpcProgress rpcProgress, Metadata trailers)694 public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { 695 if (status.isOk()) { 696 closedFuture.set(null); 697 } else { 698 StatusException e = status.asException(); 699 closedFuture.setException(e); 700 responseFuture.setException(e); 701 } 702 } 703 704 @Override messagesAvailable(MessageProducer producer)705 public void messagesAvailable(MessageProducer producer) { 706 if (producer.next() != null) { 707 responseFuture.set(null); 708 } 709 } 710 711 @Override onReady()712 public void onReady() { 713 } 714 } 715 716 private static final class EchoServerStreamListener implements ServerStreamListener { 717 final ServerStream stream; 718 final String method; 719 final Metadata headers; 720 EchoServerStreamListener(ServerStream stream, String method, Metadata headers)721 EchoServerStreamListener(ServerStream stream, String method, Metadata headers) { 722 this.stream = stream; 723 this.method = method; 724 this.headers = headers; 725 } 726 727 @Override messagesAvailable(MessageProducer producer)728 public void messagesAvailable(MessageProducer producer) { 729 InputStream message; 730 while ((message = producer.next()) != null) { 731 // Just echo back the message. 732 stream.writeMessage(message); 733 stream.flush(); 734 } 735 } 736 737 @Override onReady()738 public void onReady() { 739 } 740 741 @Override halfClosed()742 public void halfClosed() { 743 // Just close when the client closes. 744 stream.close(Status.OK, new Metadata()); 745 } 746 747 @Override closed(Status status)748 public void closed(Status status) { 749 } 750 } 751 752 private static final class EchoServerListener implements ServerListener { 753 final List<NettyServerTransport> transports = new ArrayList<>(); 754 final List<EchoServerStreamListener> streamListeners = 755 Collections.synchronizedList(new ArrayList<EchoServerStreamListener>()); 756 757 @Override transportCreated(final ServerTransport transport)758 public ServerTransportListener transportCreated(final ServerTransport transport) { 759 transports.add((NettyServerTransport) transport); 760 return new ServerTransportListener() { 761 @Override 762 public void streamCreated(ServerStream stream, String method, Metadata headers) { 763 EchoServerStreamListener listener = new EchoServerStreamListener(stream, method, headers); 764 stream.setListener(listener); 765 stream.writeHeaders(new Metadata()); 766 stream.request(1); 767 streamListeners.add(listener); 768 } 769 770 @Override 771 public Attributes transportReady(Attributes transportAttrs) { 772 return transportAttrs; 773 } 774 775 @Override 776 public void transportTerminated() {} 777 }; 778 } 779 780 @Override serverShutdown()781 public void serverShutdown() { 782 } 783 } 784 785 private static final class StringMarshaller implements Marshaller<String> { 786 static final StringMarshaller INSTANCE = new StringMarshaller(); 787 788 @Override 789 public InputStream stream(String value) { 790 return new ByteArrayInputStream(value.getBytes(UTF_8)); 791 } 792 793 @Override 794 public String parse(InputStream stream) { 795 try { 796 return new String(ByteStreams.toByteArray(stream), UTF_8); 797 } catch (IOException ex) { 798 throw new RuntimeException(ex); 799 } 800 } 801 } 802 803 private static class NoopHandler extends ProtocolNegotiators.AbstractBufferingHandler 804 implements ProtocolNegotiator.Handler { 805 public NoopHandler(GrpcHttp2ConnectionHandler grpcHandler) { 806 super(grpcHandler); 807 } 808 809 @Override 810 public AsciiString scheme() { 811 return Utils.HTTP; 812 } 813 } 814 815 private static class NoopProtocolNegotiator implements ProtocolNegotiator { 816 GrpcHttp2ConnectionHandler grpcHandler; 817 NoopHandler handler; 818 819 @Override 820 public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) { 821 this.grpcHandler = grpcHandler; 822 return handler = new NoopHandler(grpcHandler); 823 } 824 } 825 } 826