/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.testing.integration;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.protobuf.ByteString;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.PayloadType;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

/**
 * Implementation of the business logic for the TestService. Uses an executor to schedule chunks
 * sent in response streams.
 */
public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
  private static final Logger logger = Logger.getLogger(TestServiceImpl.class.getName());
  private static final String UNCOMPRESSABLE_FILE =
      "/io/grpc/testing/integration/testdata/uncompressable.bin";
  private final Random random = new Random();

  private final ScheduledExecutorService executor;
  private final ByteString uncompressableBuffer;
  private final ByteString compressableBuffer;

  /**
   * Constructs a controller using the given executor for scheduling response stream chunks.
   *
   * @param executor executor on which delayed response chunks are scheduled
   * @throws IllegalArgumentException if the uncompressable test data is not on the classpath
   */
  public TestServiceImpl(ScheduledExecutorService executor) {
    this.executor = executor;
    this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
    this.uncompressableBuffer = createBufferFromFile(UNCOMPRESSABLE_FILE);
  }

  /**
   * Responds with a single default {@code Empty} message and completes the call.
   */
  @Override
  public void emptyCall(EmptyProtos.Empty empty,
      StreamObserver<EmptyProtos.Empty> responseObserver) {
    responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
    responseObserver.onCompleted();
  }

  /**
   * Immediately responds with a payload of the type and size specified in the request.
   *
   * <p>The response compression is selected from the request first; if the requested compressor
   * is rejected the call fails with {@code UNIMPLEMENTED}. If the request carries a response
   * status, the call is terminated with that status instead of a response message.
   */
  @Override
  public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
    ServerCallStreamObserver<SimpleResponse> obs =
        (ServerCallStreamObserver<SimpleResponse>) responseObserver;
    SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder();
    try {
      if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) {
        obs.setCompression("gzip");
      } else {
        obs.setCompression("identity");
      }
    } catch (IllegalArgumentException e) {
      obs.onError(Status.UNIMPLEMENTED
          .withDescription("compression not supported.")
          .withCause(e)
          .asRuntimeException());
      return;
    }

    if (req.getResponseSize() != 0) {
      boolean compressable = compressableResponse(req.getResponseType());
      ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
      // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
      // TODO(wonderfly): whether or not this is a good approach needs further discussion.
      int offset = random.nextInt(dataBuffer.size());
      ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
      responseBuilder.setPayload(
          Payload.newBuilder()
              .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
              .setBody(payload));
    }

    if (req.hasResponseStatus()) {
      obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
          .withDescription(req.getResponseStatus().getMessage())
          .asRuntimeException());
      return;
    }

    responseObserver.onNext(responseBuilder.build());
    responseObserver.onCompleted();
  }

  /**
   * Given a request that specifies chunk size and interval between responses, creates and schedules
   * the response stream.
   */
  @Override
  public void streamingOutputCall(StreamingOutputCallRequest request,
      StreamObserver<StreamingOutputCallResponse> responseObserver) {
    // Create and start the response dispatcher.
    new ResponseDispatcher(responseObserver).enqueue(toChunkQueue(request)).completeInput();
  }

  /**
   * Waits until we have received all of the request messages and then returns the aggregate payload
   * size for all of the received requests.
   */
  @Override
  public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall(
      final StreamObserver<Messages.StreamingInputCallResponse> responseObserver) {
    return new StreamObserver<StreamingInputCallRequest>() {
      private int totalPayloadSize;

      @Override
      public void onNext(StreamingInputCallRequest message) {
        totalPayloadSize += message.getPayload().getBody().size();
      }

      @Override
      public void onCompleted() {
        responseObserver.onNext(StreamingInputCallResponse.newBuilder()
            .setAggregatedPayloadSize(totalPayloadSize).build());
        responseObserver.onCompleted();
      }

      @Override
      public void onError(Throwable cause) {
        responseObserver.onError(cause);
      }
    };
  }

  /**
   * True bi-directional streaming. Processes requests as they come in. Begins streaming results
   * immediately.
   *
   * <p>A request that carries a response status cancels the remaining responses and terminates the
   * stream with that status; any request received afterwards is ignored.
   */
  @Override
  public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
      final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
    final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
    return new StreamObserver<StreamingOutputCallRequest>() {
      @Override
      public void onNext(StreamingOutputCallRequest request) {
        if (dispatcher.isCancelled()) {
          // The stream was already terminated by an earlier request. Cancelling a second time
          // would trip the precondition in cancel(), and enqueueing would be pointless.
          return;
        }
        if (request.hasResponseStatus()) {
          dispatcher.cancel();
          dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
              .withDescription(request.getResponseStatus().getMessage())
              .asRuntimeException());
          return;
        }
        dispatcher.enqueue(toChunkQueue(request));
      }

      @Override
      public void onCompleted() {
        if (!dispatcher.isCancelled()) {
          // Tell the dispatcher that all input has been received.
          dispatcher.completeInput();
        }
      }

      @Override
      public void onError(Throwable cause) {
        dispatcher.onError(cause);
      }
    };
  }

  /**
   * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be
   * received before starting the streaming responses.
   */
  @Override
  public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
      final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
    final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
    final Queue<Chunk> chunks = new ArrayDeque<Chunk>();
    return new StreamObserver<StreamingOutputCallRequest>() {
      @Override
      public void onNext(StreamingOutputCallRequest request) {
        chunks.addAll(toChunkQueue(request));
      }

      @Override
      public void onCompleted() {
        // Dispatch all of the chunks in one shot.
        dispatcher.enqueue(chunks).completeInput();
      }

      @Override
      public void onError(Throwable cause) {
        dispatcher.onError(cause);
      }
    };
  }

  /**
   * Schedules the dispatch of a queue of chunks. Whenever chunks are added or input is completed,
   * the next response chunk is scheduled for delivery to the client. When no more chunks are
   * available, the stream is half-closed.
   *
   * <p>All mutable state ({@code scheduled}, {@code cancelled}, {@code failure}) is read and
   * written while holding this dispatcher's monitor.
   */
  private class ResponseDispatcher {
    // Sentinel compared by identity in dispatchChunk() to signal the end of the response stream.
    private final Chunk completionChunk = new Chunk(0, 0, 0, false);
    private final Queue<Chunk> chunks;
    private final StreamObserver<StreamingOutputCallResponse> responseStream;
    // True while dispatchTask is registered with the executor and has not finished running.
    private boolean scheduled;
    @GuardedBy("this") private boolean cancelled;
    // First failure observed while writing to responseStream, or null if none.
    private Throwable failure;
    private Runnable dispatchTask = new Runnable() {
      @Override
      public void run() {
        try {

          // Dispatch the current chunk to the client.
          try {
            dispatchChunk();
          } catch (RuntimeException e) {
            // Indicate that nothing is scheduled and re-throw.
            synchronized (ResponseDispatcher.this) {
              scheduled = false;
            }
            throw e;
          }

          // Schedule the next chunk if there is one.
          synchronized (ResponseDispatcher.this) {
            // Indicate that nothing is scheduled.
            scheduled = false;
            scheduleNextChunk();
          }
        } catch (Throwable t) {
          // Nothing upstream can react to the failure; record it instead of dropping it.
          logger.log(Level.WARNING, "Failed to dispatch response chunk", t);
        }
      }
    };

    /**
     * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to
     * {@link StreamObserver} must be synchronized across threads, no further calls should be made
     * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}.
     */
    public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
      this.chunks = Queues.newLinkedBlockingQueue();
      this.responseStream = responseStream;
    }

    /**
     * Adds the given chunks to the response stream and schedules the next chunk to be delivered if
     * needed.
     *
     * @return this dispatcher, to allow chaining
     * @throws IllegalStateException if a previous dispatch already failed
     */
    public synchronized ResponseDispatcher enqueue(Queue<Chunk> moreChunks) {
      assertNotFailed();
      chunks.addAll(moreChunks);
      scheduleNextChunk();
      return this;
    }

    /**
     * Indicates that the input is completed and the currently enqueued response chunks are all that
     * remain to be scheduled for dispatch to the client.
     *
     * @return this dispatcher, to allow chaining
     * @throws IllegalStateException if a previous dispatch already failed
     */
    public synchronized ResponseDispatcher completeInput() {
      assertNotFailed();
      chunks.add(completionChunk);
      scheduleNextChunk();
      return this;
    }

    /**
     * Allows the service to cancel the remaining responses.
     *
     * @throws IllegalStateException if the dispatcher was already cancelled
     */
    public synchronized void cancel() {
      Preconditions.checkState(!cancelled, "Dispatcher already cancelled");
      chunks.clear();
      cancelled = true;
    }

    /** Returns whether {@link #cancel()} has been called. */
    public synchronized boolean isCancelled() {
      return cancelled;
    }

    /** Forwards {@code cause} to the response stream while holding the dispatcher lock. */
    private synchronized void onError(Throwable cause) {
      responseStream.onError(cause);
    }

    /**
     * Dispatches the current response chunk to the client. This is only called by the executor. At
     * any time, a given dispatch task should only be registered with the executor once.
     */
    private synchronized void dispatchChunk() {
      if (cancelled) {
        return;
      }
      try {
        // Pop off the next chunk and send it to the client.
        Chunk chunk = chunks.remove();
        if (chunk == completionChunk) {
          responseStream.onCompleted();
        } else {
          responseStream.onNext(chunk.toResponse());
        }
      } catch (Throwable e) {
        failure = e;
        if (Status.fromThrowable(e).getCode() == Status.CANCELLED.getCode()) {
          // Stream was cancelled by client, responseStream.onError() might be called already or
          // will be called soon by inbounding StreamObserver.
          chunks.clear();
        } else {
          responseStream.onError(e);
        }
      }
    }

    /**
     * Schedules the next response chunk to be dispatched, unless a dispatch is already pending,
     * the dispatcher has been cancelled, or the queue is empty.
     */
    private void scheduleNextChunk() {
      synchronized (this) {
        if (scheduled || cancelled) {
          // Either the dispatch task is already scheduled, or the dispatcher was cancelled.
          // After cancellation dispatchChunk() no longer drains the queue, so scheduling again
          // would make the task re-arm itself indefinitely.
          return;
        }

        // Schedule the next response chunk if there is one.
        Chunk nextChunk = chunks.peek();
        if (nextChunk == null) {
          return;
        }
        scheduled = true;
        // TODO(ejona): cancel future if RPC is cancelled
        Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
            nextChunk.delayMicroseconds, TimeUnit.MICROSECONDS);
      }
    }

    /** Throws if a previous dispatch failed. Callers must hold the dispatcher lock. */
    private void assertNotFailed() {
      if (failure != null) {
        throw new IllegalStateException("Stream already failed", failure);
      }
    }
  }

  /**
   * Breaks down the request and creates a queue of response chunks for the given request.
   *
   * <p>Each chunk starts where the previous one ended, wrapping around the end of the selected
   * data buffer.
   */
  public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
    Queue<Chunk> chunkQueue = new ArrayDeque<Chunk>();
    boolean compressable = compressableResponse(request.getResponseType());
    int bufferSize = compressable ? compressableBuffer.size() : uncompressableBuffer.size();
    int offset = 0;
    for (ResponseParameters params : request.getResponseParametersList()) {
      chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize(), compressable));

      // Increment the offset past this chunk. Both buffers need to be circular. The sum is
      // computed as a long so that a very large size cannot overflow into a negative offset, and
      // a non-positive size (which produces an empty payload) consumes no bytes.
      long consumed = Math.max(0, params.getSize());
      offset = (int) ((offset + consumed) % bufferSize);
    }
    return chunkQueue;
  }

  /**
   * A single chunk of a response stream. Contains delivery information for the dispatcher and can
   * be converted to a streaming response proto. A chunk just references its payload in either
   * {@link #compressableBuffer} or {@link #uncompressableBuffer}. The payload isn't actually
   * created until {@link #toResponse()} is called.
   */
  private class Chunk {
    private final int delayMicroseconds;
    private final int offset;
    private final int length;
    private final boolean compressable;

    public Chunk(int delayMicroseconds, int offset, int length, boolean compressable) {
      this.delayMicroseconds = delayMicroseconds;
      this.offset = offset;
      this.length = length;
      this.compressable = compressable;
    }

    /**
     * Convert this chunk into a streaming response proto.
     */
    private StreamingOutputCallResponse toResponse() {
      StreamingOutputCallResponse.Builder responseBuilder =
          StreamingOutputCallResponse.newBuilder();
      ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
      ByteString payload = generatePayload(dataBuffer, offset, length);
      responseBuilder.setPayload(
          Payload.newBuilder()
              .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
              .setBody(payload));
      return responseBuilder.build();
    }
  }

  /**
   * Creates a buffer with data read from a file.
   *
   * @param fileClassPath classpath location of the resource to load
   * @throws IllegalArgumentException if the resource cannot be located
   * @throws RuntimeException if reading the resource fails
   */
  @SuppressWarnings("Finally") // Not concerned about suppression; expected to be exceedingly rare
  private ByteString createBufferFromFile(String fileClassPath) {
    ByteString buffer = ByteString.EMPTY;
    InputStream inputStream = getClass().getResourceAsStream(fileClassPath);
    if (inputStream == null) {
      throw new IllegalArgumentException("Unable to locate file on classpath: " + fileClassPath);
    }

    try {
      buffer = ByteString.readFrom(inputStream);
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      try {
        inputStream.close();
      } catch (IOException ignorable) {
        // ignore
      }
    }
    return buffer;
  }

  /**
   * Indicates whether or not the response for this type should be compressable. If {@code RANDOM},
   * picks a random boolean.
   */
  private boolean compressableResponse(PayloadType responseType) {
    switch (responseType) {
      case COMPRESSABLE:
        return true;
      case RANDOM:
        return random.nextBoolean();
      case UNCOMPRESSABLE:
      default:
        return false;
    }
  }

  /**
   * Generates a payload of desired type and size. Reads compressableBuffer or
   * uncompressableBuffer as a circular buffer.
   *
   * @param dataBuffer buffer to copy bytes from
   * @param offset index in {@code dataBuffer} at which to start reading
   * @param size number of bytes to produce; a non-positive value yields an empty payload
   */
  private ByteString generatePayload(ByteString dataBuffer, int offset, int size) {
    ByteString payload = ByteString.EMPTY;
    // This offset would never pass the array boundary.
    int begin = offset;
    int end = 0;
    int bytesLeft = size;
    while (bytesLeft > 0) {
      end = Math.min(begin + bytesLeft, dataBuffer.size());
      // ByteString.substring returns the substring from begin, inclusive, to end, exclusive.
      payload = payload.concat(dataBuffer.substring(begin, end));
      bytesLeft -= (end - begin);
      // Wrap around to the start of the buffer once its end has been reached.
      begin = end % dataBuffer.size();
    }
    return payload;
  }

  /** Returns interceptors necessary for full service implementation. */
  public static List<ServerInterceptor> interceptors() {
    return Arrays.asList(
        echoRequestHeadersInterceptor(Util.METADATA_KEY),
        echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
        echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
  }

  /**
   * Echo the request headers from a client into response headers and trailers. Useful for
   * testing end-to-end metadata propagation.
   */
  private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
    final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
          ServerCall<ReqT, RespT> call,
          final Metadata requestHeaders,
          ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
          @Override
          public void sendHeaders(Metadata responseHeaders) {
            responseHeaders.merge(requestHeaders, keySet);
            super.sendHeaders(responseHeaders);
          }

          @Override
          public void close(Status status, Metadata trailers) {
            trailers.merge(requestHeaders, keySet);
            super.close(status, trailers);
          }
        }, requestHeaders);
      }
    };
  }

  /**
   * Echoes request headers with the specified key(s) from a client into response headers only.
   */
  private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
    final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
          ServerCall<ReqT, RespT> call,
          final Metadata requestHeaders,
          ServerCallHandler<ReqT, RespT> next) {
        // Only sendHeaders is customized; close() falls through to the forwarding default.
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
          @Override
          public void sendHeaders(Metadata responseHeaders) {
            responseHeaders.merge(requestHeaders, keySet);
            super.sendHeaders(responseHeaders);
          }
        }, requestHeaders);
      }
    };
  }

  /**
   * Echoes request headers with the specified key(s) from a client into response trailers only.
   */
  private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
    final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
    return new ServerInterceptor() {
      @Override
      public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
          ServerCall<ReqT, RespT> call,
          final Metadata requestHeaders,
          ServerCallHandler<ReqT, RespT> next) {
        // Only close() is customized; sendHeaders falls through to the forwarding default.
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
          @Override
          public void close(Status status, Metadata trailers) {
            trailers.merge(requestHeaders, keySet);
            super.close(status, trailers);
          }
        }, requestHeaders);
      }
    };
  }
}