1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 import static io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL; 23 import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER; 24 import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY; 25 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; 26 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 27 28 import com.google.common.annotations.VisibleForTesting; 29 import com.google.common.base.Throwables; 30 import com.google.common.util.concurrent.MoreExecutors; 31 import io.grpc.Attributes; 32 import io.grpc.Codec; 33 import io.grpc.Compressor; 34 import io.grpc.CompressorRegistry; 35 import io.grpc.Context; 36 import io.grpc.DecompressorRegistry; 37 import io.grpc.InternalDecompressorRegistry; 38 import io.grpc.InternalStatus; 39 import io.grpc.Metadata; 40 import io.grpc.MethodDescriptor; 41 import io.grpc.SecurityLevel; 42 import io.grpc.ServerCall; 43 import io.grpc.Status; 44 import io.perfmark.PerfMark; 45 import io.perfmark.Tag; 46 import io.perfmark.TaskCloseable; 47 import java.io.InputStream; 48 import java.util.logging.Level; 49 import java.util.logging.Logger; 50 51 final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> { 52 53 private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName()); 54 55 @VisibleForTesting 56 static final String TOO_MANY_RESPONSES = "Too many responses"; 57 @VisibleForTesting 58 static final String MISSING_RESPONSE = "Completed without a response"; 59 60 private final ServerStream stream; 61 private final MethodDescriptor<ReqT, RespT> method; 62 private final Tag tag; 63 private final Context.CancellableContext context; 64 private final byte[] messageAcceptEncoding; 65 private final DecompressorRegistry decompressorRegistry; 66 private final CompressorRegistry compressorRegistry; 67 private CallTracer serverCallTracer; 68 69 // state 70 private volatile boolean cancelled; 71 private boolean sendHeadersCalled; 72 private boolean closeCalled; 73 private Compressor compressor; 74 private boolean messageSent; 75 ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method, Metadata inboundHeaders, Context.CancellableContext context, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, CallTracer serverCallTracer, Tag tag)76 ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method, 77 Metadata inboundHeaders, Context.CancellableContext context, 78 DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, 79 CallTracer serverCallTracer, Tag tag) { 80 this.stream = stream; 81 this.method = method; 82 this.context = context; 83 this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); 84 this.decompressorRegistry = decompressorRegistry; 85 this.compressorRegistry = compressorRegistry; 86 this.serverCallTracer = serverCallTracer; 87 this.serverCallTracer.reportCallStarted(); 88 this.tag = tag; 89 } 90 91 @Override request(int numMessages)92 public void request(int numMessages) { 93 try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) { 94 PerfMark.attachTag(tag); 95 stream.request(numMessages); 96 } 97 } 98 99 @Override sendHeaders(Metadata headers)100 public void sendHeaders(Metadata headers) { 101 try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) { 102 PerfMark.attachTag(tag); 103 sendHeadersInternal(headers); 104 } 105 } 106 sendHeadersInternal(Metadata headers)107 private void sendHeadersInternal(Metadata headers) { 108 checkState(!sendHeadersCalled, "sendHeaders has already been called"); 109 checkState(!closeCalled, "call is closed"); 110 111 headers.discardAll(CONTENT_LENGTH_KEY); 112 headers.discardAll(MESSAGE_ENCODING_KEY); 113 if (compressor == null) { 114 compressor = Codec.Identity.NONE; 115 } else { 116 if (messageAcceptEncoding != null) { 117 // TODO(carl-mastrangelo): remove the string allocation. 118 if (!GrpcUtil.iterableContains( 119 ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)), 120 compressor.getMessageEncoding())) { 121 // resort to using no compression. 122 compressor = Codec.Identity.NONE; 123 } 124 } else { 125 compressor = Codec.Identity.NONE; 126 } 127 } 128 129 // Always put compressor, even if it's identity. 130 headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding()); 131 132 stream.setCompressor(compressor); 133 134 headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY); 135 byte[] advertisedEncodings = 136 InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry); 137 if (advertisedEncodings.length != 0) { 138 headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings); 139 } 140 141 // Don't check if sendMessage has been called, since it requires that sendHeaders was already 142 // called. 143 sendHeadersCalled = true; 144 stream.writeHeaders(headers); 145 } 146 147 @Override sendMessage(RespT message)148 public void sendMessage(RespT message) { 149 try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) { 150 PerfMark.attachTag(tag); 151 sendMessageInternal(message); 152 } 153 } 154 sendMessageInternal(RespT message)155 private void sendMessageInternal(RespT message) { 156 checkState(sendHeadersCalled, "sendHeaders has not been called"); 157 checkState(!closeCalled, "call is closed"); 158 159 if (method.getType().serverSendsOneMessage() && messageSent) { 160 internalClose(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES)); 161 return; 162 } 163 164 messageSent = true; 165 try { 166 InputStream resp = method.streamResponse(message); 167 stream.writeMessage(resp); 168 if (!getMethodDescriptor().getType().serverSendsOneMessage()) { 169 stream.flush(); 170 } 171 } catch (RuntimeException e) { 172 close(Status.fromThrowable(e), new Metadata()); 173 } catch (Error e) { 174 close( 175 Status.CANCELLED.withDescription("Server sendMessage() failed with Error"), 176 new Metadata()); 177 throw e; 178 } 179 } 180 181 @Override setMessageCompression(boolean enable)182 public void setMessageCompression(boolean enable) { 183 stream.setMessageCompression(enable); 184 } 185 186 @Override setCompression(String compressorName)187 public void setCompression(String compressorName) { 188 // Added here to give a better error message. 189 checkState(!sendHeadersCalled, "sendHeaders has been called"); 190 191 compressor = compressorRegistry.lookupCompressor(compressorName); 192 checkArgument(compressor != null, "Unable to find compressor by name %s", compressorName); 193 } 194 195 @Override isReady()196 public boolean isReady() { 197 if (closeCalled) { 198 return false; 199 } 200 return stream.isReady(); 201 } 202 203 @Override close(Status status, Metadata trailers)204 public void close(Status status, Metadata trailers) { 205 try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) { 206 PerfMark.attachTag(tag); 207 closeInternal(status, trailers); 208 } 209 } 210 closeInternal(Status status, Metadata trailers)211 private void closeInternal(Status status, Metadata trailers) { 212 checkState(!closeCalled, "call already closed"); 213 try { 214 closeCalled = true; 215 216 if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) { 217 internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE)); 218 return; 219 } 220 221 stream.close(status, trailers); 222 } finally { 223 serverCallTracer.reportCallEnded(status.isOk()); 224 } 225 } 226 227 @Override isCancelled()228 public boolean isCancelled() { 229 return cancelled; 230 } 231 newServerStreamListener(ServerCall.Listener<ReqT> listener)232 ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) { 233 return new ServerStreamListenerImpl<>(this, listener, context); 234 } 235 236 @Override getAttributes()237 public Attributes getAttributes() { 238 return stream.getAttributes(); 239 } 240 241 @Override getAuthority()242 public String getAuthority() { 243 return stream.getAuthority(); 244 } 245 246 @Override getMethodDescriptor()247 public MethodDescriptor<ReqT, RespT> getMethodDescriptor() { 248 return method; 249 } 250 251 @Override getSecurityLevel()252 public SecurityLevel getSecurityLevel() { 253 final Attributes attributes = getAttributes(); 254 if (attributes == null) { 255 return super.getSecurityLevel(); 256 } 257 final SecurityLevel securityLevel = attributes.get(ATTR_SECURITY_LEVEL); 258 return securityLevel == null ? super.getSecurityLevel() : securityLevel; 259 } 260 261 /** 262 * Close the {@link ServerStream} because an internal error occurred. Allow the application to 263 * run until completion, but silently ignore interactions with the {@link ServerStream} from now 264 * on. 265 */ internalClose(Status internalError)266 private void internalClose(Status internalError) { 267 log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError}); 268 stream.cancel(internalError); 269 serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false 270 } 271 272 /** 273 * All of these callbacks are assumed to called on an application thread, and the caller is 274 * responsible for handling thrown exceptions. 275 */ 276 @VisibleForTesting 277 static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener { 278 private final ServerCallImpl<ReqT, ?> call; 279 private final ServerCall.Listener<ReqT> listener; 280 private final Context.CancellableContext context; 281 ServerStreamListenerImpl( ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener, Context.CancellableContext context)282 public ServerStreamListenerImpl( 283 ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener, 284 Context.CancellableContext context) { 285 this.call = checkNotNull(call, "call"); 286 this.listener = checkNotNull(listener, "listener must not be null"); 287 this.context = checkNotNull(context, "context"); 288 // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also 289 // reflects the new state. Use a DirectExecutor so that it happens in the same thread 290 // as the caller of {@link Context#cancel}. 291 this.context.addListener( 292 new Context.CancellationListener() { 293 @Override 294 public void cancelled(Context context) { 295 // If the context has a cancellation cause then something exceptional happened 296 // and we should also mark the call as cancelled. 297 if (context.cancellationCause() != null) { 298 ServerStreamListenerImpl.this.call.cancelled = true; 299 } 300 } 301 }, 302 MoreExecutors.directExecutor()); 303 } 304 305 @Override messagesAvailable(MessageProducer producer)306 public void messagesAvailable(MessageProducer producer) { 307 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) { 308 PerfMark.attachTag(call.tag); 309 messagesAvailableInternal(producer); 310 } 311 } 312 313 @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try messagesAvailableInternal(final MessageProducer producer)314 private void messagesAvailableInternal(final MessageProducer producer) { 315 if (call.cancelled) { 316 GrpcUtil.closeQuietly(producer); 317 return; 318 } 319 320 InputStream message; 321 try { 322 while ((message = producer.next()) != null) { 323 try { 324 listener.onMessage(call.method.parseRequest(message)); 325 } catch (Throwable t) { 326 GrpcUtil.closeQuietly(message); 327 throw t; 328 } 329 message.close(); 330 } 331 } catch (Throwable t) { 332 GrpcUtil.closeQuietly(producer); 333 Throwables.throwIfUnchecked(t); 334 throw new RuntimeException(t); 335 } 336 } 337 338 @Override halfClosed()339 public void halfClosed() { 340 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) { 341 PerfMark.attachTag(call.tag); 342 if (call.cancelled) { 343 return; 344 } 345 346 listener.onHalfClose(); 347 } 348 } 349 350 @Override closed(Status status)351 public void closed(Status status) { 352 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) { 353 PerfMark.attachTag(call.tag); 354 closedInternal(status); 355 } 356 } 357 closedInternal(Status status)358 private void closedInternal(Status status) { 359 Throwable cancelCause = null; 360 try { 361 if (status.isOk()) { 362 listener.onComplete(); 363 } else { 364 call.cancelled = true; 365 listener.onCancel(); 366 // The status will not have a cause in all failure scenarios but we want to make sure 367 // we always cancel the context with one to keep the context cancelled state consistent. 368 cancelCause = InternalStatus.asRuntimeException( 369 Status.CANCELLED.withDescription("RPC cancelled"), null, false); 370 } 371 } finally { 372 // Cancel context after delivering RPC closure notification to allow the application to 373 // clean up and update any state based on whether onComplete or onCancel was called. 374 context.cancel(cancelCause); 375 } 376 } 377 378 @Override onReady()379 public void onReady() { 380 try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) { 381 PerfMark.attachTag(call.tag); 382 if (call.cancelled) { 383 return; 384 } 385 listener.onReady(); 386 } 387 } 388 } 389 } 390