• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <assert.h>
25 #include <stdbool.h>
26 
27 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
28 #include "src/core/ext/transport/chttp2/transport/frame.h"
29 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
38 #include "src/core/ext/transport/chttp2/transport/stream_map.h"
39 #include "src/core/lib/channel/channelz.h"
40 #include "src/core/lib/compression/stream_compression.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/combiner.h"
43 #include "src/core/lib/iomgr/endpoint.h"
44 #include "src/core/lib/iomgr/timer.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/lib/transport/transport_impl.h"
47 
// Forward declaration only: this header refers to grpc_core::ContextList
// solely through a pointer (grpc_chttp2_transport::cl), so the full
// definition is not required here.
namespace grpc_core {
class ContextList;
}
51 
/* streams are kept in various linked lists depending on what things need to
   happen to them... this enum labels each list.
   The enumerators are used as array indices: STREAM_LIST_COUNT sizes
   grpc_chttp2_transport::lists as well as grpc_chttp2_stream::links and
   grpc_chttp2_stream::included, so it has to stay the final enumerator. */
typedef enum {
  GRPC_CHTTP2_LIST_WRITABLE,
  GRPC_CHTTP2_LIST_WRITING,
  GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
  GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
  /** streams that are waiting to start because there are too many concurrent
      streams on the connection */
  GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
  STREAM_LIST_COUNT /* must be last */
} grpc_chttp2_stream_list_id;
64 
/* Write execution state of a transport. This is the type of
   grpc_chttp2_transport::write_state, which defaults to
   GRPC_CHTTP2_WRITE_STATE_IDLE. */
typedef enum {
  GRPC_CHTTP2_WRITE_STATE_IDLE,
  GRPC_CHTTP2_WRITE_STATE_WRITING,
  /* NOTE(review): presumably "a write is in flight and another one has
     already been requested" -- confirm against the write state machine. */
  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
} grpc_chttp2_write_state;
70 
/* Selects what the transport should optimize for: latency or throughput.
   NOTE(review): no user of this enum is visible in this part of the header;
   confirm where the choice is consumed. */
typedef enum {
  GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
  GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
} grpc_chttp2_optimization_target;
75 
/* Labels the closure lists kept in grpc_chttp2_ping_queue::lists.
   GRPC_CHTTP2_PCL_COUNT is the size of that array, so it must remain the
   final enumerator. */
typedef enum {
  GRPC_CHTTP2_PCL_INITIATE = 0,
  GRPC_CHTTP2_PCL_NEXT,
  GRPC_CHTTP2_PCL_INFLIGHT,
  GRPC_CHTTP2_PCL_COUNT /* must be last */
} grpc_chttp2_ping_closure_list;
82 
/* Why a write is being initiated. A value of this type is passed as the
   `reason` argument of grpc_chttp2_initiate_write() and can be turned into a
   string with grpc_chttp2_initiate_write_reason_string(). */
typedef enum {
  GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
  GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
  GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
  GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
  GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
  GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
  GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
  GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
  GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
  GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
  GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
} grpc_chttp2_initiate_write_reason;
105 
/* Maps `reason` to a C string.
   NOTE(review): presumably a static, human-readable name that the caller
   must not free -- confirm against the implementation. */
const char* grpc_chttp2_initiate_write_reason_string(
    grpc_chttp2_initiate_write_reason reason);
108 
/* Queue of ping-related closures, held by value in
   grpc_chttp2_transport::ping_queue. */
struct grpc_chttp2_ping_queue {
  /* One closure list per grpc_chttp2_ping_closure_list entry; all lists are
     value-initialized. */
  grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT] = {};
  /* Starts at 0. NOTE(review): presumably the id of the ping currently
     awaiting an ack -- confirm. */
  uint64_t inflight_id = 0;
};
/* Limits on repeated pings; held in grpc_chttp2_transport::ping_policy.
   The members have no default initializers, so whoever creates the object
   must assign every field.
   NOTE(review): the units of the grpc_millis fields are presumably
   milliseconds -- confirm. */
struct grpc_chttp2_repeated_ping_policy {
  int max_pings_without_data;
  int max_ping_strikes;
  grpc_millis min_sent_ping_interval_without_data;
  grpc_millis min_recv_ping_interval_without_data;
};
/* Bookkeeping for pings this side sends; held in
   grpc_chttp2_transport::ping_state. No member has a default initializer. */
struct grpc_chttp2_repeated_ping_state {
  grpc_millis last_ping_sent_time;
  int pings_before_data_required;
  grpc_timer delayed_ping_timer;
  /* Whether delayed_ping_timer is currently set (per the field name). */
  bool is_delayed_ping_timer_set;
};
/* Bookkeeping for received pings; held in
   grpc_chttp2_transport::ping_recv_state. No member has a default
   initializer. NOTE(review): the name suggests only servers use it --
   confirm. */
