1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.base.Preconditions; 22 import io.grpc.Attributes; 23 import io.grpc.Metadata; 24 import io.grpc.Status; 25 import io.grpc.internal.AbstractServerStream; 26 import io.grpc.internal.StatsTraceContext; 27 import io.grpc.internal.TransportTracer; 28 import io.grpc.internal.WritableBuffer; 29 import io.netty.buffer.ByteBuf; 30 import io.netty.channel.Channel; 31 import io.netty.channel.ChannelFuture; 32 import io.netty.channel.ChannelFutureListener; 33 import io.netty.channel.EventLoop; 34 import io.netty.handler.codec.http2.Http2Headers; 35 import io.netty.handler.codec.http2.Http2Stream; 36 import io.perfmark.Link; 37 import io.perfmark.PerfMark; 38 import io.perfmark.Tag; 39 import io.perfmark.TaskCloseable; 40 import java.util.logging.Level; 41 import java.util.logging.Logger; 42 43 /** 44 * Server stream for a Netty HTTP2 transport. Must only be called from the sending application 45 * thread. 46 */ 47 class NettyServerStream extends AbstractServerStream { 48 private static final Logger log = Logger.getLogger(NettyServerStream.class.getName()); 49 50 private final Sink sink = new Sink(); 51 private final TransportState state; 52 private final WriteQueue writeQueue; 53 private final Attributes attributes; 54 private final String authority; 55 private final TransportTracer transportTracer; 56 private final int streamId; 57 NettyServerStream( Channel channel, TransportState state, Attributes transportAttrs, String authority, StatsTraceContext statsTraceCtx, TransportTracer transportTracer)58 public NettyServerStream( 59 Channel channel, 60 TransportState state, 61 Attributes transportAttrs, 62 String authority, 63 StatsTraceContext statsTraceCtx, 64 TransportTracer transportTracer) { 65 super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx); 66 this.state = checkNotNull(state, "transportState"); 67 this.writeQueue = state.handler.getWriteQueue(); 68 this.attributes = checkNotNull(transportAttrs); 69 this.authority = authority; 70 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 71 // Read the id early to avoid reading transportState later. 72 this.streamId = transportState().id(); 73 } 74 75 @Override transportState()76 protected TransportState transportState() { 77 return state; 78 } 79 80 @Override abstractServerStreamSink()81 protected Sink abstractServerStreamSink() { 82 return sink; 83 } 84 85 @Override getAttributes()86 public Attributes getAttributes() { 87 return attributes; 88 } 89 90 @Override getAuthority()91 public String getAuthority() { 92 return authority; 93 } 94 95 private class Sink implements AbstractServerStream.Sink { 96 @Override writeHeaders(Metadata headers)97 public void writeHeaders(Metadata headers) { 98 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) { 99 writeQueue.enqueue( 100 SendResponseHeadersCommand.createHeaders( 101 transportState(), 102 Utils.convertServerHeaders(headers)), 103 true); 104 } 105 } 106 writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages)107 private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) { 108 Preconditions.checkArgument(numMessages >= 0); 109 ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf().touch(); 110 final int numBytes = bytebuf.readableBytes(); 111 // Add the bytes to outbound flow control. 112 onSendingBytes(numBytes); 113 writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, false), flush) 114 .addListener(new ChannelFutureListener() { 115 @Override 116 public void operationComplete(ChannelFuture future) throws Exception { 117 // Remove the bytes from outbound flow control, optionally notifying 118 // the client that they can send more bytes. 119 transportState().onSentBytes(numBytes); 120 if (future.isSuccess()) { 121 transportTracer.reportMessageSent(numMessages); 122 } 123 } 124 }); 125 } 126 127 @Override writeFrame(WritableBuffer frame, boolean flush, final int numMessages)128 public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { 129 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeFrame")) { 130 writeFrameInternal(frame, flush, numMessages); 131 } 132 } 133 134 @Override writeTrailers(Metadata trailers, boolean headersSent, Status status)135 public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { 136 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeTrailers")) { 137 Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); 138 writeQueue.enqueue( 139 SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), 140 true); 141 } 142 } 143 144 @Override cancel(Status status)145 public void cancel(Status status) { 146 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) { 147 writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); 148 } 149 } 150 } 151 152 /** This should only be called from the transport thread. */ 153 public static class TransportState extends AbstractServerStream.TransportState 154 implements StreamIdHolder { 155 private final Http2Stream http2Stream; 156 private final NettyServerHandler handler; 157 private final EventLoop eventLoop; 158 private final Tag tag; 159 TransportState( NettyServerHandler handler, EventLoop eventLoop, Http2Stream http2Stream, int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer, String methodName)160 public TransportState( 161 NettyServerHandler handler, 162 EventLoop eventLoop, 163 Http2Stream http2Stream, 164 int maxMessageSize, 165 StatsTraceContext statsTraceCtx, 166 TransportTracer transportTracer, 167 String methodName) { 168 super(maxMessageSize, statsTraceCtx, transportTracer); 169 this.http2Stream = checkNotNull(http2Stream, "http2Stream"); 170 this.handler = checkNotNull(handler, "handler"); 171 this.eventLoop = eventLoop; 172 this.tag = PerfMark.createTag(methodName, http2Stream.id()); 173 } 174 175 @Override runOnTransportThread(final Runnable r)176 public void runOnTransportThread(final Runnable r) { 177 if (eventLoop.inEventLoop()) { 178 r.run(); 179 } else { 180 final Link link = PerfMark.linkOut(); 181 eventLoop.execute(new Runnable() { 182 @Override 183 public void run() { 184 try (TaskCloseable ignore = 185 PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) { 186 PerfMark.attachTag(tag); 187 PerfMark.linkIn(link); 188 r.run(); 189 } 190 } 191 }); 192 } 193 } 194 195 @Override bytesRead(int processedBytes)196 public void bytesRead(int processedBytes) { 197 handler.returnProcessedBytes(http2Stream, processedBytes); 198 handler.getWriteQueue().scheduleFlush(); 199 } 200 201 @Override deframeFailed(Throwable cause)202 public void deframeFailed(Throwable cause) { 203 log.log(Level.WARNING, "Exception processing message", cause); 204 Status status = Status.fromThrowable(cause); 205 transportReportStatus(status); 206 handler.getWriteQueue().enqueue(new CancelServerStreamCommand(this, status), true); 207 } 208 inboundDataReceived(ByteBuf frame, boolean endOfStream)209 void inboundDataReceived(ByteBuf frame, boolean endOfStream) { 210 super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); 211 } 212 213 @Override id()214 public int id() { 215 return http2Stream.id(); 216 } 217 218 @Override tag()219 public Tag tag() { 220 return tag; 221 } 222 } 223 224 @Override streamId()225 public int streamId() { 226 return streamId; 227 } 228 } 229