• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <assert.h>
25 #include <stdbool.h>
26 
27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include "src/core/ext/transport/chttp2/transport/frame.h"
29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
39 #include "src/core/lib/channel/channelz.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/combiner.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/lib/transport/transport_impl.h"
47 
/* Forward declaration only: grpc_chttp2_transport (below) stores a
   grpc_core::ContextList* in its `cl` member, so a pointer-compatible
   incomplete type is all this header needs. */
48 namespace grpc_core {
49 class ContextList;
50 }
51 
52 /* streams are kept in various linked lists depending on what things need to
53    happen to them... this enum labels each list.
   The values index the arrays sized by STREAM_LIST_COUNT in this header:
   grpc_chttp2_transport::lists, grpc_chttp2_stream::links and
   grpc_chttp2_stream::included. */
54 typedef enum {
55   /* If a stream is in the following two lists, an explicit ref is associated
56      with the stream */
57   GRPC_CHTTP2_LIST_WRITABLE,
58   GRPC_CHTTP2_LIST_WRITING,
59   /* No additional ref is taken for the following lists. Make sure to remove
60      the stream from these lists when the stream is removed. */
61   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
62   GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
63   /** streams that are waiting to start because there are too many concurrent
64       streams on the connection */
65   GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
66   STREAM_LIST_COUNT /* must be last: used as the array length */
67 } grpc_chttp2_stream_list_id;
68 
/* Write execution state of a transport; stored in
   grpc_chttp2_transport::write_state, which is initialized to
   GRPC_CHTTP2_WRITE_STATE_IDLE. */
69 typedef enum {
70   GRPC_CHTTP2_WRITE_STATE_IDLE,
71   GRPC_CHTTP2_WRITE_STATE_WRITING,
72   GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
73 } grpc_chttp2_write_state;
74 
/* Selector between a latency-oriented and a throughput-oriented mode.
   NOTE(review): no user of this enum is visible in this part of the header;
   confirm where it is consumed before relying on its meaning. */
75 typedef enum {
76   GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
77   GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
78 } grpc_chttp2_optimization_target;
79 
/* Labels the closure lists held by grpc_chttp2_ping_queue::lists; each value
   is an index into that array, and GRPC_CHTTP2_PCL_COUNT is its length. */
80 typedef enum {
81   GRPC_CHTTP2_PCL_INITIATE = 0,
82   GRPC_CHTTP2_PCL_NEXT,
83   GRPC_CHTTP2_PCL_INFLIGHT,
84   GRPC_CHTTP2_PCL_COUNT /* must be last */
85 } grpc_chttp2_ping_closure_list;
86 
/* Reason tag passed to grpc_chttp2_initiate_write() to say why a write is
   being requested. grpc_chttp2_initiate_write_reason_string() maps a value to
   a string. */
87 typedef enum {
88   GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
89   GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
90   GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
91   GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
92   GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
93   GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
94   GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
95   GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
96   GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
97   GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
98   GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
99   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
100   GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
101   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
102   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
103   GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
104   GRPC_CHTTP2_INITIATE_WRITE_BDP_PING,
105   GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
106   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
107   GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
108   GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
109 } grpc_chttp2_initiate_write_reason;
110 
/* Returns a C string for the given write reason. Defined elsewhere;
   NOTE(review): ownership/lifetime of the returned pointer is not visible
   here -- presumably a static string, confirm in the implementation. */
111 const char* grpc_chttp2_initiate_write_reason_string(
112     grpc_chttp2_initiate_write_reason reason);
113 
/* Ping bookkeeping held by grpc_chttp2_transport::ping_queue: one closure
   list per grpc_chttp2_ping_closure_list value plus an id field. Both members
   are value-initialized (empty lists, id 0). */
114 struct grpc_chttp2_ping_queue {
115   grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
116   uint64_t inflight_id = 0;
117 };
/* Ping policy limits stored in grpc_chttp2_transport::ping_policy. The
   members have no default initializers, so whoever constructs the transport
   must set them. */
118 struct grpc_chttp2_repeated_ping_policy {
119   int max_pings_without_data;
120   int max_ping_strikes;
121   grpc_millis min_recv_ping_interval_without_data;
122 };
/* Mutable ping-sending state stored in grpc_chttp2_transport::ping_state.
   No member has a default initializer. */
123 struct grpc_chttp2_repeated_ping_state {
124   grpc_millis last_ping_sent_time;
125   int pings_before_data_required;
126   grpc_timer delayed_ping_timer;
127   bool is_delayed_ping_timer_set;
128 };
/* Received-ping tracking stored in grpc_chttp2_transport::ping_recv_state.
   No member has a default initializer. */
