• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static com.google.common.base.Preconditions.checkState;
22 import static io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL;
23 import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
24 import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
25 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
26 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
27 
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.common.base.Throwables;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import io.grpc.Attributes;
32 import io.grpc.Codec;
33 import io.grpc.Compressor;
34 import io.grpc.CompressorRegistry;
35 import io.grpc.Context;
36 import io.grpc.DecompressorRegistry;
37 import io.grpc.InternalDecompressorRegistry;
38 import io.grpc.InternalStatus;
39 import io.grpc.Metadata;
40 import io.grpc.MethodDescriptor;
41 import io.grpc.SecurityLevel;
42 import io.grpc.ServerCall;
43 import io.grpc.Status;
44 import io.perfmark.PerfMark;
45 import io.perfmark.Tag;
46 import io.perfmark.TaskCloseable;
47 import java.io.InputStream;
48 import java.util.logging.Level;
49 import java.util.logging.Logger;
50 
51 final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
52 
53   private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName());
54 
55   @VisibleForTesting
56   static final String TOO_MANY_RESPONSES = "Too many responses";
57   @VisibleForTesting
58   static final String MISSING_RESPONSE = "Completed without a response";
59 
60   private final ServerStream stream;
61   private final MethodDescriptor<ReqT, RespT> method;
62   private final Tag tag;
63   private final Context.CancellableContext context;
64   private final byte[] messageAcceptEncoding;
65   private final DecompressorRegistry decompressorRegistry;
66   private final CompressorRegistry compressorRegistry;
67   private CallTracer serverCallTracer;
68 
69   // state
70   private volatile boolean cancelled;
71   private boolean sendHeadersCalled;
72   private boolean closeCalled;
73   private Compressor compressor;
74   private boolean messageSent;
75 
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method, Metadata inboundHeaders, Context.CancellableContext context, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, CallTracer serverCallTracer, Tag tag)76   ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
77       Metadata inboundHeaders, Context.CancellableContext context,
78       DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
79       CallTracer serverCallTracer, Tag tag) {
80     this.stream = stream;
81     this.method = method;
82     this.context = context;
83     this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
84     this.decompressorRegistry = decompressorRegistry;
85     this.compressorRegistry = compressorRegistry;
86     this.serverCallTracer = serverCallTracer;
87     this.serverCallTracer.reportCallStarted();
88     this.tag = tag;
89   }
90 
91   @Override
request(int numMessages)92   public void request(int numMessages) {
93     try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) {
94       PerfMark.attachTag(tag);
95       stream.request(numMessages);
96     }
97   }
98 
99   @Override
sendHeaders(Metadata headers)100   public void sendHeaders(Metadata headers) {
101     try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) {
102       PerfMark.attachTag(tag);
103       sendHeadersInternal(headers);
104     }
105   }
106 
sendHeadersInternal(Metadata headers)107   private void sendHeadersInternal(Metadata headers) {
108     checkState(!sendHeadersCalled, "sendHeaders has already been called");
109     checkState(!closeCalled, "call is closed");
110 
111     headers.discardAll(CONTENT_LENGTH_KEY);
112     headers.discardAll(MESSAGE_ENCODING_KEY);
113     if (compressor == null) {
114       compressor = Codec.Identity.NONE;
115     } else {
116       if (messageAcceptEncoding != null) {
117         // TODO(carl-mastrangelo): remove the string allocation.
118         if (!GrpcUtil.iterableContains(
119             ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)),
120             compressor.getMessageEncoding())) {
121           // resort to using no compression.
122           compressor = Codec.Identity.NONE;
123         }
124       } else {
125         compressor = Codec.Identity.NONE;
126       }
127     }
128 
129     // Always put compressor, even if it's identity.
130     headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
131 
132     stream.setCompressor(compressor);
133 
134     headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
135     byte[] advertisedEncodings =
136         InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
137     if (advertisedEncodings.length != 0) {
138       headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
139     }
140 
141     // Don't check if sendMessage has been called, since it requires that sendHeaders was already
142     // called.
143     sendHeadersCalled = true;
144     stream.writeHeaders(headers);
145   }
146 
147   @Override
sendMessage(RespT message)148   public void sendMessage(RespT message) {
149     try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) {
150       PerfMark.attachTag(tag);
151       sendMessageInternal(message);
152     }
153   }
154 
sendMessageInternal(RespT message)155   private void sendMessageInternal(RespT message) {
156     checkState(sendHeadersCalled, "sendHeaders has not been called");
157     checkState(!closeCalled, "call is closed");
158 
159     if (method.getType().serverSendsOneMessage() && messageSent) {
160       internalClose(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES));
161       return;
162     }
163 
164     messageSent = true;
165     try {
166       InputStream resp = method.streamResponse(message);
167       stream.writeMessage(resp);
168       if (!getMethodDescriptor().getType().serverSendsOneMessage()) {
169         stream.flush();
170       }
171     } catch (RuntimeException e) {
172       close(Status.fromThrowable(e), new Metadata());
173     } catch (Error e) {
174       close(
175           Status.CANCELLED.withDescription("Server sendMessage() failed with Error"),
176           new Metadata());
177       throw e;
178     }
179   }
180 
181   @Override
setMessageCompression(boolean enable)182   public void setMessageCompression(boolean enable) {
183     stream.setMessageCompression(enable);
184   }
185 
186   @Override
setCompression(String compressorName)187   public void setCompression(String compressorName) {
188     // Added here to give a better error message.
189     checkState(!sendHeadersCalled, "sendHeaders has been called");
190 
191     compressor = compressorRegistry.lookupCompressor(compressorName);
192     checkArgument(compressor != null, "Unable to find compressor by name %s", compressorName);
193   }
194 
195   @Override
isReady()196   public boolean isReady() {
197     if (closeCalled) {
198       return false;
199     }
200     return stream.isReady();
201   }
202 
203   @Override
close(Status status, Metadata trailers)204   public void close(Status status, Metadata trailers) {
205     try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) {
206       PerfMark.attachTag(tag);
207       closeInternal(status, trailers);
208     }
209   }
210 
closeInternal(Status status, Metadata trailers)211   private void closeInternal(Status status, Metadata trailers) {
212     checkState(!closeCalled, "call already closed");
213     try {
214       closeCalled = true;
215 
216       if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
217         internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
218         return;
219       }
220 
221       stream.close(status, trailers);
222     } finally {
223       serverCallTracer.reportCallEnded(status.isOk());
224     }
225   }
226 
227   @Override
isCancelled()228   public boolean isCancelled() {
229     return cancelled;
230   }
231 
newServerStreamListener(ServerCall.Listener<ReqT> listener)232   ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener) {
233     return new ServerStreamListenerImpl<>(this, listener, context);
234   }
235 
236   @Override
getAttributes()237   public Attributes getAttributes() {
238     return stream.getAttributes();
239   }
240 
241   @Override
getAuthority()242   public String getAuthority() {
243     return stream.getAuthority();
244   }
245 
246   @Override
getMethodDescriptor()247   public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
248     return method;
249   }
250 
251   @Override
getSecurityLevel()252   public SecurityLevel getSecurityLevel() {
253     final Attributes attributes = getAttributes();
254     if (attributes == null) {
255       return super.getSecurityLevel();
256     }
257     final SecurityLevel securityLevel = attributes.get(ATTR_SECURITY_LEVEL);
258     return securityLevel == null ? super.getSecurityLevel() : securityLevel;
259   }
260 
261   /**
262    * Close the {@link ServerStream} because an internal error occurred. Allow the application to
263    * run until completion, but silently ignore interactions with the {@link ServerStream} from now
264    * on.
265    */
internalClose(Status internalError)266   private void internalClose(Status internalError) {
267     log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
268     stream.cancel(internalError);
269     serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false
270   }
271 
272   /**
273    * All of these callbacks are assumed to called on an application thread, and the caller is
274    * responsible for handling thrown exceptions.
275    */
276   @VisibleForTesting
277   static final class ServerStreamListenerImpl<ReqT> implements ServerStreamListener {
278     private final ServerCallImpl<ReqT, ?> call;
279     private final ServerCall.Listener<ReqT> listener;
280     private final Context.CancellableContext context;
281 
ServerStreamListenerImpl( ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener, Context.CancellableContext context)282     public ServerStreamListenerImpl(
283         ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
284         Context.CancellableContext context) {
285       this.call = checkNotNull(call, "call");
286       this.listener = checkNotNull(listener, "listener must not be null");
287       this.context = checkNotNull(context, "context");
288       // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also
289       // reflects the new state. Use a DirectExecutor so that it happens in the same thread
290       // as the caller of {@link Context#cancel}.
291       this.context.addListener(
292           new Context.CancellationListener() {
293             @Override
294             public void cancelled(Context context) {
295               // If the context has a cancellation cause then something exceptional happened
296               // and we should also mark the call as cancelled.
297               if (context.cancellationCause() != null) {
298                 ServerStreamListenerImpl.this.call.cancelled = true;
299               }
300             }
301           },
302           MoreExecutors.directExecutor());
303     }
304 
305     @Override
messagesAvailable(MessageProducer producer)306     public void messagesAvailable(MessageProducer producer) {
307       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) {
308         PerfMark.attachTag(call.tag);
309         messagesAvailableInternal(producer);
310       }
311     }
312 
313     @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
messagesAvailableInternal(final MessageProducer producer)314     private void messagesAvailableInternal(final MessageProducer producer) {
315       if (call.cancelled) {
316         GrpcUtil.closeQuietly(producer);
317         return;
318       }
319 
320       InputStream message;
321       try {
322         while ((message = producer.next()) != null) {
323           try {
324             listener.onMessage(call.method.parseRequest(message));
325           } catch (Throwable t) {
326             GrpcUtil.closeQuietly(message);
327             throw t;
328           }
329           message.close();
330         }
331       } catch (Throwable t) {
332         GrpcUtil.closeQuietly(producer);
333         Throwables.throwIfUnchecked(t);
334         throw new RuntimeException(t);
335       }
336     }
337 
338     @Override
halfClosed()339     public void halfClosed() {
340       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) {
341         PerfMark.attachTag(call.tag);
342         if (call.cancelled) {
343           return;
344         }
345 
346         listener.onHalfClose();
347       }
348     }
349 
350     @Override
closed(Status status)351     public void closed(Status status) {
352       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) {
353         PerfMark.attachTag(call.tag);
354         closedInternal(status);
355       }
356     }
357 
closedInternal(Status status)358     private void closedInternal(Status status) {
359       Throwable cancelCause = null;
360       try {
361         if (status.isOk()) {
362           listener.onComplete();
363         } else {
364           call.cancelled = true;
365           listener.onCancel();
366           // The status will not have a cause in all failure scenarios but we want to make sure
367           // we always cancel the context with one to keep the context cancelled state consistent.
368           cancelCause = InternalStatus.asRuntimeException(
369               Status.CANCELLED.withDescription("RPC cancelled"), null, false);
370         }
371       } finally {
372         // Cancel context after delivering RPC closure notification to allow the application to
373         // clean up and update any state based on whether onComplete or onCancel was called.
374         context.cancel(cancelCause);
375       }
376     }
377 
378     @Override
onReady()379     public void onReady() {
380       try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) {
381         PerfMark.attachTag(call.tag);
382         if (call.cancelled) {
383           return;
384         }
385         listener.onReady();
386       }
387     }
388   }
389 }
390