1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import com.google.common.util.concurrent.Futures; 20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.MoreExecutors; 22 import com.google.common.util.concurrent.SettableFuture; 23 import io.grpc.ChannelCredentials; 24 import io.grpc.Grpc; 25 import io.grpc.ManagedChannel; 26 import io.grpc.Server; 27 import io.grpc.ServerCredentials; 28 import io.grpc.TlsChannelCredentials; 29 import io.grpc.TlsServerCredentials; 30 import io.grpc.internal.testing.TestUtils; 31 import io.grpc.stub.StreamObserver; 32 import io.grpc.testing.TlsTesting; 33 import io.grpc.testing.integration.Messages.ResponseParameters; 34 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; 35 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; 36 import java.io.IOException; 37 import java.util.ArrayList; 38 import java.util.List; 39 import java.util.concurrent.CyclicBarrier; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.Executors; 42 import java.util.concurrent.ScheduledExecutorService; 43 import java.util.concurrent.TimeUnit; 44 import org.junit.After; 45 import org.junit.Before; 46 import org.junit.Test; 47 import org.junit.runner.RunWith; 48 import org.junit.runners.JUnit4; 49 50 51 /** 52 * Tests that gRPC clients and servers can handle concurrent RPCs. 53 * 54 * <p>These tests use TLS to make them more realistic, and because we'd like to test the thread 55 * safety of the TLS-related code paths as well. 56 */ 57 // TODO: Consider augmenting this class to perform non-streaming, client streaming, and 58 // bidirectional streaming requests also. 59 @RunWith(JUnit4.class) 60 public class ConcurrencyTest { 61 62 /** 63 * A response observer that completes a {@code ListenableFuture} when the proper number of 64 * responses arrives and the server signals that the RPC is complete. 65 */ 66 private static class SignalingResponseObserver 67 implements StreamObserver<StreamingOutputCallResponse> { SignalingResponseObserver(SettableFuture<Void> completionFuture)68 public SignalingResponseObserver(SettableFuture<Void> completionFuture) { 69 this.completionFuture = completionFuture; 70 } 71 72 @Override onCompleted()73 public void onCompleted() { 74 if (numResponsesReceived != NUM_RESPONSES_PER_REQUEST) { 75 completionFuture.setException( 76 new IllegalStateException("Wrong number of responses: " + numResponsesReceived)); 77 } else { 78 completionFuture.set(null); 79 } 80 } 81 82 @Override onError(Throwable error)83 public void onError(Throwable error) { 84 completionFuture.setException(error); 85 } 86 87 @Override onNext(StreamingOutputCallResponse response)88 public void onNext(StreamingOutputCallResponse response) { 89 numResponsesReceived++; 90 } 91 92 private final SettableFuture<Void> completionFuture; 93 private int numResponsesReceived = 0; 94 } 95 96 /** 97 * A client worker task that waits until all client workers are ready, then sends a request for a 98 * server-streaming RPC and arranges for a {@code ListenableFuture} to be signaled when the RPC is 99 * complete. 100 */ 101 private class ClientWorker implements Runnable { ClientWorker(CyclicBarrier startBarrier, SettableFuture<Void> completionFuture)102 public ClientWorker(CyclicBarrier startBarrier, SettableFuture<Void> completionFuture) { 103 this.startBarrier = startBarrier; 104 this.completionFuture = completionFuture; 105 } 106 107 @Override run()108 public void run() { 109 try { 110 // Prepare the request. 111 StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder(); 112 for (int i = 0; i < NUM_RESPONSES_PER_REQUEST; i++) { 113 requestBuilder.addResponseParameters(ResponseParameters.newBuilder() 114 .setSize(1000) 115 .setIntervalUs(0)); // No delay between responses, for maximum concurrency. 116 } 117 StreamingOutputCallRequest request = requestBuilder.build(); 118 119 // Wait until all client worker threads are poised & ready, then send the request. This way 120 // all clients send their requests at approximately the same time. 121 startBarrier.await(); 122 clientStub.streamingOutputCall(request, new SignalingResponseObserver(completionFuture)); 123 } catch (InterruptedException ex) { 124 Thread.currentThread().interrupt(); 125 completionFuture.setException(ex); 126 } catch (Throwable t) { 127 completionFuture.setException(t); 128 } 129 } 130 131 private final CyclicBarrier startBarrier; 132 private final SettableFuture<Void> completionFuture; 133 } 134 135 private static final int NUM_SERVER_THREADS = 10; 136 private static final int NUM_CONCURRENT_REQUESTS = 100; 137 private static final int NUM_RESPONSES_PER_REQUEST = 100; 138 139 private Server server; 140 private ManagedChannel clientChannel; 141 private TestServiceGrpc.TestServiceStub clientStub; 142 private ScheduledExecutorService serverExecutor; 143 private ExecutorService clientExecutor; 144 145 @Before setUp()146 public void setUp() throws Exception { 147 serverExecutor = Executors.newScheduledThreadPool(NUM_SERVER_THREADS); 148 clientExecutor = Executors.newFixedThreadPool(NUM_CONCURRENT_REQUESTS); 149 150 server = newServer(); 151 152 // Create the client. Keep a reference to its channel so we can shut it down during tearDown(). 153 clientChannel = newClientChannel(); 154 clientStub = TestServiceGrpc.newStub(clientChannel); 155 } 156 157 @After tearDown()158 public void tearDown() { 159 if (server != null) { 160 server.shutdown(); 161 } 162 if (clientChannel != null) { 163 clientChannel.shutdown(); 164 } 165 166 MoreExecutors.shutdownAndAwaitTermination(serverExecutor, 5, TimeUnit.SECONDS); 167 MoreExecutors.shutdownAndAwaitTermination(clientExecutor, 5, TimeUnit.SECONDS); 168 } 169 170 /** 171 * Tests that gRPC can handle concurrent server-streaming RPCs. 172 */ 173 @Test serverStreamingTest()174 public void serverStreamingTest() throws Exception { 175 CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_REQUESTS); 176 List<ListenableFuture<Void>> workerFutures = new ArrayList<>(NUM_CONCURRENT_REQUESTS); 177 178 for (int i = 0; i < NUM_CONCURRENT_REQUESTS; i++) { 179 SettableFuture<Void> future = SettableFuture.create(); 180 clientExecutor.execute(new ClientWorker(startBarrier, future)); 181 workerFutures.add(future); 182 } 183 184 Futures.allAsList(workerFutures).get(60, TimeUnit.SECONDS); 185 } 186 187 /** 188 * Creates and starts a new {@link TestServiceImpl} server. 189 */ newServer()190 private Server newServer() throws IOException { 191 ServerCredentials serverCreds = TlsServerCredentials.newBuilder() 192 .keyManager(TlsTesting.loadCert("server1.pem"), TlsTesting.loadCert("server1.key")) 193 .trustManager(TlsTesting.loadCert("ca.pem")) 194 .clientAuth(TlsServerCredentials.ClientAuth.REQUIRE) 195 .build(); 196 197 return Grpc.newServerBuilderForPort(0, serverCreds) 198 .addService(new TestServiceImpl(serverExecutor)) 199 .build() 200 .start(); 201 } 202 newClientChannel()203 private ManagedChannel newClientChannel() throws IOException { 204 ChannelCredentials channelCreds = TlsChannelCredentials.newBuilder() 205 .keyManager(TlsTesting.loadCert("client.pem"), TlsTesting.loadCert("client.key")) 206 .trustManager(TlsTesting.loadCert("ca.pem")) 207 .build(); 208 209 return Grpc.newChannelBuilder("localhost:" + server.getPort(), channelCreds) 210 .overrideAuthority(TestUtils.TEST_SERVER_HOST) 211 .build(); 212 } 213 } 214