129 struct grpc_chttp2_server_ping_recv_state {
130   grpc_millis last_ping_recv_time;
131   int ping_strikes;
132 };
133 /* deframer state for the overall http2 stream of bytes.
   Stored in grpc_chttp2_transport::deframe_state, which starts at
   GRPC_DTS_CLIENT_PREFIX_0. There are 24 prefix states, matching the 24-byte
   GRPC_CHTTP2_CLIENT_CONNECT_STRING defined later in this header, followed by
   nine frame-header states (FH_0..FH_8) and one in-frame state. The
   enumerators rely on implicit consecutive numbering, so their order must not
   change. */
134 typedef enum {
135   /* prefix: one entry per http2 connection prefix byte */
136   GRPC_DTS_CLIENT_PREFIX_0 = 0,
137   GRPC_DTS_CLIENT_PREFIX_1,
138   GRPC_DTS_CLIENT_PREFIX_2,
139   GRPC_DTS_CLIENT_PREFIX_3,
140   GRPC_DTS_CLIENT_PREFIX_4,
141   GRPC_DTS_CLIENT_PREFIX_5,
142   GRPC_DTS_CLIENT_PREFIX_6,
143   GRPC_DTS_CLIENT_PREFIX_7,
144   GRPC_DTS_CLIENT_PREFIX_8,
145   GRPC_DTS_CLIENT_PREFIX_9,
146   GRPC_DTS_CLIENT_PREFIX_10,
147   GRPC_DTS_CLIENT_PREFIX_11,
148   GRPC_DTS_CLIENT_PREFIX_12,
149   GRPC_DTS_CLIENT_PREFIX_13,
150   GRPC_DTS_CLIENT_PREFIX_14,
151   GRPC_DTS_CLIENT_PREFIX_15,
152   GRPC_DTS_CLIENT_PREFIX_16,
153   GRPC_DTS_CLIENT_PREFIX_17,
154   GRPC_DTS_CLIENT_PREFIX_18,
155   GRPC_DTS_CLIENT_PREFIX_19,
156   GRPC_DTS_CLIENT_PREFIX_20,
157   GRPC_DTS_CLIENT_PREFIX_21,
158   GRPC_DTS_CLIENT_PREFIX_22,
159   GRPC_DTS_CLIENT_PREFIX_23,
160   /* frame header byte 0... */
161   /* must follow from the prefix states */
162   GRPC_DTS_FH_0,
163   GRPC_DTS_FH_1,
164   GRPC_DTS_FH_2,
165   GRPC_DTS_FH_3,
166   GRPC_DTS_FH_4,
167   GRPC_DTS_FH_5,
168   GRPC_DTS_FH_6,
169   GRPC_DTS_FH_7,
170   /* ... frame header byte 8 */
171   GRPC_DTS_FH_8,
172   /* inside a http2 frame */
173   GRPC_DTS_FRAME
174 } grpc_chttp2_deframe_transport_state;
175 
/* Head/tail pointers of one stream list. grpc_chttp2_transport::lists holds
   one of these per grpc_chttp2_stream_list_id value. */
176 struct grpc_chttp2_stream_list {
177   grpc_chttp2_stream* head;
178   grpc_chttp2_stream* tail;
179 };
/* Per-stream next/prev pointers for membership in one stream list.
   grpc_chttp2_stream::links holds one of these per
   grpc_chttp2_stream_list_id value. */
180 struct grpc_chttp2_stream_link {
181   grpc_chttp2_stream* next;
182   grpc_chttp2_stream* prev;
183 };
184 /* We keep several sets of connection wide parameters.
   Each value selects a row of grpc_chttp2_transport::settings, whose first
   dimension is GRPC_NUM_SETTING_SETS. */
185 typedef enum {
186   /* The settings our peer has asked for (and we have acked) */
187   GRPC_PEER_SETTINGS = 0,
188   /* The settings we'd like to have */
189   GRPC_LOCAL_SETTINGS,
190   /* The settings we've published to our peer */
191   GRPC_SENT_SETTINGS,
192   /* The settings the peer has acked */
193   GRPC_ACKED_SETTINGS,
194   GRPC_NUM_SETTING_SETS /* count of sets; must stay last */
195 } grpc_chttp2_setting_set;
196 
/* Progress of sending a GOAWAY; stored in
   grpc_chttp2_transport::sent_goaway_state, which is initialized to
   GRPC_CHTTP2_NO_GOAWAY_SEND. */