struct grpc_chttp2_server_ping_recv_state {
  grpc_millis last_ping_recv_time;
  int ping_strikes;
};
/* deframer state for the overall http2 stream of bytes.
   There are 24 prefix states (0..23), the same count as the number of bytes
   in GRPC_CHTTP2_CLIENT_CONNECT_STRING, followed by nine frame-header states
   (bytes 0..8) and a single in-frame state. The enumerators rely on implicit
   consecutive numbering, so their order must not change.
   grpc_chttp2_transport::deframe_state defaults to
   GRPC_DTS_CLIENT_PREFIX_0. */
typedef enum {
  /* prefix: one entry per http2 connection prefix byte */
  GRPC_DTS_CLIENT_PREFIX_0 = 0,
  GRPC_DTS_CLIENT_PREFIX_1,
  GRPC_DTS_CLIENT_PREFIX_2,
  GRPC_DTS_CLIENT_PREFIX_3,
  GRPC_DTS_CLIENT_PREFIX_4,
  GRPC_DTS_CLIENT_PREFIX_5,
  GRPC_DTS_CLIENT_PREFIX_6,
  GRPC_DTS_CLIENT_PREFIX_7,
  GRPC_DTS_CLIENT_PREFIX_8,
  GRPC_DTS_CLIENT_PREFIX_9,
  GRPC_DTS_CLIENT_PREFIX_10,
  GRPC_DTS_CLIENT_PREFIX_11,
  GRPC_DTS_CLIENT_PREFIX_12,
  GRPC_DTS_CLIENT_PREFIX_13,
  GRPC_DTS_CLIENT_PREFIX_14,
  GRPC_DTS_CLIENT_PREFIX_15,
  GRPC_DTS_CLIENT_PREFIX_16,
  GRPC_DTS_CLIENT_PREFIX_17,
  GRPC_DTS_CLIENT_PREFIX_18,
  GRPC_DTS_CLIENT_PREFIX_19,
  GRPC_DTS_CLIENT_PREFIX_20,
  GRPC_DTS_CLIENT_PREFIX_21,
  GRPC_DTS_CLIENT_PREFIX_22,
  GRPC_DTS_CLIENT_PREFIX_23,
  /* frame header byte 0... */
  /* must follow from the prefix states */
  GRPC_DTS_FH_0,
  GRPC_DTS_FH_1,
  GRPC_DTS_FH_2,
  GRPC_DTS_FH_3,
  GRPC_DTS_FH_4,
  GRPC_DTS_FH_5,
  GRPC_DTS_FH_6,
  GRPC_DTS_FH_7,
  /* ... frame header byte 8 */
  GRPC_DTS_FH_8,
  /* inside a http2 frame */
  GRPC_DTS_FRAME
} grpc_chttp2_deframe_transport_state;
171 
/* Head and tail pointers of one list of streams. The transport keeps one of
   these per grpc_chttp2_stream_list_id (grpc_chttp2_transport::lists). */
struct grpc_chttp2_stream_list {
  grpc_chttp2_stream* head;
  grpc_chttp2_stream* tail;
};
/* Next/previous pointers for a stream's membership in one list. Each stream
   carries one link per grpc_chttp2_stream_list_id
   (grpc_chttp2_stream::links). */
struct grpc_chttp2_stream_link {
  grpc_chttp2_stream* next;
  grpc_chttp2_stream* prev;
};
/* We keep several sets of connection wide parameters.
   The enumerators form the first index of grpc_chttp2_transport::settings;
   GRPC_NUM_SETTING_SETS is that dimension's size and must stay last. */
typedef enum {
  /* The settings our peer has asked for (and we have acked) */
  GRPC_PEER_SETTINGS = 0,
  /* The settings we'd like to have */
  GRPC_LOCAL_SETTINGS,
  /* The settings we've published to our peer */
  GRPC_SENT_SETTINGS,
  /* The settings the peer has acked */
  GRPC_ACKED_SETTINGS,
  GRPC_NUM_SETTING_SETS
} grpc_chttp2_setting_set;
192 
/* Progress of sending a GOAWAY on a transport. This is the type of
   grpc_chttp2_transport::sent_goaway_state, which defaults to
   GRPC_CHTTP2_NO_GOAWAY_SEND. */
