• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.base.Preconditions;
22 import io.grpc.Attributes;
23 import io.grpc.Metadata;
24 import io.grpc.Status;
25 import io.grpc.internal.AbstractServerStream;
26 import io.grpc.internal.StatsTraceContext;
27 import io.grpc.internal.TransportTracer;
28 import io.grpc.internal.WritableBuffer;
29 import io.netty.buffer.ByteBuf;
30 import io.netty.channel.Channel;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelFutureListener;
33 import io.netty.channel.EventLoop;
34 import io.netty.handler.codec.http2.Http2Headers;
35 import io.netty.handler.codec.http2.Http2Stream;
36 import io.perfmark.Link;
37 import io.perfmark.PerfMark;
38 import io.perfmark.Tag;
39 import io.perfmark.TaskCloseable;
40 import java.util.logging.Level;
41 import java.util.logging.Logger;
42 
43 /**
44  * Server stream for a Netty HTTP2 transport. Must only be called from the sending application
45  * thread.
46  */
47 class NettyServerStream extends AbstractServerStream {
48   private static final Logger log = Logger.getLogger(NettyServerStream.class.getName());
49 
50   private final Sink sink = new Sink();
51   private final TransportState state;
52   private final WriteQueue writeQueue;
53   private final Attributes attributes;
54   private final String authority;
55   private final TransportTracer transportTracer;
56   private final int streamId;
57 
NettyServerStream( Channel channel, TransportState state, Attributes transportAttrs, String authority, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)58   public NettyServerStream(
59       Channel channel,
60       TransportState state,
61       Attributes transportAttrs,
62       String authority,
63       StatsTraceContext statsTraceCtx,
64       TransportTracer transportTracer) {
65     super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx);
66     this.state = checkNotNull(state, "transportState");
67     this.writeQueue = state.handler.getWriteQueue();
68     this.attributes = checkNotNull(transportAttrs);
69     this.authority = authority;
70     this.transportTracer = checkNotNull(transportTracer, "transportTracer");
71     // Read the id early to avoid reading transportState later.
72     this.streamId = transportState().id();
73   }
74 
75   @Override
transportState()76   protected TransportState transportState() {
77     return state;
78   }
79 
80   @Override
abstractServerStreamSink()81   protected Sink abstractServerStreamSink() {
82     return sink;
83   }
84 
85   @Override
getAttributes()86   public Attributes getAttributes() {
87     return attributes;
88   }
89 
90   @Override
getAuthority()91   public String getAuthority() {
92     return authority;
93   }
94 
95   private class Sink implements AbstractServerStream.Sink {
96     @Override
writeHeaders(Metadata headers)97     public void writeHeaders(Metadata headers) {
98       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) {
99         writeQueue.enqueue(
100             SendResponseHeadersCommand.createHeaders(
101                 transportState(),
102                 Utils.convertServerHeaders(headers)),
103             true);
104       }
105     }
106 
writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages)107     private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
108       Preconditions.checkArgument(numMessages >= 0);
109       ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf().touch();
110       final int numBytes = bytebuf.readableBytes();
111       // Add the bytes to outbound flow control.
112       onSendingBytes(numBytes);
113       writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush)
114           .addListener(new ChannelFutureListener() {
115             @Override
116             public void operationComplete(ChannelFuture future) throws Exception {
117               // Remove the bytes from outbound flow control, optionally notifying
118               // the client that they can send more bytes.
119               transportState().onSentBytes(numBytes);
120               if (future.isSuccess()) {
121                 transportTracer.reportMessageSent(numMessages);
122               }
123             }
124           });
125     }
126 
127     @Override
writeFrame(WritableBuffer frame, boolean flush, final int numMessages)128     public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
129       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeFrame")) {
130         writeFrameInternal(frame, flush, numMessages);
131       }
132     }
133 
134     @Override
writeTrailers(Metadata trailers, boolean headersSent, Status status)135     public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
136       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeTrailers")) {
137         Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
138         writeQueue.enqueue(
139             SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
140             true);
141       }
142     }
143 
144     @Override
cancel(Status status)145     public void cancel(Status status) {
146       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
147         writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
148       }
149     }
150   }
151 
152   /** This should only be called from the transport thread. */
153   public static class TransportState extends AbstractServerStream.TransportState
154       implements StreamIdHolder {
155     private final Http2Stream http2Stream;
156     private final NettyServerHandler handler;
157     private final EventLoop eventLoop;
158     private final Tag tag;
159 
TransportState( NettyServerHandler handler, EventLoop eventLoop, Http2Stream http2Stream, int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, String methodName)160     public TransportState(
161         NettyServerHandler handler,
162         EventLoop eventLoop,
163         Http2Stream http2Stream,
164         int maxMessageSize,
165         StatsTraceContext statsTraceCtx,
166         TransportTracer transportTracer,
167         String methodName) {
168       super(maxMessageSize, statsTraceCtx, transportTracer);
169       this.http2Stream = checkNotNull(http2Stream, "http2Stream");
170       this.handler = checkNotNull(handler, "handler");
171       this.eventLoop = eventLoop;
172       this.tag = PerfMark.createTag(methodName, http2Stream.id());
173     }
174 
175     @Override
runOnTransportThread(final Runnable r)176     public void runOnTransportThread(final Runnable r) {
177       if (eventLoop.inEventLoop()) {
178         r.run();
179       } else {
180         final Link link = PerfMark.linkOut();
181         eventLoop.execute(new Runnable() {
182           @Override
183           public void run() {
184             try (TaskCloseable ignore =
185                      PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) {
186               PerfMark.attachTag(tag);
187               PerfMark.linkIn(link);
188               r.run();
189             }
190           }
191         });
192       }
193     }
194 
195     @Override
bytesRead(int processedBytes)196     public void bytesRead(int processedBytes) {
197       handler.returnProcessedBytes(http2Stream, processedBytes);
198       handler.getWriteQueue().scheduleFlush();
199     }
200 
201     @Override
deframeFailed(Throwable cause)202     public void deframeFailed(Throwable cause) {
203       log.log(Level.WARNING, "Exception processing message", cause);
204       Status status = Status.fromThrowable(cause);
205       transportReportStatus(status);
206       handler.getWriteQueue().enqueue(new CancelServerStreamCommand(this, status), true);
207     }
208 
inboundDataReceived(ByteBuf frame, boolean endOfStream)209     void inboundDataReceived(ByteBuf frame, boolean endOfStream) {
210       super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
211     }
212 
213     @Override
id()214     public int id() {
215       return http2Stream.id();
216     }
217 
218     @Override
tag()219     public Tag tag() {
220       return tag;
221     }
222   }
223 
224   @Override
streamId()225   public int streamId() {
226     return streamId;
227   }
228 }
229