197 typedef enum {
198   GRPC_CHTTP2_NO_GOAWAY_SEND,
199   GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
200   GRPC_CHTTP2_GOAWAY_SENT,
201 } grpc_chttp2_sent_goaway_state;
202 
/* Node of a singly linked list (via `next`) pairing a closure with a byte
   offset. The transport keeps such nodes in write_cb_pool and each stream
   keeps them in on_flow_controlled_cbs, on_write_finished_cbs and
   finish_after_write.
   NOTE(review): presumably `closure` is run once the stream has written
   `call_at_byte` bytes -- confirm in the implementation. */
203 typedef struct grpc_chttp2_write_cb {
204   int64_t call_at_byte;
205   grpc_closure* closure;
206   struct grpc_chttp2_write_cb* next;
207 } grpc_chttp2_write_cb;
208 
209 namespace grpc_core {
210 
211 class Chttp2IncomingByteStream : public ByteStream {
212  public:
213   Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
214                            grpc_chttp2_stream* stream, uint32_t frame_size,
215                            uint32_t flags);
216 
217   void Orphan() override;
218 
219   bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
220   grpc_error_handle Pull(grpc_slice* slice) override;
221   void Shutdown(grpc_error_handle error) override;
222 
223   // TODO(roth): When I converted this class to C++, I wanted to make it
224   // inherit from RefCounted or InternallyRefCounted instead of continuing
225   // to use its own custom ref-counting code.  However, that would require
226   // using multiple inheritance, which sucks in general.  And to make matters
227   // worse, it causes problems with our New<> and Delete<> wrappers.
228   // Specifically, unless RefCounted is first in the list of parent classes,
229   // it will see a different value of the address of the object than the one
230   // we actually allocated, in which case gpr_free() will be called on a
231   // different address than the one we got from gpr_malloc(), thus causing a
232   // crash.  Given the fragility of depending on that, as well as a desire to
233   // avoid multiple inheritance in general, I've decided to leave this
234   // alone for now.  We can revisit this once we're able to link against
235   // libc++, at which point we can eliminate New<> and Delete<> and
236   // switch to std::shared_ptr<>.
Ref()237   void Ref() { refs_.Ref(); }
Unref()238   void Unref() {
239     if (GPR_UNLIKELY(refs_.Unref())) {
240       delete this;
241     }
242   }
243 
244   void PublishError(grpc_error_handle error);
245 
246   grpc_error_handle Push(const grpc_slice& slice, grpc_slice* slice_out);
247 
248   grpc_error_handle Finished(grpc_error_handle error, bool reset_on_error);
249 
remaining_bytes()250   uint32_t remaining_bytes() const { return remaining_bytes_; }
251 
252  private:
253   static void NextLocked(void* arg, grpc_error_handle error_ignored);
254   static void OrphanLocked(void* arg, grpc_error_handle error_ignored);
255 
256   void MaybeCreateStreamDecompressionCtx();
257 
258   grpc_chttp2_transport* transport_;  // Immutable.
259   grpc_chttp2_stream* stream_;        // Immutable.
260 
261   grpc_core::RefCount refs_;
262 
263   /* Accessed only by transport thread when stream->pending_byte_stream == false
264    * Accessed only by application thread when stream->pending_byte_stream ==
265    * true */
266   uint32_t remaining_bytes_;
267 
268   /* Accessed only by transport thread when stream->pending_byte_stream == false
269    * Accessed only by application thread when stream->pending_byte_stream ==
270    * true */
271   struct {
272     grpc_closure closure;
273     size_t max_size_hint;
274     grpc_closure* on_complete;
275   } next_action_;
276   grpc_closure destroy_action_;
277 };
278 
279 }  // namespace grpc_core
280 
/* States of the keep-alive state machine; stored in
   grpc_chttp2_transport::keepalive_state (which has no default initializer
   in the struct). */
281 typedef enum {
282   GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
283   GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
284   GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
285   GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
286 } grpc_chttp2_keepalive_state;
287 
/* Per-connection state of a chttp2 transport: endpoint and combiner handles,
   write/read state, the stream lists and stream map, HTTP/2 settings, ping
   and keep-alive bookkeeping, frame parsers, flow control and the deframer
   state. Member order matters: `base` must remain the first member. Members
   without a default initializer must be set up by the constructor, which is
   defined elsewhere. */