typedef enum {
  GRPC_CHTTP2_NO_GOAWAY_SEND,
  GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED,
  GRPC_CHTTP2_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
198 
/* Node of a singly linked list of write callbacks (chained through `next`).
   Streams keep such lists in on_flow_controlled_cbs / on_write_finished_cbs,
   and the transport keeps spare nodes in write_cb_pool. */
typedef struct grpc_chttp2_write_cb {
  /* NOTE(review): presumably the byte offset at which `closure` should be
     run -- confirm against the code that consumes these nodes. */
  int64_t call_at_byte;
  grpc_closure* closure;
  struct grpc_chttp2_write_cb* next;
} grpc_chttp2_write_cb;
204 
205 namespace grpc_core {
206 
207 class Chttp2IncomingByteStream : public ByteStream {
208  public:
209   Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
210                            grpc_chttp2_stream* stream, uint32_t frame_size,
211                            uint32_t flags);
212 
213   void Orphan() override;
214 
215   bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
216   grpc_error* Pull(grpc_slice* slice) override;
217   void Shutdown(grpc_error* error) override;
218 
219   // TODO(roth): When I converted this class to C++, I wanted to make it
220   // inherit from RefCounted or InternallyRefCounted instead of continuing
221   // to use its own custom ref-counting code.  However, that would require
222   // using multiple inheritance, which sucks in general.  And to make matters
223   // worse, it causes problems with our New<> and Delete<> wrappers.
224   // Specifically, unless RefCounted is first in the list of parent classes,
225   // it will see a different value of the address of the object than the one
226   // we actually allocated, in which case gpr_free() will be called on a
227   // different address than the one we got from gpr_malloc(), thus causing a
228   // crash.  Given the fragility of depending on that, as well as a desire to
229   // avoid multiple inheritance in general, I've decided to leave this
230   // alone for now.  We can revisit this once we're able to link against
231   // libc++, at which point we can eliminate New<> and Delete<> and
232   // switch to std::shared_ptr<>.
Ref()233   void Ref() { refs_.Ref(); }
Unref()234   void Unref() {
235     if (GPR_UNLIKELY(refs_.Unref())) {
236       delete this;
237     }
238   }
239 
240   void PublishError(grpc_error* error);
241 
242   grpc_error* Push(const grpc_slice& slice, grpc_slice* slice_out);
243 
244   grpc_error* Finished(grpc_error* error, bool reset_on_error);
245 
remaining_bytes()246   uint32_t remaining_bytes() const { return remaining_bytes_; }
247 
248  private:
249   static void NextLocked(void* arg, grpc_error* error_ignored);
250   static void OrphanLocked(void* arg, grpc_error* error_ignored);
251 
252   void MaybeCreateStreamDecompressionCtx();
253 
254   grpc_chttp2_transport* transport_;  // Immutable.
255   grpc_chttp2_stream* stream_;        // Immutable.
256 
257   grpc_core::RefCount refs_;
258 
259   /* Accessed only by transport thread when stream->pending_byte_stream == false
260    * Accessed only by application thread when stream->pending_byte_stream ==
261    * true */
262   uint32_t remaining_bytes_;
263 
264   /* Accessed only by transport thread when stream->pending_byte_stream == false
265    * Accessed only by application thread when stream->pending_byte_stream ==
266    * true */
267   struct {
268     grpc_closure closure;
269     size_t max_size_hint;
270     grpc_closure* on_complete;
271   } next_action_;
272   grpc_closure destroy_action_;
273 };
274 
275 }  // namespace grpc_core
276 
/* States of the keep-alive state machine; the type of
   grpc_chttp2_transport::keepalive_state. */
