1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import com.google.common.base.Preconditions; 20 import com.google.common.collect.Queues; 21 import com.google.protobuf.ByteString; 22 import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; 23 import io.grpc.Metadata; 24 import io.grpc.ServerCall; 25 import io.grpc.ServerCallHandler; 26 import io.grpc.ServerInterceptor; 27 import io.grpc.Status; 28 import io.grpc.StatusRuntimeException; 29 import io.grpc.internal.LogExceptionRunnable; 30 import io.grpc.services.CallMetricRecorder; 31 import io.grpc.services.MetricRecorder; 32 import io.grpc.stub.ServerCallStreamObserver; 33 import io.grpc.stub.StreamObserver; 34 import io.grpc.testing.integration.Messages.Payload; 35 import io.grpc.testing.integration.Messages.ResponseParameters; 36 import io.grpc.testing.integration.Messages.SimpleRequest; 37 import io.grpc.testing.integration.Messages.SimpleResponse; 38 import io.grpc.testing.integration.Messages.StreamingInputCallRequest; 39 import io.grpc.testing.integration.Messages.StreamingInputCallResponse; 40 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; 41 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; 42 import io.grpc.testing.integration.Messages.TestOrcaReport; 43 import io.grpc.testing.integration.TestServiceGrpc.AsyncService; 44 import java.util.ArrayDeque; 45 import java.util.Arrays; 46 import java.util.HashMap; 47 import java.util.HashSet; 48 import java.util.List; 49 import java.util.Map; 50 import java.util.Queue; 51 import java.util.Random; 52 import java.util.Set; 53 import java.util.concurrent.Future; 54 import java.util.concurrent.ScheduledExecutorService; 55 import java.util.concurrent.Semaphore; 56 import java.util.concurrent.TimeUnit; 57 import javax.annotation.concurrent.GuardedBy; 58 59 /** 60 * Implementation of the business logic for the TestService. Uses an executor to schedule chunks 61 * sent in response streams. 62 */ 63 public class TestServiceImpl implements io.grpc.BindableService, AsyncService { 64 private final Random random = new Random(); 65 66 private final ScheduledExecutorService executor; 67 private final ByteString compressableBuffer; 68 private final MetricRecorder metricRecorder; 69 final Semaphore lock = new Semaphore(1); 70 71 /** 72 * Constructs a controller using the given executor for scheduling response stream chunks. 73 */ TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder)74 public TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder) { 75 this.executor = executor; 76 this.compressableBuffer = ByteString.copyFrom(new byte[1024]); 77 this.metricRecorder = metricRecorder; 78 } 79 TestServiceImpl(ScheduledExecutorService executor)80 public TestServiceImpl(ScheduledExecutorService executor) { 81 this(executor, MetricRecorder.newInstance()); 82 } 83 84 @Override bindService()85 public final io.grpc.ServerServiceDefinition bindService() { 86 return TestServiceGrpc.bindService(this); 87 } 88 89 @Override emptyCall(EmptyProtos.Empty request, StreamObserver<EmptyProtos.Empty> responseObserver)90 public void emptyCall(EmptyProtos.Empty request, 91 StreamObserver<EmptyProtos.Empty> responseObserver) { 92 responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance()); 93 responseObserver.onCompleted(); 94 } 95 96 /** 97 * Immediately responds with a payload of the type and size specified in the request. 98 */ 99 @Override unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver)100 public void unaryCall(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) { 101 ServerCallStreamObserver<SimpleResponse> obs = 102 (ServerCallStreamObserver<SimpleResponse>) responseObserver; 103 SimpleResponse.Builder responseBuilder = SimpleResponse.newBuilder(); 104 try { 105 if (req.hasResponseCompressed() && req.getResponseCompressed().getValue()) { 106 obs.setCompression("gzip"); 107 } else { 108 obs.setCompression("identity"); 109 } 110 } catch (IllegalArgumentException e) { 111 obs.onError(Status.UNIMPLEMENTED 112 .withDescription("compression not supported.") 113 .withCause(e) 114 .asRuntimeException()); 115 return; 116 } 117 118 if (req.getResponseSize() != 0) { 119 // For consistency with the c++ TestServiceImpl, use a random offset for unary calls. 120 // TODO(wonderfly): whether or not this is a good approach needs further discussion. 121 int offset = random.nextInt(compressableBuffer.size()); 122 ByteString payload = generatePayload(compressableBuffer, offset, req.getResponseSize()); 123 responseBuilder.setPayload( 124 Payload.newBuilder() 125 .setBody(payload)); 126 } 127 128 if (req.hasResponseStatus()) { 129 obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode()) 130 .withDescription(req.getResponseStatus().getMessage()) 131 .asRuntimeException()); 132 return; 133 } 134 135 if (req.hasOrcaPerQueryReport()) { 136 echoCallMetricsFromPayload(req.getOrcaPerQueryReport()); 137 } 138 responseObserver.onNext(responseBuilder.build()); 139 responseObserver.onCompleted(); 140 } 141 echoCallMetricsFromPayload(TestOrcaReport report)142 private static void echoCallMetricsFromPayload(TestOrcaReport report) { 143 CallMetricRecorder recorder = CallMetricRecorder.getCurrent() 144 .recordCpuUtilizationMetric(report.getCpuUtilization()) 145 .recordMemoryUtilizationMetric(report.getMemoryUtilization()); 146 for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) { 147 recorder.recordUtilizationMetric(entry.getKey(), entry.getValue()); 148 } 149 for (Map.Entry<String, Double> entry : report.getRequestCostMap().entrySet()) { 150 recorder.recordRequestCostMetric(entry.getKey(), entry.getValue()); 151 } 152 } 153 echoMetricsFromPayload(TestOrcaReport report)154 private void echoMetricsFromPayload(TestOrcaReport report) { 155 metricRecorder.setCpuUtilizationMetric(report.getCpuUtilization()); 156 metricRecorder.setMemoryUtilizationMetric(report.getMemoryUtilization()); 157 metricRecorder.setAllUtilizationMetrics(new HashMap<>()); 158 for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) { 159 metricRecorder.putUtilizationMetric(entry.getKey(), entry.getValue()); 160 } 161 } 162 163 /** 164 * Given a request that specifies chunk size and interval between responses, creates and schedules 165 * the response stream. 166 */ 167 @Override streamingOutputCall(StreamingOutputCallRequest request, StreamObserver<StreamingOutputCallResponse> responseObserver)168 public void streamingOutputCall(StreamingOutputCallRequest request, 169 StreamObserver<StreamingOutputCallResponse> responseObserver) { 170 // Create and start the response dispatcher. 171 new ResponseDispatcher(responseObserver).enqueue(toChunkQueue(request)).completeInput(); 172 } 173 174 /** 175 * Waits until we have received all of the request messages and then returns the aggregate payload 176 * size for all of the received requests. 177 */ 178 @Override streamingInputCall( final StreamObserver<Messages.StreamingInputCallResponse> responseObserver)179 public StreamObserver<Messages.StreamingInputCallRequest> streamingInputCall( 180 final StreamObserver<Messages.StreamingInputCallResponse> responseObserver) { 181 return new StreamObserver<StreamingInputCallRequest>() { 182 private int totalPayloadSize; 183 184 @Override 185 public void onNext(StreamingInputCallRequest message) { 186 totalPayloadSize += message.getPayload().getBody().size(); 187 } 188 189 @Override 190 public void onCompleted() { 191 responseObserver.onNext(StreamingInputCallResponse.newBuilder() 192 .setAggregatedPayloadSize(totalPayloadSize).build()); 193 responseObserver.onCompleted(); 194 } 195 196 @Override 197 public void onError(Throwable cause) { 198 responseObserver.onError(cause); 199 } 200 }; 201 } 202 203 /** 204 * True bi-directional streaming. Processes requests as they come in. Begins streaming results 205 * immediately. 206 */ 207 @Override fullDuplexCall( final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver)208 public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall( 209 final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) { 210 final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); 211 return new StreamObserver<StreamingOutputCallRequest>() { 212 boolean oobTestLocked; 213 214 @Override 215 public void onNext(StreamingOutputCallRequest request) { 216 // to facilitate multiple clients running orca_oob test in parallel, the server allows 217 // only one orca_oob test client to run at a time to avoid conflicts. 218 if (request.hasOrcaOobReport()) { 219 if (!oobTestLocked) { 220 try { 221 lock.acquire(); 222 } catch (InterruptedException ex) { 223 responseObserver.onError(new StatusRuntimeException( 224 Status.ABORTED.withDescription("server service interrupted").withCause(ex))); 225 return; 226 } 227 oobTestLocked = true; 228 } 229 echoMetricsFromPayload(request.getOrcaOobReport()); 230 } 231 if (request.hasResponseStatus()) { 232 dispatcher.cancel(); 233 dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) 234 .withDescription(request.getResponseStatus().getMessage()) 235 .asRuntimeException()); 236 return; 237 } 238 dispatcher.enqueue(toChunkQueue(request)); 239 } 240 241 @Override 242 public void onCompleted() { 243 if (oobTestLocked) { 244 lock.release(); 245 oobTestLocked = false; 246 } 247 if (!dispatcher.isCancelled()) { 248 // Tell the dispatcher that all input has been received. 249 dispatcher.completeInput(); 250 } 251 } 252 253 @Override 254 public void onError(Throwable cause) { 255 if (oobTestLocked) { 256 lock.release(); 257 oobTestLocked = false; 258 } 259 dispatcher.onError(cause); 260 } 261 }; 262 } 263 264 /** 265 * Similar to {@link #fullDuplexCall}, except that it waits for all streaming requests to be 266 * received before starting the streaming responses. 267 */ 268 @Override 269 public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall( 270 final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) { 271 final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); 272 final Queue<Chunk> chunks = new ArrayDeque<>(); 273 return new StreamObserver<StreamingOutputCallRequest>() { 274 @Override 275 public void onNext(StreamingOutputCallRequest request) { 276 chunks.addAll(toChunkQueue(request)); 277 } 278 279 @Override 280 public void onCompleted() { 281 // Dispatch all of the chunks in one shot. 282 dispatcher.enqueue(chunks).completeInput(); 283 } 284 285 @Override 286 public void onError(Throwable cause) { 287 dispatcher.onError(cause); 288 } 289 }; 290 } 291 292 /** 293 * Schedules the dispatch of a queue of chunks. Whenever chunks are added or input is completed, 294 * the next response chunk is scheduled for delivery to the client. When no more chunks are 295 * available, the stream is half-closed. 296 */ 297 private class ResponseDispatcher { 298 private final Chunk completionChunk = new Chunk(0, 0, 0); 299 private final Queue<Chunk> chunks; 300 private final StreamObserver<StreamingOutputCallResponse> responseStream; 301 private boolean scheduled; 302 @GuardedBy("this") private boolean cancelled; 303 private Throwable failure; 304 private Runnable dispatchTask = new Runnable() { 305 @Override 306 @SuppressWarnings("CatchAndPrintStackTrace") 307 public void run() { 308 try { 309 310 // Dispatch the current chunk to the client. 311 try { 312 dispatchChunk(); 313 } catch (RuntimeException e) { 314 // Indicate that nothing is scheduled and re-throw. 315 synchronized (ResponseDispatcher.this) { 316 scheduled = false; 317 } 318 throw e; 319 } 320 321 // Schedule the next chunk if there is one. 322 synchronized (ResponseDispatcher.this) { 323 // Indicate that nothing is scheduled. 324 scheduled = false; 325 scheduleNextChunk(); 326 } 327 } catch (Throwable t) { 328 t.printStackTrace(); 329 } 330 } 331 }; 332 333 /** 334 * The {@link StreamObserver} will be used to send the queue of response chunks. Since calls to 335 * {@link StreamObserver} must be synchronized across threads, no further calls should be made 336 * directly on {@code responseStream} after it is provided to the {@link ResponseDispatcher}. 337 */ 338 public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) { 339 this.chunks = Queues.newLinkedBlockingQueue(); 340 this.responseStream = responseStream; 341 } 342 343 /** 344 * Adds the given chunks to the response stream and schedules the next chunk to be delivered if 345 * needed. 346 */ 347 public synchronized ResponseDispatcher enqueue(Queue<Chunk> moreChunks) { 348 assertNotFailed(); 349 chunks.addAll(moreChunks); 350 scheduleNextChunk(); 351 return this; 352 } 353 354 /** 355 * Indicates that the input is completed and the currently enqueued response chunks are all that 356 * remain to be scheduled for dispatch to the client. 357 */ 358 public ResponseDispatcher completeInput() { 359 assertNotFailed(); 360 chunks.add(completionChunk); 361 scheduleNextChunk(); 362 return this; 363 } 364 365 /** 366 * Allows the service to cancel the remaining responses. 367 */ 368 public synchronized void cancel() { 369 Preconditions.checkState(!cancelled, "Dispatcher already cancelled"); 370 chunks.clear(); 371 cancelled = true; 372 } 373 374 public synchronized boolean isCancelled() { 375 return cancelled; 376 } 377 378 private synchronized void onError(Throwable cause) { 379 responseStream.onError(cause); 380 } 381 382 /** 383 * Dispatches the current response chunk to the client. This is only called by the executor. At 384 * any time, a given dispatch task should only be registered with the executor once. 385 */ 386 private synchronized void dispatchChunk() { 387 if (cancelled) { 388 return; 389 } 390 try { 391 // Pop off the next chunk and send it to the client. 392 Chunk chunk = chunks.remove(); 393 if (chunk == completionChunk) { 394 responseStream.onCompleted(); 395 } else { 396 responseStream.onNext(chunk.toResponse()); 397 } 398 } catch (Throwable e) { 399 failure = e; 400 if (Status.fromThrowable(e).getCode() == Status.CANCELLED.getCode()) { 401 // Stream was cancelled by client, responseStream.onError() might be called already or 402 // will be called soon by inbounding StreamObserver. 403 chunks.clear(); 404 } else { 405 responseStream.onError(e); 406 } 407 } 408 } 409 410 /** 411 * Schedules the next response chunk to be dispatched. If all input has been received and there 412 * are no more chunks in the queue, the stream is closed. 413 */ 414 private void scheduleNextChunk() { 415 synchronized (this) { 416 if (scheduled) { 417 // Dispatch task is already scheduled. 418 return; 419 } 420 421 // Schedule the next response chunk if there is one. 422 Chunk nextChunk = chunks.peek(); 423 if (nextChunk != null && !executor.isShutdown()) { 424 scheduled = true; 425 // TODO(ejona): cancel future if RPC is cancelled 426 Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask), 427 nextChunk.delayMicroseconds, TimeUnit.MICROSECONDS); 428 } 429 } 430 } 431 432 private void assertNotFailed() { 433 if (failure != null) { 434 throw new IllegalStateException("Stream already failed", failure); 435 } 436 } 437 } 438 439 /** 440 * Breaks down the request and creates a queue of response chunks for the given request. 441 */ 442 public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) { 443 Queue<Chunk> chunkQueue = new ArrayDeque<>(); 444 int offset = 0; 445 for (ResponseParameters params : request.getResponseParametersList()) { 446 chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize())); 447 448 // Increment the offset past this chunk. Buffer need to be circular. 449 offset = (offset + params.getSize()) % compressableBuffer.size(); 450 } 451 return chunkQueue; 452 } 453 454 /** 455 * A single chunk of a response stream. Contains delivery information for the dispatcher and can 456 * be converted to a streaming response proto. A chunk just references it's payload in the 457 * {@link #compressableBuffer} array. The payload isn't actually created until {@link 458 * #toResponse()} is called. 459 */ 460 private class Chunk { 461 private final int delayMicroseconds; 462 private final int offset; 463 private final int length; 464 465 public Chunk(int delayMicroseconds, int offset, int length) { 466 this.delayMicroseconds = delayMicroseconds; 467 this.offset = offset; 468 this.length = length; 469 } 470 471 /** 472 * Convert this chunk into a streaming response proto. 473 */ 474 private StreamingOutputCallResponse toResponse() { 475 StreamingOutputCallResponse.Builder responseBuilder = 476 StreamingOutputCallResponse.newBuilder(); 477 ByteString payload = generatePayload(compressableBuffer, offset, length); 478 responseBuilder.setPayload( 479 Payload.newBuilder() 480 .setBody(payload)); 481 return responseBuilder.build(); 482 } 483 } 484 485 /** 486 * Generates a payload of desired type and size. Reads compressableBuffer or 487 * uncompressableBuffer as a circular buffer. 488 */ 489 private ByteString generatePayload(ByteString dataBuffer, int offset, int size) { 490 ByteString payload = ByteString.EMPTY; 491 // This offset would never pass the array boundary. 492 int begin = offset; 493 int end = 0; 494 int bytesLeft = size; 495 while (bytesLeft > 0) { 496 end = Math.min(begin + bytesLeft, dataBuffer.size()); 497 // ByteString.substring returns the substring from begin, inclusive, to end, exclusive. 498 payload = payload.concat(dataBuffer.substring(begin, end)); 499 bytesLeft -= (end - begin); 500 begin = end % dataBuffer.size(); 501 } 502 return payload; 503 } 504 505 /** Returns interceptors necessary for full service implementation. */ 506 public static List<ServerInterceptor> interceptors() { 507 return Arrays.asList( 508 echoRequestHeadersInterceptor(Util.METADATA_KEY), 509 echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY), 510 echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY)); 511 } 512 513 /** 514 * Echo the request headers from a client into response headers and trailers. Useful for 515 * testing end-to-end metadata propagation. 516 */ 517 private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) { 518 final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys)); 519 return new ServerInterceptor() { 520 @Override 521 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 522 ServerCall<ReqT, RespT> call, 523 final Metadata requestHeaders, 524 ServerCallHandler<ReqT, RespT> next) { 525 return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { 526 @Override 527 public void sendHeaders(Metadata responseHeaders) { 528 responseHeaders.merge(requestHeaders, keySet); 529 super.sendHeaders(responseHeaders); 530 } 531 532 @Override 533 public void close(Status status, Metadata trailers) { 534 trailers.merge(requestHeaders, keySet); 535 super.close(status, trailers); 536 } 537 }, requestHeaders); 538 } 539 }; 540 } 541 542 /** 543 * Echoes request headers with the specified key(s) from a client into response headers only. 544 */ 545 private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) { 546 final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys)); 547 return new ServerInterceptor() { 548 @Override 549 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 550 ServerCall<ReqT, RespT> call, 551 final Metadata requestHeaders, 552 ServerCallHandler<ReqT, RespT> next) { 553 return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { 554 @Override 555 public void sendHeaders(Metadata responseHeaders) { 556 responseHeaders.merge(requestHeaders, keySet); 557 super.sendHeaders(responseHeaders); 558 } 559 560 @Override 561 public void close(Status status, Metadata trailers) { 562 super.close(status, trailers); 563 } 564 }, requestHeaders); 565 } 566 }; 567 } 568 569 /** 570 * Echoes request headers with the specified key(s) from a client into response trailers only. 571 */ 572 private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) { 573 final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys)); 574 return new ServerInterceptor() { 575 @Override 576 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 577 ServerCall<ReqT, RespT> call, 578 final Metadata requestHeaders, 579 ServerCallHandler<ReqT, RespT> next) { 580 return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) { 581 @Override 582 public void sendHeaders(Metadata responseHeaders) { 583 super.sendHeaders(responseHeaders); 584 } 585 586 @Override 587 public void close(Status status, Metadata trailers) { 588 trailers.merge(requestHeaders, keySet); 589 super.close(status, trailers); 590 } 591 }, requestHeaders); 592 } 593 }; 594 } 595 } 596