288 struct grpc_chttp2_transport {
289   grpc_chttp2_transport(const grpc_channel_args* channel_args,
290                         grpc_endpoint* ep, bool is_client,
291                         grpc_resource_user* resource_user);
292   ~grpc_chttp2_transport();
293 
294   grpc_transport base; /* must be first */
295   grpc_core::RefCount refs;
296   grpc_endpoint* ep;
297   std::string peer_string;
298 
299   grpc_resource_user* resource_user;
300 
301   grpc_core::Combiner* combiner;
302 
303   grpc_closure* notify_on_receive_settings = nullptr;
304   grpc_closure* notify_on_close = nullptr;
305 
306   /** write execution state of the transport */
307   grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
308 
309   /** is the transport destroying itself? (stored as uint8_t, used as a flag) */
310   uint8_t destroying = false;
311   /** has the upper layer closed the transport? */
312   grpc_error_handle closed_with_error = GRPC_ERROR_NONE;
313 
314   /** is there a read request to the endpoint outstanding? */
315   uint8_t endpoint_reading = 1;
316 
317   /** various lists of streams, indexed by grpc_chttp2_stream_list_id */
318   grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
319 
320   /** maps stream id to grpc_chttp2_stream objects */
321   grpc_chttp2_stream_map stream_map;
322 
323   grpc_closure write_action_begin_locked;
324   grpc_closure write_action;
325   grpc_closure write_action_end_locked;
326 
327   grpc_closure read_action_locked;
328 
329   /** incoming read bytes */
330   grpc_slice_buffer read_buffer;
331 
332   /** address to place a newly accepted stream - set and unset by
333       grpc_chttp2_parsing_accept_stream; used by init_stream to
334       publish the accepted server stream */
335   grpc_chttp2_stream** accepting_stream = nullptr;
336 
337   /* accept stream callback */
338   void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
339                            const void* server_data);
340   void* accept_stream_cb_user_data;
341 
342   /** connectivity tracking */
343   grpc_core::ConnectivityStateTracker state_tracker;
344 
345   /** data to write now */
346   grpc_slice_buffer outbuf;
347   /** hpack encoding */
348   grpc_chttp2_hpack_compressor hpack_compressor;
349   /** is this a client? */
350   bool is_client;
351 
352   /** data to write next write */
353   grpc_slice_buffer qbuf;
354 
355   /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
356    */
357   uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
358 
359   /** Set to a grpc_error object if a goaway frame is received. By default, set
360    * to GRPC_ERROR_NONE */
361   grpc_error_handle goaway_error = GRPC_ERROR_NONE;
362 
363   grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
364 
365   /** are the local settings dirty and need to be sent? */
366   bool dirtied_local_settings = true;
367   /** have local settings been sent? */
368   bool sent_local_settings = false;
369   /** bitmask of setting indexes to send out
370       Hack: it's common for implementations to assume 65536 bytes initial send
371       window -- this should by rights be 0 */
372   uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
373   /** settings values, indexed by [grpc_chttp2_setting_set][setting id] */
374   uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
375 
376   /** what is the next stream id to be allocated by this peer?
377       copied to next_stream_id in parsing when parsing commences */
378   uint32_t next_stream_id = 0;
379 
380   /** last new stream id */
381   uint32_t last_new_stream_id = 0;
382 
383   /** ping queues for various ping insertion points */
384   grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
385   grpc_chttp2_repeated_ping_policy ping_policy;
386   grpc_chttp2_repeated_ping_state ping_state;
387   uint64_t ping_ctr = 0; /* unique id for pings */
388   grpc_closure retry_initiate_ping_locked;
389 
390   /** ping acks */
391   size_t ping_ack_count = 0;
392   size_t ping_ack_capacity = 0;
393   uint64_t* ping_acks = nullptr;
394   grpc_chttp2_server_ping_recv_state ping_recv_state;
395 
396   /** parser for headers */
397   grpc_chttp2_hpack_parser hpack_parser;
398   /** simple one shot parsers */
399   union {
400     grpc_chttp2_window_update_parser window_update;
401     grpc_chttp2_settings_parser settings;
402     grpc_chttp2_ping_parser ping;
403     grpc_chttp2_rst_stream_parser rst_stream;
404   } simple;
405   /** parser for goaway frames */
406   grpc_chttp2_goaway_parser goaway_parser;
407 
408   grpc_core::PolymorphicManualConstructor<
409       grpc_core::chttp2::TransportFlowControlBase,
410       grpc_core::chttp2::TransportFlowControl,
411       grpc_core::chttp2::TransportFlowControlDisabled>
412       flow_control;
413   /** initial window change. This is tracked as we parse settings frames from
414    * the remote peer. If there is a positive delta, then we will make all
415    * streams readable since they may have become unstalled */
416   int64_t initial_window_update = 0;
417 
418   /* deframing */
419   grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
420   uint8_t incoming_frame_type = 0;
421   uint8_t incoming_frame_flags = 0;
422   uint8_t header_eof = 0;
423   bool is_first_frame = true;
424   uint32_t expect_continuation_stream_id = 0;
425   uint32_t incoming_frame_size = 0;
426   uint32_t incoming_stream_id = 0;
427 
428   /* active parser */
429   void* parser_data = nullptr;
430   grpc_chttp2_stream* incoming_stream = nullptr;
431   grpc_error_handle (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
432                               grpc_chttp2_stream* s, const grpc_slice& slice,
433                               int is_last);
434 
435   grpc_chttp2_write_cb* write_cb_pool = nullptr;
436 
437   /* bdp estimator */
438   bool bdp_ping_blocked =
439       false; /* Is the BDP blocked due to not receiving any data? */
440   grpc_closure next_bdp_ping_timer_expired_locked;
441   grpc_closure start_bdp_ping_locked;
442   grpc_closure finish_bdp_ping_locked;
443 
444   /* if non-NULL, close the transport with this error when writes are finished
445    */
446   grpc_error_handle close_transport_on_writes_finished = GRPC_ERROR_NONE;
447 
448   /* a list of closures to run after writes are finished */
449   grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
450 
451   /* buffer pool state */
452   /** have we scheduled a benign cleanup? */
453   bool benign_reclaimer_registered = false;
454   /** have we scheduled a destructive cleanup? */
455   bool destructive_reclaimer_registered = false;
456   /** benign cleanup closure */
457   grpc_closure benign_reclaimer_locked;
458   /** destructive cleanup closure */
459   grpc_closure destructive_reclaimer_locked;
460 
461   /* next bdp ping timer */
462   bool have_next_bdp_ping_timer = false;
463   /** If start_bdp_ping_locked has been called */
464   bool bdp_ping_started = false;
465   grpc_timer next_bdp_ping_timer;
466 
467   /* keep-alive ping support */
468   /** Closure to initialize a keepalive ping */
469   grpc_closure init_keepalive_ping_locked;
470   /** Closure to run when the keepalive ping is sent */
471   grpc_closure start_keepalive_ping_locked;
472   /** Closure to run when the keepalive ping ack is received */
473   grpc_closure finish_keepalive_ping_locked;
474   /** Closure to run when the keepalive ping times out */
475   grpc_closure keepalive_watchdog_fired_locked;
476   /** timer to initiate ping events */
477   grpc_timer keepalive_ping_timer;
478   /** watchdog to kill the transport when waiting for the keepalive ping */
479   grpc_timer keepalive_watchdog_timer;
480   /** time duration in between pings */
481   grpc_millis keepalive_time;
482   /** grace period for a ping to complete before watchdog kicks in */
483   grpc_millis keepalive_timeout;
484   /** if keepalive pings are allowed when there's no outstanding streams */
485   bool keepalive_permit_without_calls = false;
486   /** If start_keepalive_ping_locked has been called */
487   bool keepalive_ping_started = false;
488   /** keep-alive state machine state */
489   grpc_chttp2_keepalive_state keepalive_state;
490   grpc_core::ContextList* cl = nullptr;
491   grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
492   uint32_t num_messages_in_next_write = 0;
493   /** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
494    * RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
495    * DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
496    * only continue reading when we are able to write to the socket again,
497    * thereby reducing the number of induced frames. */
498   uint32_t num_pending_induced_frames = 0;
499   bool reading_paused_on_pending_induced_frames = false;
500 };
501 
/* How a metadata batch was published for a stream; stored per entry in
   grpc_chttp2_stream::published_metadata[2]. That array is value-initialized,
   so entries start at the first enumerator, GRPC_METADATA_NOT_PUBLISHED. */