typedef enum {
  GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
  GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
  GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
283 
284 struct grpc_chttp2_transport {
285   grpc_chttp2_transport(const grpc_channel_args* channel_args,
286                         grpc_endpoint* ep, bool is_client,
287                         grpc_resource_user* resource_user);
288   ~grpc_chttp2_transport();
289 
290   grpc_transport base; /* must be first */
291   grpc_core::RefCount refs;
292   grpc_endpoint* ep;
293   char* peer_string;
294 
295   grpc_resource_user* resource_user;
296 
297   grpc_core::Combiner* combiner;
298 
299   grpc_closure* notify_on_receive_settings = nullptr;
300 
301   /** write execution state of the transport */
302   grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
303 
304   /** is the transport destroying itself? */
305   uint8_t destroying = false;
306   /** has the upper layer closed the transport? */
307   grpc_error* closed_with_error = GRPC_ERROR_NONE;
308 
309   /** is there a read request to the endpoint outstanding? */
310   uint8_t endpoint_reading = 1;
311 
312   /** various lists of streams */
313   grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
314 
315   /** maps stream id to grpc_chttp2_stream objects */
316   grpc_chttp2_stream_map stream_map;
317 
318   grpc_closure write_action_begin_locked;
319   grpc_closure write_action;
320   grpc_closure write_action_end_locked;
321 
322   grpc_closure read_action_locked;
323 
324   /** incoming read bytes */
325   grpc_slice_buffer read_buffer;
326 
327   /** address to place a newly accepted stream - set and unset by
328       grpc_chttp2_parsing_accept_stream; used by init_stream to
329       publish the accepted server stream */
330   grpc_chttp2_stream** accepting_stream = nullptr;
331 
332   /* accept stream callback */
333   void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
334                            const void* server_data);
335   void* accept_stream_cb_user_data;
336 
337   /** connectivity tracking */
338   grpc_core::ConnectivityStateTracker state_tracker;
339 
340   /** data to write now */
341   grpc_slice_buffer outbuf;
342   /** hpack encoding */
343   grpc_chttp2_hpack_compressor hpack_compressor;
344   /** is this a client? */
345   bool is_client;
346 
347   /** data to write next write */
348   grpc_slice_buffer qbuf;
349 
350   /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
351    */
352   uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
353 
354   /** Set to a grpc_error object if a goaway frame is received. By default, set
355    * to GRPC_ERROR_NONE */
356   grpc_error* goaway_error = GRPC_ERROR_NONE;
357 
358   grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
359 
360   /** are the local settings dirty and need to be sent? */
361   bool dirtied_local_settings = true;
362   /** have local settings been sent? */
363   bool sent_local_settings = false;
364   /** bitmask of setting indexes to send out
365       Hack: it's common for implementations to assume 65536 bytes initial send
366       window -- this should by rights be 0 */
367   uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
368   /** settings values */
369   uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
370 
371   /** what is the next stream id to be allocated by this peer?
372       copied to next_stream_id in parsing when parsing commences */
373   uint32_t next_stream_id = 0;
374 
375   /** last new stream id */
376   uint32_t last_new_stream_id = 0;
377 
378   /** ping queues for various ping insertion points */
379   grpc_chttp2_ping_queue ping_queue = grpc_chttp2_ping_queue();
380   grpc_chttp2_repeated_ping_policy ping_policy;
381   grpc_chttp2_repeated_ping_state ping_state;
382   uint64_t ping_ctr = 0; /* unique id for pings */
383   grpc_closure retry_initiate_ping_locked;
384 
385   /** ping acks */
386   size_t ping_ack_count = 0;
387   size_t ping_ack_capacity = 0;
388   uint64_t* ping_acks = nullptr;
389   grpc_chttp2_server_ping_recv_state ping_recv_state;
390 
391   /** parser for headers */
392   grpc_chttp2_hpack_parser hpack_parser;
393   /** simple one shot parsers */
394   union {
395     grpc_chttp2_window_update_parser window_update;
396     grpc_chttp2_settings_parser settings;
397     grpc_chttp2_ping_parser ping;
398     grpc_chttp2_rst_stream_parser rst_stream;
399   } simple;
400   /** parser for goaway frames */
401   grpc_chttp2_goaway_parser goaway_parser;
402 
403   grpc_core::PolymorphicManualConstructor<
404       grpc_core::chttp2::TransportFlowControlBase,
405       grpc_core::chttp2::TransportFlowControl,
406       grpc_core::chttp2::TransportFlowControlDisabled>
407       flow_control;
408   /** initial window change. This is tracked as we parse settings frames from
409    * the remote peer. If there is a positive delta, then we will make all
410    * streams readable since they may have become unstalled */
411   int64_t initial_window_update = 0;
412 
413   /* deframing */
414   grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
415   uint8_t incoming_frame_type = 0;
416   uint8_t incoming_frame_flags = 0;
417   uint8_t header_eof = 0;
418   bool is_first_frame = true;
419   uint32_t expect_continuation_stream_id = 0;
420   uint32_t incoming_frame_size = 0;
421   uint32_t incoming_stream_id = 0;
422 
423   /* active parser */
424   void* parser_data = nullptr;
425   grpc_chttp2_stream* incoming_stream = nullptr;
426   grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t,
427                         grpc_chttp2_stream* s, const grpc_slice& slice,
428                         int is_last);
429 
430   grpc_chttp2_write_cb* write_cb_pool = nullptr;
431 
432   /* bdp estimator */
433   grpc_closure next_bdp_ping_timer_expired_locked;
434   grpc_closure start_bdp_ping_locked;
435   grpc_closure finish_bdp_ping_locked;
436 
437   /* if non-NULL, close the transport with this error when writes are finished
438    */
439   grpc_error* close_transport_on_writes_finished = GRPC_ERROR_NONE;
440 
441   /* a list of closures to run after writes are finished */
442   grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
443 
444   /* buffer pool state */
445   /** have we scheduled a benign cleanup? */
446   bool benign_reclaimer_registered = false;
447   /** have we scheduled a destructive cleanup? */
448   bool destructive_reclaimer_registered = false;
449   /** benign cleanup closure */
450   grpc_closure benign_reclaimer_locked;
451   /** destructive cleanup closure */
452   grpc_closure destructive_reclaimer_locked;
453 
454   /* next bdp ping timer */
455   bool have_next_bdp_ping_timer = false;
456   /** If start_bdp_ping_locked has been called */
457   bool bdp_ping_started = false;
458   grpc_timer next_bdp_ping_timer;
459 
460   /* keep-alive ping support */
461   /** Closure to initialize a keepalive ping */
462   grpc_closure init_keepalive_ping_locked;
463   /** Closure to run when the keepalive ping is sent */
464   grpc_closure start_keepalive_ping_locked;
465   /** Cousure to run when the keepalive ping ack is received */
466   grpc_closure finish_keepalive_ping_locked;
467   /** Closrue to run when the keepalive ping timeouts */
468   grpc_closure keepalive_watchdog_fired_locked;
469   /** timer to initiate ping events */
470   grpc_timer keepalive_ping_timer;
471   /** watchdog to kill the transport when waiting for the keepalive ping */
472   grpc_timer keepalive_watchdog_timer;
473   /** time duration in between pings */
474   grpc_millis keepalive_time;
475   /** grace period for a ping to complete before watchdog kicks in */
476   grpc_millis keepalive_timeout;
477   /** if keepalive pings are allowed when there's no outstanding streams */
478   bool keepalive_permit_without_calls = false;
479   /** If start_keepalive_ping_locked has been called */
480   bool keepalive_ping_started = false;
481   /** keep-alive state machine state */
482   grpc_chttp2_keepalive_state keepalive_state;
483   grpc_core::ContextList* cl = nullptr;
484   grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
485   uint32_t num_messages_in_next_write = 0;
486   /** The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
487    * RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
488    * DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
489    * only continue reading when we are able to write to the socket again,
490    * thereby reducing the number of induced frames. */
491   uint32_t num_pending_induced_frames = 0;
492   bool reading_paused_on_pending_induced_frames = false;
493 };
494 
/* How (or whether) a metadata batch has been published. This is the element
   type of grpc_chttp2_stream::published_metadata, whose entries are
   value-initialized, i.e. start as GRPC_METADATA_NOT_PUBLISHED (value 0). */
