• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Queues;
21 import com.google.protobuf.ByteString;
22 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
23 import io.grpc.Metadata;
24 import io.grpc.ServerCall;
25 import io.grpc.ServerCallHandler;
26 import io.grpc.ServerInterceptor;
27 import io.grpc.Status;
28 import io.grpc.StatusRuntimeException;
29 import io.grpc.internal.LogExceptionRunnable;
30 import io.grpc.services.CallMetricRecorder;
31 import io.grpc.services.MetricRecorder;
32 import io.grpc.stub.ServerCallStreamObserver;
33 import io.grpc.stub.StreamObserver;
34 import io.grpc.testing.integration.Messages.Payload;
35 import io.grpc.testing.integration.Messages.ResponseParameters;
36 import io.grpc.testing.integration.Messages.SimpleRequest;
37 import io.grpc.testing.integration.Messages.SimpleResponse;
38 import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
39 import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
40 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
41 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
42 import io.grpc.testing.integration.Messages.TestOrcaReport;
43 import io.grpc.testing.integration.TestServiceGrpc.AsyncService;
44 import java.util.ArrayDeque;
45 import java.util.Arrays;
46 import java.util.HashMap;
47 import java.util.HashSet;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Queue;
51 import java.util.Random;
52 import java.util.Set;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.ScheduledExecutorService;
55 import java.util.concurrent.Semaphore;
56 import java.util.concurrent.TimeUnit;
57 import javax.annotation.concurrent.GuardedBy;
58 
59 /**
60  * Implementation of the business logic for the TestService. Uses an executor to schedule chunks
61  * sent in response streams.
62  */
63 public class TestServiceImpl implements io.grpc.BindableService, AsyncService {
64   private final Random random = new Random();
65 
66   private final ScheduledExecutorService executor;
67   private final ByteString compressableBuffer;
68   private final MetricRecorder metricRecorder;
69   final Semaphore lock = new Semaphore(1);
70 
71   /**
72    * Constructs a controller using the given executor for scheduling response stream chunks.
73    */
TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder)74   public TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder) {
75     this.executor = executor;
76     this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
77     this.metricRecorder = metricRecorder;
78   }
79 
TestServiceImpl(ScheduledExecutorService executor)80   public TestServiceImpl(ScheduledExecutorService executor) {
81     this(executor, MetricRecorder.newInstance());
82   }
83 
84   @Override
bindService()85   public final io.grpc.ServerServiceDefinition bindService() {
86     return TestServiceGrpc.bindService(this);
87   }
88 
89   @Override
emptyCall(EmptyProtos.Empty request, StreamObserver<EmptyProtos.Empty> responseObserver)90   public void emptyCall(EmptyProtos.Empty request,
91       StreamObserver<EmptyProtos.Empty> responseObserver) {
92     responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
93     responseObserver.onCompleted();
94   }
95 
96   /**
97    * Immediately responds with a payload of the type and size specified in the request.
98    */
99   @Override
unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver)100   public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
101     ServerCallStreamObserver<SimpleResponse> obs =
102         (ServerCallStreamObserver<SimpleResponse>) responseObserver;
103     SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
104     try {
105       if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
106         obs.setCompression("gzip");
107       } else {
108         obs.setCompression("identity");
109       }
110     } catch (IllegalArgumentException e) {
111       obs.onError(Status.UNIMPLEMENTED
112           .withDescription("compression not supported.")
113           .withCause(e)
114           .asRuntimeException());
115       return;
116     }
117 
118     if (req.getResponseSize() != 0) {
119       // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
120       // TODO(wonderfly): whether or not this is a good approach needs further discussion.
121       int offset = random.nextInt(compressableBuffer.size());
122       ByteString payload = generatePayload(compressableBuffer, offset, req.getResponseSize());
123       responseBuilder.setPayload(
124           Payload.newBuilder()
125               .setBody(payload));
126     }
127 
128     if (req.hasResponseStatus()) {
129       obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
130           .withDescription(req.getResponseStatus().getMessage())
131           .asRuntimeException());
132       return;
133     }
134 
135     if (req.hasOrcaPerQueryReport()) {
136       echoCallMetricsFromPayload(req.getOrcaPerQueryReport());
137     }
138     responseObserver.onNext(responseBuilder.build());
139     responseObserver.onCompleted();
140   }
141 
echoCallMetricsFromPayload(TestOrcaReport report)142   private static void echoCallMetricsFromPayload(TestOrcaReport report) {
143     CallMetricRecorder recorder = CallMetricRecorder.getCurrent()
144         .recordCpuUtilizationMetric(report.getCpuUtilization())
145         .recordMemoryUtilizationMetric(report.getMemoryUtilization());
146     for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
147       recorder.recordUtilizationMetric(entry.getKey(), entry.getValue());
148     }
149     for (Map.Entry<String, Double> entry : report.getRequestCostMap().entrySet()) {
150       recorder.recordRequestCostMetric(entry.getKey(), entry.getValue());
151     }
152   }
153 
echoMetricsFromPayload(TestOrcaReport report)154   private void echoMetricsFromPayload(TestOrcaReport report) {
155     metricRecorder.setCpuUtilizationMetric(report.getCpuUtilization());
156     metricRecorder.setMemoryUtilizationMetric(report.getMemoryUtilization());
157     metricRecorder.setAllUtilizationMetrics(new HashMap<>());
158     for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
159       metricRecorder.putUtilizationMetric(entry.getKey(), entry.getValue());
160     }
161   }
162 
163   /**
164    * Given a request that specifies chunk size and interval between responses, creates and schedules
165    * the response stream.
166    */
167   @Override
streamingOutputCall(StreamingOutputCallRequest request, StreamObserver<StreamingOutputCallResponse> responseObserver)168   public void streamingOutputCall(StreamingOutputCallRequest request,
169       StreamObserver<StreamingOutputCallResponse> responseObserver) {
170     // Create and start the response dispatcher.
171     new ResponseDispatcher(responseObserver).enqueue(toChunkQueue(request)).completeInput();
172   }
173 
174   /**
175    * Waits until we have received all of the request messages and then returns the aggregate payload
176    * size for all of the received requests.
177    */
178   @Override
streamingInputCall( final StreamObserver<Messages.StreamingInputCallResponse> responseObserver)179   public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
180       final StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
181     return new StreamObserver<StreamingInputCallRequest>() {
182       private int totalPayloadSize;
183 
184       @Override
185       public void onNext(StreamingInputCallRequest message) {
186         totalPayloadSize += message.getPayload().getBody().size();
187       }
188 
189       @Override
190       public void onCompleted() {
191         responseObserver.onNext(StreamingInputCallResponse.newBuilder()
192             .setAggregatedPayloadSize(totalPayloadSize).build());
193         responseObserver.onCompleted();
194       }
195 
196       @Override
197       public void onError(Throwable cause) {
198         responseObserver.onError(cause);
199       }
200     };
201   }
202 
203   /**
204    * True bi-directional streaming. Processes requests as they come in. Begins streaming results
205    * immediately.
206    */
207   @Override
fullDuplexCall( final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver)208   public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
209       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
210     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
211     return new StreamObserver<StreamingOutputCallRequest>() {
212       boolean oobTestLocked;
213 
214       @Override
215       public void onNext(StreamingOutputCallRequest request) {
216         // to facilitate multiple clients running orca_oob test in parallel, the server allows
217         // only one orca_oob test client to run at a time to avoid conflicts.
218         if (request.hasOrcaOobReport()) {
219           if (!oobTestLocked) {
220             try {
221               lock.acquire();
222             } catch (InterruptedException ex) {
223               responseObserver.onError(new StatusRuntimeException(
224                   Status.ABORTED.withDescription("server service interrupted").withCause(ex)));
225               return;
226             }
227             oobTestLocked = true;
228           }
229           echoMetricsFromPayload(request.getOrcaOobReport());
230         }
231         if (request.hasResponseStatus()) {
232           dispatcher.cancel();
233           dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
234               .withDescription(request.getResponseStatus().getMessage())
235               .asRuntimeException());
236           return;
237         }
238         dispatcher.enqueue(toChunkQueue(request));
239       }
240 
241       @Override
242       public void onCompleted() {
243         if (oobTestLocked) {
244           lock.release();
245           oobTestLocked = false;
246         }
247         if (!dispatcher.isCancelled()) {
248           // Tell the dispatcher that all input has been received.
249           dispatcher.completeInput();
250         }
251       }
252 
253       @Override
254       public void onError(Throwable cause) {
255         if (oobTestLocked) {
256           lock.release();
257           oobTestLocked = false;
258         }
259         dispatcher.onError(cause);
260       }
261     };
262   }
263 
264   /**
265    * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be
266    * received before starting the streaming responses.
267    */
268   @Override
269   public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
270       final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
271     final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
272     final Queue<Chunk> chunks = new ArrayDeque<>();
273     return new StreamObserver<StreamingOutputCallRequest>() {
274       @Override
275       public void onNext(StreamingOutputCallRequest request) {
276         chunks.addAll(toChunkQueue(request));
277       }
278 
279       @Override
280       public void onCompleted() {
281         // Dispatch all of the chunks in one shot.
282         dispatcher.enqueue(chunks).completeInput();
283       }
284 
285       @Override
286       public void onError(Throwable cause) {
287         dispatcher.onError(cause);
288       }
289     };
290   }
291 
292   /**
293    * Schedules the dispatch of a queue of chunks. Whenever chunks are added or input is completed,
294    * the next response chunk is scheduled for delivery to the client. When no more chunks are
295    * available, the stream is half-closed.
296    */
297   private class ResponseDispatcher {
298     private final Chunk completionChunk = new Chunk(0, 0, 0);
299     private final Queue<Chunk> chunks;
300     private final StreamObserver<StreamingOutputCallResponse> responseStream;
301     private boolean scheduled;
302     @GuardedBy("this") private boolean cancelled;
303     private Throwable failure;
304     private Runnable dispatchTask = new Runnable() {
305       @Override
306       @SuppressWarnings("CatchAndPrintStackTrace")
307       public void run() {
308         try {
309 
310           // Dispatch the current chunk to the client.
311           try {
312             dispatchChunk();
313           } catch (RuntimeException e) {
314             // Indicate that nothing is scheduled and re-throw.
315             synchronized (ResponseDispatcher.this) {
316               scheduled = false;
317             }
318             throw e;
319           }
320 
321           // Schedule the next chunk if there is one.
322           synchronized (ResponseDispatcher.this) {
323             // Indicate that nothing is scheduled.
324             scheduled = false;
325             scheduleNextChunk();
326           }
327         } catch (Throwable t) {
328           t.printStackTrace();
329         }
330       }
331     };
332 
333     /**
334      * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to
335      * {@link StreamObserver} must be synchronized across threads, no further calls should be made
336      * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}.
337      */
338     public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
339       this.chunks = Queues.newLinkedBlockingQueue();
340       this.responseStream = responseStream;
341     }
342 
343     /**
344      * Adds the given chunks to the response stream and schedules the next chunk to be delivered if
345      * needed.
346      */
347     public synchronized ResponseDispatcher enqueue(Queue<Chunk> moreChunks) {
348       assertNotFailed();
349       chunks.addAll(moreChunks);
350       scheduleNextChunk();
351       return this;
352     }
353 
354     /**
355      * Indicates that the input is completed and the currently enqueued response chunks are all that
356      * remain to be scheduled for dispatch to the client.
357      */
358     public ResponseDispatcher completeInput() {
359       assertNotFailed();
360       chunks.add(completionChunk);
361       scheduleNextChunk();
362       return this;
363     }
364 
365     /**
366      * Allows the service to cancel the remaining responses.
367      */
368     public synchronized void cancel() {
369       Preconditions.checkState(!cancelled, "Dispatcher already cancelled");
370       chunks.clear();
371       cancelled = true;
372     }
373 
374     public synchronized boolean isCancelled() {
375       return cancelled;
376     }
377 
378     private synchronized void onError(Throwable cause) {
379       responseStream.onError(cause);
380     }
381 
382     /**
383      * Dispatches the current response chunk to the client. This is only called by the executor. At
384      * any time, a given dispatch task should only be registered with the executor once.
385      */
386     private synchronized void dispatchChunk() {
387       if (cancelled) {
388         return;
389       }
390       try {
391         // Pop off the next chunk and send it to the client.
392         Chunk chunk = chunks.remove();
393         if (chunk == completionChunk) {
394           responseStream.onCompleted();
395         } else {
396           responseStream.onNext(chunk.toResponse());
397         }
398       } catch (Throwable e) {
399         failure = e;
400         if (Status.fromThrowable(e).getCode() == Status.CANCELLED.getCode()) {
401           // Stream was cancelled by client, responseStream.onError() might be called already or
402           // will be called soon by inbounding StreamObserver.
403           chunks.clear();
404         } else {
405           responseStream.onError(e);
406         }
407       }
408     }
409 
410     /**
411      * Schedules the next response chunk to be dispatched. If all input has been received and there
412      * are no more chunks in the queue, the stream is closed.
413      */
414     private void scheduleNextChunk() {
415       synchronized (this) {
416         if (scheduled) {
417           // Dispatch task is already scheduled.
418           return;
419         }
420 
421         // Schedule the next response chunk if there is one.
422         Chunk nextChunk = chunks.peek();
423         if (nextChunk != null && !executor.isShutdown()) {
424           scheduled = true;
425           // TODO(ejona): cancel future if RPC is cancelled
426           Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
427               nextChunk.delayMicroseconds, TimeUnit.MICROSECONDS);
428         }
429       }
430     }
431 
432     private void assertNotFailed() {
433       if (failure != null) {
434         throw new IllegalStateException("Stream already failed", failure);
435       }
436     }
437   }
438 
439   /**
440    * Breaks down the request and creates a queue of response chunks for the given request.
441    */
442   public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
443     Queue<Chunk> chunkQueue = new ArrayDeque<>();
444     int offset = 0;
445     for (ResponseParameters params : request.getResponseParametersList()) {
446       chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize()));
447 
448       // Increment the offset past this chunk. Buffer need to be circular.
449       offset = (offset + params.getSize()) % compressableBuffer.size();
450     }
451     return chunkQueue;
452   }
453 
454   /**
455    * A single chunk of a response stream. Contains delivery information for the dispatcher and can
456    * be converted to a streaming response proto. A chunk just references it's payload in the
457    * {@link #compressableBuffer} array. The payload isn't actually created until {@link
458    * #toResponse()} is called.
459    */
460   private class Chunk {
461     private final int delayMicroseconds;
462     private final int offset;
463     private final int length;
464 
465     public Chunk(int delayMicroseconds, int offset, int length) {
466       this.delayMicroseconds = delayMicroseconds;
467       this.offset = offset;
468       this.length = length;
469     }
470 
471     /**
472      * Convert this chunk into a streaming response proto.
473      */
474     private StreamingOutputCallResponse toResponse() {
475       StreamingOutputCallResponse.Builder responseBuilder =
476           StreamingOutputCallResponse.newBuilder();
477       ByteString payload = generatePayload(compressableBuffer, offset, length);
478       responseBuilder.setPayload(
479           Payload.newBuilder()
480               .setBody(payload));
481       return responseBuilder.build();
482     }
483   }
484 
485   /**
486    * Generates a payload of desired type and size. Reads compressableBuffer or
487    * uncompressableBuffer as a circular buffer.
488    */
489   private ByteString generatePayload(ByteString dataBuffer, int offset, int size) {
490     ByteString payload = ByteString.EMPTY;
491     // This offset would never pass the array boundary.
492     int begin = offset;
493     int end = 0;
494     int bytesLeft = size;
495     while (bytesLeft > 0) {
496       end = Math.min(begin + bytesLeft, dataBuffer.size());
497       // ByteString.substring returns the substring from begin, inclusive, to end, exclusive.
498       payload = payload.concat(dataBuffer.substring(begin, end));
499       bytesLeft -= (end - begin);
500       begin = end % dataBuffer.size();
501     }
502     return payload;
503   }
504 
505   /** Returns interceptors necessary for full service implementation. */
506   public static List<ServerInterceptor> interceptors() {
507     return Arrays.asList(
508         echoRequestHeadersInterceptor(Util.METADATA_KEY),
509         echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
510         echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
511   }
512 
513   /**
514    * Echo the request headers from a client into response headers and trailers. Useful for
515    * testing end-to-end metadata propagation.
516    */
517   private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
518     final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
519     return new ServerInterceptor() {
520       @Override
521       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
522           ServerCall<ReqT, RespT> call,
523           final Metadata requestHeaders,
524           ServerCallHandler<ReqT, RespT> next) {
525         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
526               @Override
527               public void sendHeaders(Metadata responseHeaders) {
528                 responseHeaders.merge(requestHeaders, keySet);
529                 super.sendHeaders(responseHeaders);
530               }
531 
532               @Override
533               public void close(Status status, Metadata trailers) {
534                 trailers.merge(requestHeaders, keySet);
535                 super.close(status, trailers);
536               }
537             }, requestHeaders);
538       }
539     };
540   }
541 
542   /**
543    * Echoes request headers with the specified key(s) from a client into response headers only.
544    */
545   private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
546     final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
547     return new ServerInterceptor() {
548       @Override
549       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
550           ServerCall<ReqT, RespT> call,
551           final Metadata requestHeaders,
552           ServerCallHandler<ReqT, RespT> next) {
553         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
554           @Override
555           public void sendHeaders(Metadata responseHeaders) {
556             responseHeaders.merge(requestHeaders, keySet);
557             super.sendHeaders(responseHeaders);
558           }
559 
560           @Override
561           public void close(Status status, Metadata trailers) {
562             super.close(status, trailers);
563           }
564         }, requestHeaders);
565       }
566     };
567   }
568 
569   /**
570    * Echoes request headers with the specified key(s) from a client into response trailers only.
571    */
572   private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
573     final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
574     return new ServerInterceptor() {
575       @Override
576       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
577           ServerCall<ReqT, RespT> call,
578           final Metadata requestHeaders,
579           ServerCallHandler<ReqT, RespT> next) {
580         return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
581           @Override
582           public void sendHeaders(Metadata responseHeaders) {
583             super.sendHeaders(responseHeaders);
584           }
585 
586           @Override
587           public void close(Status status, Metadata trailers) {
588             trailers.merge(requestHeaders, keySet);
589             super.close(status, trailers);
590           }
591         }, requestHeaders);
592       }
593     };
594   }
595 }
596