502 typedef enum {
503   GRPC_METADATA_NOT_PUBLISHED,
504   GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
505   GRPC_METADATA_PUBLISHED_FROM_WIRE,
506   GRPC_METADATA_PUBLISHED_AT_CLOSE
507 } grpc_published_metadata_method;
508 
/* Per-stream state of a chttp2 transport: back pointers to the owning
   transport and refcount, list membership, pending send/receive operations,
   open/closed flags and errors, incoming frame buffers, flow control and
   stream compression state. Member order matters near the top: `reffer` must
   directly follow `t` and `refcount` (see the comment on Reffer). Members
   without a default initializer must be set up by the constructor, which is
   defined elsewhere. */
509 struct grpc_chttp2_stream {
510   grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
511                      const void* server_data, grpc_core::Arena* arena);
512   ~grpc_chttp2_stream();
513 
514   void* context;
515   grpc_chttp2_transport* t;
516   grpc_stream_refcount* refcount;
517   // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
518   // before initializing the rest of the stream, to avoid cache misses. This
519   // field MUST be right after `t` and `refcount`.
520   struct Reffer {
521     explicit Reffer(grpc_chttp2_stream* s);
522   } reffer;
523 
524   grpc_closure destroy_stream;
525   grpc_closure* destroy_stream_arg;
526 
  // Per-list link pointers and membership flags, both indexed by
  // grpc_chttp2_stream_list_id.