typedef enum {
  GRPC_METADATA_NOT_PUBLISHED,
  GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
  GRPC_METADATA_PUBLISHED_FROM_WIRE,
  GRPC_METADATA_PUBLISHED_AT_CLOSE
} grpc_published_metadata_method;
501 
502 struct grpc_chttp2_stream {
503   grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
504                      const void* server_data, grpc_core::Arena* arena);
505   ~grpc_chttp2_stream();
506 
507   void* context;
508   grpc_chttp2_transport* t;
509   grpc_stream_refcount* refcount;
510   // Reffer is a 0-len structure, simply reffing `t` and `refcount` in its ctor
511   // before initializing the rest of the stream, to avoid cache misses. This
512   // field MUST be right after `t` and `refcount`.
513   struct Reffer {
514     explicit Reffer(grpc_chttp2_stream* s);
515   } reffer;
516 
517   grpc_closure destroy_stream;
518   grpc_closure* destroy_stream_arg;
519 
520   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
521   uint8_t included[STREAM_LIST_COUNT] = {};
522 
523   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
524   uint32_t id = 0;
525 
526   /** things the upper layers would like to send */
527   grpc_metadata_batch* send_initial_metadata = nullptr;
528   grpc_closure* send_initial_metadata_finished = nullptr;
529   grpc_metadata_batch* send_trailing_metadata = nullptr;
530   // TODO(yashykt): Find a better name for the below field and others in this
531   //                struct to betteer distinguish inputs, return values, and
532   //                internal state.
533   // sent_trailing_metadata_op allows the transport to fill in to the upper
534   // layer whether this stream was able to send its trailing metadata (used for
535   // detecting cancellation on the server-side)..
536   bool* sent_trailing_metadata_op = nullptr;
537   grpc_closure* send_trailing_metadata_finished = nullptr;
538 
539   grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
540   uint32_t fetched_send_message_length = 0;
541   grpc_slice fetching_slice = grpc_empty_slice();
542   int64_t next_message_end_offset;
543   int64_t flow_controlled_bytes_written = 0;
544   int64_t flow_controlled_bytes_flowed = 0;
545   grpc_closure complete_fetch_locked;
546   grpc_closure* fetching_send_message_finished = nullptr;
547 
548   grpc_metadata_batch* recv_initial_metadata;
549   grpc_closure* recv_initial_metadata_ready = nullptr;
550   bool* trailing_metadata_available = nullptr;
551   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
552   grpc_closure* recv_message_ready = nullptr;
553   grpc_metadata_batch* recv_trailing_metadata;
554   grpc_closure* recv_trailing_metadata_finished = nullptr;
555 
556   grpc_transport_stream_stats* collecting_stats = nullptr;
557   grpc_transport_stream_stats stats = grpc_transport_stream_stats();
558 
559   /** Is this stream closed for writing. */
560   bool write_closed = false;
561   /** Is this stream reading half-closed. */
562   bool read_closed = false;
563   /** Are all published incoming byte streams closed. */
564   bool all_incoming_byte_streams_finished = false;
565   /** Has this stream seen an error.
566       If true, then pending incoming frames can be thrown away. */
567   bool seen_error = false;
568   /** Are we buffering writes on this stream? If yes, we won't become writable
569       until there's enough queued up in the flow_controlled_buffer */
570   bool write_buffering = false;
571 
572   /* have we sent or received the EOS bit? */
573   bool eos_received = false;
574   bool eos_sent = false;
575 
576   /** the error that resulted in this stream being read-closed */
577   grpc_error* read_closed_error = GRPC_ERROR_NONE;
578   /** the error that resulted in this stream being write-closed */
579   grpc_error* write_closed_error = GRPC_ERROR_NONE;
580 
581   grpc_published_metadata_method published_metadata[2] = {};
582   bool final_metadata_requested = false;
583 
584   grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
585 
586   grpc_slice_buffer frame_storage; /* protected by t combiner */
587 
588   grpc_closure* on_next = nullptr;  /* protected by t combiner */
589   bool pending_byte_stream = false; /* protected by t combiner */
590   // cached length of buffer to be used by the transport thread in cases where
591   // stream->pending_byte_stream == true. The value is saved before
592   // application threads are allowed to modify
593   // unprocessed_incoming_frames_buffer
594   size_t unprocessed_incoming_frames_buffer_cached_length = 0;
595   /* Accessed only by transport thread when stream->pending_byte_stream == false
596    * Accessed only by application thread when stream->pending_byte_stream ==
597    * true */
598   grpc_slice_buffer unprocessed_incoming_frames_buffer;
599   grpc_closure reset_byte_stream;
600   grpc_error* byte_stream_error = GRPC_ERROR_NONE; /* protected by t combiner */
601   bool received_last_frame = false;                /* protected by t combiner */
602 
603   grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
604 
605   /** saw some stream level error */
606   grpc_error* forced_close_error = GRPC_ERROR_NONE;
607   /** how many header frames have we received? */
608   uint8_t header_frames_received = 0;
609   /** parsing state for data frames */
610   /* Accessed only by transport thread when stream->pending_byte_stream == false
611    * Accessed only by application thread when stream->pending_byte_stream ==
612    * true */
613   grpc_chttp2_data_parser data_parser;
614   /** number of bytes received - reset at end of parse thread execution */
615   int64_t received_bytes = 0;
616 
617   bool sent_initial_metadata = false;
618   bool sent_trailing_metadata = false;
619 
620   grpc_core::PolymorphicManualConstructor<
621       grpc_core::chttp2::StreamFlowControlBase,
622       grpc_core::chttp2::StreamFlowControl,
623       grpc_core::chttp2::StreamFlowControlDisabled>
624       flow_control;
625 
626   grpc_slice_buffer flow_controlled_buffer;
627 
628   grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
629   grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
630   grpc_chttp2_write_cb* finish_after_write = nullptr;
631   size_t sending_bytes = 0;
632 
633   /* Stream compression method to be used. */
634   grpc_stream_compression_method stream_compression_method =
635       GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
636   /* Stream decompression method to be used. */
637   grpc_stream_compression_method stream_decompression_method =
638       GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS;
639 
640   /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
641    */
642   bool unprocessed_incoming_frames_decompressed = false;
643   /** Whether the bytes needs to be traced using Fathom */
644   bool traced = false;
645   /** gRPC header bytes that are already decompressed */
646   size_t decompressed_header_bytes = 0;
647   /** Byte counter for number of bytes written */
648   size_t byte_counter = 0;
649 
650   /** Amount of uncompressed bytes sent out when compressed_data_buffer is
651    * emptied */
652   size_t uncompressed_data_size;
653   /** Stream compression compress context */
654   grpc_stream_compression_context* stream_compression_ctx;
655   /** Buffer storing data that is compressed but not sent */
656   grpc_slice_buffer compressed_data_buffer;
657 
658   /** Stream compression decompress context */
659   grpc_stream_compression_context* stream_decompression_ctx;
660   /** Temporary buffer storing decompressed data.
661    * Initialized, used, and destroyed only when stream uses (non-identity)
662    * compression.
663    */
664   grpc_slice_buffer decompressed_data_buffer;
665 };
666 
/** Transport writing call flow:
    grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
    go out on the wire.
    If no other write has been started, a task is enqueued onto our workqueue.
    When that task executes, it obtains the global lock, and gathers the data
    to write.
    The global lock is dropped and we do the syscall to write.
    After writing, a follow-up check is made to see if another round of writing
    should be performed.

    The actual call chain is documented in the implementation of this function.

    \param t       transport on which a write should be started
    \param reason  why the write is being requested (see
                   grpc_chttp2_initiate_write_reason)
    */
