• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import io.grpc.ChannelCredentials;
24 import io.grpc.Grpc;
25 import io.grpc.ManagedChannel;
26 import io.grpc.Server;
27 import io.grpc.ServerCredentials;
28 import io.grpc.TlsChannelCredentials;
29 import io.grpc.TlsServerCredentials;
30 import io.grpc.internal.testing.TestUtils;
31 import io.grpc.stub.StreamObserver;
32 import io.grpc.testing.TlsTesting;
33 import io.grpc.testing.integration.Messages.ResponseParameters;
34 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
35 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.CyclicBarrier;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.TimeUnit;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.junit.runners.JUnit4;
49 
50 
51 /**
52  * Tests that gRPC clients and servers can handle concurrent RPCs.
53  *
54  * <p>These tests use TLS to make them more realistic, and because we'd like to test the thread
55  * safety of the TLS-related code paths as well.
56  */
57 // TODO: Consider augmenting this class to perform non-streaming, client streaming, and
58 // bidirectional streaming requests also.
59 @RunWith(JUnit4.class)
60 public class ConcurrencyTest {
61 
62   /**
63    * A response observer that completes a {@code ListenableFuture} when the proper number of
64    * responses arrives and the server signals that the RPC is complete.
65    */
66   private static class SignalingResponseObserver
67       implements StreamObserver<StreamingOutputCallResponse> {
SignalingResponseObserver(SettableFuture<Void> completionFuture)68     public SignalingResponseObserver(SettableFuture<Void> completionFuture) {
69       this.completionFuture = completionFuture;
70     }
71 
72     @Override
onCompleted()73     public void onCompleted() {
74       if (numResponsesReceived != NUM_RESPONSES_PER_REQUEST) {
75         completionFuture.setException(
76             new IllegalStateException("Wrong number of responses: " + numResponsesReceived));
77       } else {
78         completionFuture.set(null);
79       }
80     }
81 
82     @Override
onError(Throwable error)83     public void onError(Throwable error) {
84       completionFuture.setException(error);
85     }
86 
87     @Override
onNext(StreamingOutputCallResponse response)88     public void onNext(StreamingOutputCallResponse response) {
89       numResponsesReceived++;
90     }
91 
92     private final SettableFuture<Void> completionFuture;
93     private int numResponsesReceived = 0;
94   }
95 
96   /**
97    * A client worker task that waits until all client workers are ready, then sends a request for a
98    * server-streaming RPC and arranges for a {@code ListenableFuture} to be signaled when the RPC is
99    * complete.
100    */
101   private class ClientWorker implements Runnable {
ClientWorker(CyclicBarrier startBarrier, SettableFuture<Void> completionFuture)102     public ClientWorker(CyclicBarrier startBarrier, SettableFuture<Void> completionFuture) {
103       this.startBarrier = startBarrier;
104       this.completionFuture = completionFuture;
105     }
106 
107     @Override
run()108     public void run() {
109       try {
110         // Prepare the request.
111         StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
112         for (int i = 0; i < NUM_RESPONSES_PER_REQUEST; i++) {
113           requestBuilder.addResponseParameters(ResponseParameters.newBuilder()
114               .setSize(1000)
115               .setIntervalUs(0));  // No delay between responses, for maximum concurrency.
116         }
117         StreamingOutputCallRequest request = requestBuilder.build();
118 
119         // Wait until all client worker threads are poised & ready, then send the request. This way
120         // all clients send their requests at approximately the same time.
121         startBarrier.await();
122         clientStub.streamingOutputCall(request, new SignalingResponseObserver(completionFuture));
123       } catch (InterruptedException ex) {
124         Thread.currentThread().interrupt();
125         completionFuture.setException(ex);
126       } catch (Throwable t) {
127         completionFuture.setException(t);
128       }
129     }
130 
131     private final CyclicBarrier startBarrier;
132     private final SettableFuture<Void> completionFuture;
133   }
134 
135   private static final int NUM_SERVER_THREADS = 10;
136   private static final int NUM_CONCURRENT_REQUESTS = 100;
137   private static final int NUM_RESPONSES_PER_REQUEST = 100;
138 
139   private Server server;
140   private ManagedChannel clientChannel;
141   private TestServiceGrpc.TestServiceStub clientStub;
142   private ScheduledExecutorService serverExecutor;
143   private ExecutorService clientExecutor;
144 
145   @Before
setUp()146   public void setUp() throws Exception {
147     serverExecutor = Executors.newScheduledThreadPool(NUM_SERVER_THREADS);
148     clientExecutor = Executors.newFixedThreadPool(NUM_CONCURRENT_REQUESTS);
149 
150     server = newServer();
151 
152     // Create the client. Keep a reference to its channel so we can shut it down during tearDown().
153     clientChannel = newClientChannel();
154     clientStub = TestServiceGrpc.newStub(clientChannel);
155   }
156 
157   @After
tearDown()158   public void tearDown() {
159     if (server != null) {
160       server.shutdown();
161     }
162     if (clientChannel != null) {
163       clientChannel.shutdown();
164     }
165 
166     MoreExecutors.shutdownAndAwaitTermination(serverExecutor, 5, TimeUnit.SECONDS);
167     MoreExecutors.shutdownAndAwaitTermination(clientExecutor, 5, TimeUnit.SECONDS);
168   }
169 
170   /**
171    * Tests that gRPC can handle concurrent server-streaming RPCs.
172    */
173   @Test
serverStreamingTest()174   public void serverStreamingTest() throws Exception {
175     CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_REQUESTS);
176     List<ListenableFuture<Void>> workerFutures = new ArrayList<>(NUM_CONCURRENT_REQUESTS);
177 
178     for (int i = 0; i < NUM_CONCURRENT_REQUESTS; i++) {
179       SettableFuture<Void> future = SettableFuture.create();
180       clientExecutor.execute(new ClientWorker(startBarrier, future));
181       workerFutures.add(future);
182     }
183 
184     Futures.allAsList(workerFutures).get(60, TimeUnit.SECONDS);
185   }
186 
187   /**
188    * Creates and starts a new {@link TestServiceImpl} server.
189    */
newServer()190   private Server newServer() throws IOException {
191     ServerCredentials serverCreds = TlsServerCredentials.newBuilder()
192         .keyManager(TlsTesting.loadCert("server1.pem"), TlsTesting.loadCert("server1.key"))
193         .trustManager(TlsTesting.loadCert("ca.pem"))
194         .clientAuth(TlsServerCredentials.ClientAuth.REQUIRE)
195         .build();
196 
197     return Grpc.newServerBuilderForPort(0, serverCreds)
198         .addService(new TestServiceImpl(serverExecutor))
199         .build()
200         .start();
201   }
202 
newClientChannel()203   private ManagedChannel newClientChannel() throws IOException {
204     ChannelCredentials channelCreds = TlsChannelCredentials.newBuilder()
205         .keyManager(TlsTesting.loadCert("client.pem"), TlsTesting.loadCert("client.key"))
206         .trustManager(TlsTesting.loadCert("ca.pem"))
207         .build();
208 
209     return Grpc.newChannelBuilder("localhost:" + server.getPort(), channelCreds)
210         .overrideAuthority(TestUtils.TEST_SERVER_HOST)
211         .build();
212   }
213 }
214