527   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
528   uint8_t included[STREAM_LIST_COUNT] = {};
529 
530   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
531   uint32_t id = 0;
532 
533   /** things the upper layers would like to send */
534   grpc_metadata_batch* send_initial_metadata = nullptr;
535   grpc_closure* send_initial_metadata_finished = nullptr;
536   grpc_metadata_batch* send_trailing_metadata = nullptr;
537   // TODO(yashykt): Find a better name for the below field and others in this
538   //                struct to better distinguish inputs, return values, and
539   //                internal state.
540   // sent_trailing_metadata_op allows the transport to fill in to the upper
541   // layer whether this stream was able to send its trailing metadata (used for
542   // detecting cancellation on the server-side).
543   bool* sent_trailing_metadata_op = nullptr;
544   grpc_closure* send_trailing_metadata_finished = nullptr;
545 
546   grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
547   uint32_t fetched_send_message_length = 0;
548   grpc_slice fetching_slice = grpc_empty_slice();
549   int64_t next_message_end_offset;
550   int64_t flow_controlled_bytes_written = 0;
551   int64_t flow_controlled_bytes_flowed = 0;
552   grpc_closure complete_fetch_locked;
553   grpc_closure* fetching_send_message_finished = nullptr;
554 
555   grpc_metadata_batch* recv_initial_metadata;
556   grpc_closure* recv_initial_metadata_ready = nullptr;
557   bool* trailing_metadata_available = nullptr;
558   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
559   grpc_closure* recv_message_ready = nullptr;
560   grpc_metadata_batch* recv_trailing_metadata;
561   grpc_closure* recv_trailing_metadata_finished = nullptr;
562 
563   grpc_transport_stream_stats* collecting_stats = nullptr;
564   grpc_transport_stream_stats stats = grpc_transport_stream_stats();
565 
566   /** Is this stream closed for writing. */
567   bool write_closed = false;
568   /** Is this stream reading half-closed. */
569   bool read_closed = false;
570   /** Are all published incoming byte streams closed. */
571   bool all_incoming_byte_streams_finished = false;
572   /** Has this stream seen an error.
573       If true, then pending incoming frames can be thrown away. */
574   bool seen_error = false;
575   /** Are we buffering writes on this stream? If yes, we won't become writable
576       until there's enough queued up in the flow_controlled_buffer */
577   bool write_buffering = false;
578 
579   /* have we sent or received the EOS bit? */
580   bool eos_received = false;
581   bool eos_sent = false;
582 
583   /** the error that resulted in this stream being read-closed */
584   grpc_error_handle read_closed_error = GRPC_ERROR_NONE;
585   /** the error that resulted in this stream being write-closed */
586   grpc_error_handle write_closed_error = GRPC_ERROR_NONE;
587 
588   grpc_published_metadata_method published_metadata[2] = {};
589   bool final_metadata_requested = false;
590 
591   grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
592 
593   grpc_slice_buffer frame_storage; /* protected by t combiner */
594 
595   grpc_closure* on_next = nullptr;  /* protected by t combiner */
596   bool pending_byte_stream = false; /* protected by t combiner */
597   // cached length of buffer to be used by the transport thread in cases where
598   // stream->pending_byte_stream == true. The value is saved before
599   // application threads are allowed to modify
600   // unprocessed_incoming_frames_buffer
601   size_t unprocessed_incoming_frames_buffer_cached_length = 0;
602   /* Accessed only by transport thread when stream->pending_byte_stream == false
603    * Accessed only by application thread when stream->pending_byte_stream ==
604    * true */
605   grpc_slice_buffer unprocessed_incoming_frames_buffer;
606   grpc_closure reset_byte_stream;
607   grpc_error_handle byte_stream_error =
608       GRPC_ERROR_NONE;              /* protected by t combiner */
609   bool received_last_frame = false; /* protected by t combiner */
610 
611   grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
612 
613   /** saw some stream level error */
614   grpc_error_handle forced_close_error = GRPC_ERROR_NONE;
615   /** how many header frames have we received? */
616   uint8_t header_frames_received = 0;
617   /** parsing state for data frames */
618   /* Accessed only by transport thread when stream->pending_byte_stream == false
619    * Accessed only by application thread when stream->pending_byte_stream ==
620    * true */
621   grpc_chttp2_data_parser data_parser;
622   /** number of bytes received - reset at end of parse thread execution */
623   int64_t received_bytes = 0;
624 
625   bool sent_initial_metadata = false;
626   bool sent_trailing_metadata = false;
627 
628   grpc_core::PolymorphicManualConstructor<
629       grpc_core::chttp2::StreamFlowControlBase,
630       grpc_core::chttp2::StreamFlowControl,
631       grpc_core::chttp2::StreamFlowControlDisabled>
632       flow_control;
633 
634   grpc_slice_buffer flow_controlled_buffer;
635 
636   grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
637   grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
638   grpc_chttp2_write_cb* finish_after_write = nullptr;
639   size_t sending_bytes = 0;
640 
641   /* Stream compression method to be used. */
642   grpc_stream_compression_method stream_compression_method =
643       GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
644   /* Stream decompression method to be used. */
645   grpc_stream_compression_method stream_decompression_method =
646       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
647 
648   /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
649    */
650   bool unprocessed_incoming_frames_decompressed = false;
651   /** Whether the bytes needs to be traced using Fathom */
652   bool traced = false;
653   /** gRPC header bytes that are already decompressed */
654   size_t decompressed_header_bytes = 0;
655   /** Byte counter for number of bytes written */
656   size_t byte_counter = 0;
657 
658   /** Amount of uncompressed bytes sent out when compressed_data_buffer is
659    * emptied */
660   size_t uncompressed_data_size;
661   /** Stream compression compress context */
662   grpc_stream_compression_context* stream_compression_ctx;
663   /** Buffer storing data that is compressed but not sent */
664   grpc_slice_buffer compressed_data_buffer;
665 
666   /** Stream compression decompress context */
667   grpc_stream_compression_context* stream_decompression_ctx;
668   /** Temporary buffer storing decompressed data.
669    * Initialized, used, and destroyed only when stream uses (non-identity)
670    * compression.
671    */
672   grpc_slice_buffer decompressed_data_buffer;
673 };
674 
675 /** Transport writing call flow:
676     grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
677     go out on the wire.
678     If no other write has been started, a task is enqueued onto our workqueue.
679     When that task executes, it obtains the global lock, and gathers the data
680     to write.
681     The global lock is dropped and we do the syscall to write.
682     After writing, a follow-up check is made to see if another round of writing
683     should be performed.
684 
685     The actual call chain is documented in the implementation of this function.

    `t` is the transport to write on; `reason` records why the write was
    requested (see grpc_chttp2_initiate_write_reason).
686     */
687 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
688                                 grpc_chttp2_initiate_write_reason reason);
689 
/* Result returned by value from grpc_chttp2_begin_write(). */
690 struct grpc_chttp2_begin_write_result {
691   /** are we writing? */
692   bool writing;
693   /** if writing: was it a complete flush (false) or a partial flush (true) */
694   bool partial;
695   /** did we queue any completions as part of beginning the write */
696   bool early_results_scheduled;
697 };
/* Begin/end pair for one write on transport `t`. grpc_chttp2_begin_write
   reports what it did through grpc_chttp2_begin_write_result;
   grpc_chttp2_end_write receives the error associated with the write.
   NOTE(review): both are defined elsewhere -- presumably they run under the
   transport combiner; confirm in the implementation. */