void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
                                grpc_chttp2_initiate_write_reason reason);
681 
/** Value returned by grpc_chttp2_begin_write(). */
struct grpc_chttp2_begin_write_result {
  /** are we writing? */
  bool writing;
  /** if writing: was it a complete flush (false) or a partial flush (true) */
  bool partial;
  /** did we queue any completions as part of beginning the write */
  bool early_results_scheduled;
};
/** Begins a write on \a t and reports what happened through the returned
    struct (see the field comments above). */
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
    grpc_chttp2_transport* t);
/** Counterpart of grpc_chttp2_begin_write().
    NOTE(review): presumably \a error is the outcome of the write that just
    completed -- confirm ownership and meaning against the implementation. */
void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error);
693 
/** Process one slice of incoming data and report the outcome as a
    grpc_error*.
    NOTE(review): this comment used to describe a 1/0 return value ("1 if the
    connection is still viable after reading, or 0 if the connection should be
    torn down"), which no longer matches the declared return type. Presumably
    GRPC_ERROR_NONE now means "still viable" and any other value means the
    connection should be torn down -- confirm against the implementation. */
grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t,
                                     const grpc_slice& slice);
698 
/* Helpers for the per-transport stream lists (one family of add / pop /
   remove functions per list). The pop functions hand the stream back through
   the `s` out-parameter.
   NOTE(review): the meaning of the bool returned by the add/remove variants
   is not visible in this header -- presumably whether the stream's membership
   actually changed; confirm against the implementation. */