698 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
699     grpc_chttp2_transport* t);
700 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
701 
702 /** Process one slice of incoming data and report the outcome as a
703     grpc_error_handle.
    NOTE(review): the previous wording described a 1/0 "connection still
    viable" return value, which does not match the declared return type.
    Presumably GRPC_ERROR_NONE means the connection is still viable and any
    other error means it should be torn down -- confirm against the
    implementation. */
704 grpc_error_handle grpc_chttp2_perform_read(grpc_chttp2_transport* t,
705                                            const grpc_slice& slice);
706 
/* Accessors for the per-transport stream lists. The add/remove functions take
   the stream `s` to act on; the pop functions take a grpc_chttp2_stream** to
   receive the popped stream. All are defined elsewhere.
   NOTE(review): the "written" list functions below have no matching
   enumerator in grpc_chttp2_stream_list_id as visible in this header --
   confirm against the implementation. The meaning of the bool results of the
   add/remove functions is also not visible here. */
707 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
708                                           grpc_chttp2_stream* s);
709 /** Get a writable stream
710     returns true if there was a stream available */
711 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
712                                           grpc_chttp2_stream** s);
713 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
714                                              grpc_chttp2_stream* s);
715 
716 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
717                                          grpc_chttp2_stream* s);
718 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
719 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
720                                          grpc_chttp2_stream** s);
721 
722 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
723                                          grpc_chttp2_stream* s);
724 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
725                                          grpc_chttp2_stream** s);
726 
727 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
728                                                   grpc_chttp2_stream* s);
729 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
730                                                   grpc_chttp2_stream** s);
731 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
732                                                      grpc_chttp2_stream* s);
733 
734 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
735                                                grpc_chttp2_stream* s);
736 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
737                                                grpc_chttp2_stream** s);
738 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
739                                                   grpc_chttp2_stream* s);
740 
741 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
742                                             grpc_chttp2_stream* s);
743 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
744                                             grpc_chttp2_stream** s);
745 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
746                                                grpc_chttp2_stream* s);
747 
748 /********* Flow Control ***************/
749 
750 // Takes in a flow control action and performs all the needed operations.
// `action` is the FlowControlAction to apply; `t` and `s` are the transport
// and stream it applies to. NOTE(review): whether `s` may be null for
// transport-only actions is not visible here -- confirm in the implementation.
751 void grpc_chttp2_act_on_flowctl_action(
752     const grpc_core::chttp2::FlowControlAction& action,
753     grpc_chttp2_transport* t, grpc_chttp2_stream* s);
754 
755 /********* End of Flow Control ***************/
756 
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)757 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
758     grpc_chttp2_transport* t, uint32_t id) {
759   return static_cast<grpc_chttp2_stream*>(
760       grpc_chttp2_stream_map_find(&t->stream_map, id));
761 }
762 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
763                                                       uint32_t id);
764 
765 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
766                                      uint32_t goaway_error,
767                                      uint32_t last_stream_id,
768                                      const grpc_slice& goaway_text);
769 
770 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
771 
772 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
773                                        grpc_chttp2_stream* s,
774                                        grpc_closure** pclosure,
775                                        grpc_error_handle error,
776                                        const char* desc);
777 
// Size in bytes of the gRPC message header.
#define GRPC_HEADER_SIZE_IN_BYTES 5
// Largest value representable in size_t (all bits set).
#define MAX_SIZE_T (~(size_t)0)