/* --- writable list --- */
bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
                                          grpc_chttp2_stream* s);
/** Get a writable stream
    returns true if there was a stream available */
bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
                                          grpc_chttp2_stream** s);
bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s);

/* --- writing list --- */
bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream* s);
bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream** s);

/* --- written list ---
   NOTE(review): grpc_chttp2_stream_list_id has no enumerator for a "written"
   list; confirm that these two declarations still have definitions and are
   not stale. */
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
                                         grpc_chttp2_stream** s);

/* --- waiting-for-concurrency list --- */
void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream** s);
void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
                                                     grpc_chttp2_stream* s);

/* --- stalled-by-transport list --- */
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream** s);
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
                                                  grpc_chttp2_stream* s);

/* --- stalled-by-stream list --- */
void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
                                            grpc_chttp2_stream* s);
bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
                                            grpc_chttp2_stream** s);
bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
                                               grpc_chttp2_stream* s);
739 
/********* Flow Control ***************/

// Takes in a flow control action and performs all the needed operations.
//   action: the flow control action to carry out.
//   t:      transport the action applies to.
//   s:      stream the action applies to.
//           NOTE(review): whether `s` may be null for transport-only actions
//           is not visible here -- confirm against the implementation.
void grpc_chttp2_act_on_flowctl_action(
    const grpc_core::chttp2::FlowControlAction& action,
    grpc_chttp2_transport* t, grpc_chttp2_stream* s);

/********* End of Flow Control ***************/
748 
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)749 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
750     grpc_chttp2_transport* t, uint32_t id) {
751   return static_cast<grpc_chttp2_stream*>(
752       grpc_chttp2_stream_map_find(&t->stream_map, id));
753 }
/** Accept a stream with the given id on transport `t` and return it.
    NOTE(review): presumably used for peer-initiated streams and may return
    nullptr on failure -- neither is visible from this declaration; confirm. */
754 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
755                                                       uint32_t id);
756 
/** Record a GOAWAY received on transport `t`, described by its error code,
    last stream id and debug text. `goaway_text` is received by const
    reference. NOTE(review): whether the callee takes its own ref on the slice
    is not visible here -- confirm. */
757 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
758                                      uint32_t goaway_error,
759                                      uint32_t last_stream_id,
760                                      const grpc_slice& goaway_text);
761 
/** NOTE(review): by its name, switches the transport's frame parser to one
    that discards the rest of the current frame -- confirm against the
    definition. */
762 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
763 
/** Complete one step of the closure referenced by `pclosure` for stream `s`.
    `pclosure` is a pointer to the closure pointer, so the callee is able to
    modify the caller's pointer; `desc` is a descriptive C string.
    NOTE(review): presumably `*pclosure` is cleared once the closure is
    scheduled and ownership of `error` passes to the callee -- confirm. */
764 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
765                                        grpc_chttp2_stream* s,
766                                        grpc_closure** pclosure,
767                                        grpc_error* error, const char* desc);
768 
/* NOTE(review): presumably the gRPC message prefix (1-byte compressed flag +
   4-byte message length) -- confirm against the framing code. */
769 #define GRPC_HEADER_SIZE_IN_BYTES 5
/* All bits set in a size_t, i.e. the largest value size_t can hold. */
770 #define MAX_SIZE_T (~(size_t)0)
771 
/* The HTTP/2 client connection preface and its length. The length uses
   sizeof minus one so the string literal's terminating NUL is not counted. */
772 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
773 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
774   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
775 
776 // extern grpc_core::TraceFlag grpc_http_trace;
777 // extern grpc_core::TraceFlag grpc_flowctl_trace;
778 
/* Evaluates `stmt` only when GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) is
   true. The do { ... } while (0) wrapper makes the expansion a single
   statement, so it is safe inside an unbraced if/else. `stmt` is wrapped in
   parentheses, so it must be an expression. */
779 #define GRPC_CHTTP2_IF_TRACING(stmt)                \
780   do {                                              \
781     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
782       (stmt);                                       \
783     }                                               \
784   } while (0)
785 
/** NOTE(review): by its name, synthesizes a status for `stream` from `error`
    rather than from data received on the wire -- confirm against the
    definition, including who owns `error` afterwards. */
786 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
787                              grpc_chttp2_stream* stream, grpc_error* error);
/** Mark stream `s` closed. `close_reads` and `close_writes` are ints,
    presumably used as boolean flags selecting which direction(s) to close --
    TODO confirm. Ownership of `error` is not visible from this declaration. */
788 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
789                                     grpc_chttp2_stream* s, int close_reads,
790                                     int close_writes, grpc_error* error);
/** NOTE(review): by its name, kicks off a write pass on transport `t` --
    confirm against the definition. */
791 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
792 
/* Stream ref/unref entry points. Callers use the GRPC_CHTTP2_STREAM_REF /
   GRPC_CHTTP2_STREAM_UNREF macros and always supply a `reason` string.
   - When NDEBUG is not defined, the macros forward `reason` to the
     two-argument grpc_chttp2_stream_ref / grpc_chttp2_stream_unref.
   - When NDEBUG is defined, the macros drop `reason` and call the
     one-argument overloads.
   Only the declarations are visible here; the functions are defined
   elsewhere. */
793 #ifndef NDEBUG
794 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
795   grpc_chttp2_stream_ref(stream, reason)
796 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
797   grpc_chttp2_stream_unref(stream, reason)
798 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
799 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
800 #else
801 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
802 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
803   grpc_chttp2_stream_unref(stream)
804 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
805 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
806 #endif
807 
/* Transport ref/unref entry points. Callers use GRPC_CHTTP2_REF_TRANSPORT /
   GRPC_CHTTP2_UNREF_TRANSPORT and always supply a reason `r`.
   - When NDEBUG is not defined, the macros add __FILE__ and __LINE__ of the
     call site and forward them, with the reason, to t->refs.
   - When NDEBUG is defined, the reason is dropped and the plain
     t->refs.Ref() / t->refs.Unref() are used. */
808 #ifndef NDEBUG
809 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \
810   grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__)
811 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \
812   grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__)
/* Drops one ref on `t`; deletes the transport when Unref() returns true. */
813 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t,
814                                         const char* reason, const char* file,
815                                         int line) {
816   if (t->refs.Unref(grpc_core::DebugLocation(file, line), reason)) {
817     delete t;
818   }
819 }
/* Takes one ref on `t`, passing the call-site location and reason to refs. */
820 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t,
821                                       const char* reason, const char* file,
822                                       int line) {
823   t->refs.Ref(grpc_core::DebugLocation(file, line), reason);
824 }
825 #else
826 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t)
827 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t)
/* Drops one ref on `t`; deletes the transport when Unref() returns true. */
828 inline void grpc_chttp2_unref_transport(grpc_chttp2_transport* t) {
829   if (t->refs.Unref()) {
830     delete t;
831   }
832 }
/* Takes one ref on `t`. */
833 inline void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) {
834   t->refs.Ref();
835 }
836 #endif
837 
/** NOTE(review): by its name, handles the acknowledgement of the ping
    identified by the 64-bit `id` on transport `t` -- confirm against the
    definition. */
838 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
839 
840 /** Add a new ping strike to ping_recv_state.ping_strikes. If
841     ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY
842     with error code ENHANCE_YOUR_CALM and additional debug data resembling
843     "too_many_pings" followed by immediately closing the connection. */
844 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t);
845 
846 /** Resets ping clock. Should be called when flushing window updates,
847  * initial/trailing metadata or data frames. For a server, it resets the number
848  * of ping strikes and the last_ping_recv_time. For a ping sender, it resets
849  * pings_before_data_required. */
850 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
851 
852 /** add a ref to the stream and add it to the writable list;
853     ref will be dropped in writing.c */
854 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
855                                       grpc_chttp2_stream* s);
856 
/** Cancel stream `s` on transport `t` because of `due_to_error`.
    NOTE(review): ownership of `due_to_error` is not visible from this
    declaration -- confirm against the definition. */
857 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
858                                grpc_error* due_to_error);
859 
/** The three "maybe complete" functions below each take the transport and a
    stream. NOTE(review): by their names, each completes the corresponding
    pending receive operation (initial metadata, message, trailing metadata)
    only if it is ready -- the readiness conditions are not visible here. */
860 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
861                                                       grpc_chttp2_stream* s);
862 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
863                                              grpc_chttp2_stream* s);
864 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
865                                                        grpc_chttp2_stream* s);
866 
/** NOTE(review): by its name, fails the writes still pending on stream `s`
    with `error`; ownership of `error` is not visible here -- confirm. */
867 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
868                                      grpc_chttp2_stream* s, grpc_error* error);
869 
870 /** Set the default keepalive configurations, must only be called at
871     initialization */
872 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
873                                                bool is_client);
874 
/** NOTE(review): the (void*, grpc_error*) signature looks like a closure
    callback; `tp` is presumably the grpc_chttp2_transport* passed as an
    opaque argument -- confirm against the definition. */
875 void grpc_chttp2_retry_initiate_ping(void* tp, grpc_error* error);
876 
877 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
878