// Client connection string and its length, excluding the terminating NUL
// (hence the "- 1" applied to sizeof).
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
  (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)

// extern grpc_core::TraceFlag grpc_http_trace;
// extern grpc_core::TraceFlag grpc_flowctl_trace;

// Evaluates `stmt` only when the grpc_http_trace flag is enabled. Wrapped in
// do { } while (0) so the expansion behaves as a single statement.
#define GRPC_CHTTP2_IF_TRACING(stmt)                \
  do {                                              \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
      (stmt);                                       \
    }                                               \
  } while (0)

795 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
796                              grpc_chttp2_stream* stream,
797                              grpc_error_handle error);
798 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
799                                     grpc_chttp2_stream* s, int close_reads,
800                                     int close_writes, grpc_error_handle error);
801 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
802 
803 #ifndef NDEBUG
804 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
805   grpc_chttp2_stream_ref(stream, reason)
806 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
807   grpc_chttp2_stream_unref(stream, reason)
808 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
809 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
810 #else
811 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
812 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
813   grpc_chttp2_stream_unref(stream)
814 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
815 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
816 #endif
817 
818 #ifndef NDEBUG
819 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
820   grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
821 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
822   grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)823 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
824                                         const char* reason, const char* file,
825                                         int line) {
826   if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
827     delete t;
828   }
829 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t,const char * reason,const char * file,int line)830 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
831                                       const char* reason, const char* file,
832                                       int line) {
833   t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
834 }
835 #else
836 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
837 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
grpc_chttp2_unref_transport(grpc_chttp2_transport * t)838 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
839   if (t->refs.Unref()) {
840     delete t;
841   }
842 }
grpc_chttp2_ref_transport(grpc_chttp2_transport * t)843 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
844   t->refs.Ref();
845 }
846 #endif
847 
848 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
849 
850 /** Add a new ping strike to ping_recv_state.ping_strikes. If
851     ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
852     with error code ENHANCE_YOUR_CALM and additional debug data resembling
853     "too_many_pings" followed by immediately closing the connection. */
854 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
855 
856 /** Resets ping clock. Should be called when flushing window updates,
857  * initial/trailing metadata or data frames. For a server, it resets the number
858  * of ping strikes and the last_ping_recv_time. For a ping sender, it resets
859  * pings_before_data_required. */
860 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
861 
862 /** add a ref to the stream and add it to the writable list;
863     ref will be dropped in writing.c */
864 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
865                                       grpc_chttp2_stream* s);
866 
867 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
868                                grpc_error_handle due_to_error);
869 
870 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
871                                                       grpc_chttp2_stream* s);
872 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
873                                              grpc_chttp2_stream* s);
874 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
875                                                        grpc_chttp2_stream* s);
876 
877 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
878                                      grpc_chttp2_stream* s,
879                                      grpc_error_handle error);
880 
881 /** Set the default keepalive configurations, must only be called at
882     initialization */
883 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
884                                                bool is_client);
885 
886 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error_handle error);
887 
888 void schedule_bdp_ping_locked(grpc_chttp2_transport* t);
889 
890 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
891