1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H 20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <assert.h> 25 #include <stdbool.h> 26 27 #include "src/core/ext/transport/chttp2/transport/flow_control.h" 28 #include "src/core/ext/transport/chttp2/transport/frame.h" 29 #include "src/core/ext/transport/chttp2/transport/frame_data.h" 30 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h" 31 #include "src/core/ext/transport/chttp2/transport/frame_ping.h" 32 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h" 33 #include "src/core/ext/transport/chttp2/transport/frame_settings.h" 34 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h" 35 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" 36 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" 37 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" 38 #include "src/core/ext/transport/chttp2/transport/stream_map.h" 39 #include "src/core/lib/compression/stream_compression.h" 40 #include "src/core/lib/gprpp/manual_constructor.h" 41 #include "src/core/lib/iomgr/combiner.h" 42 #include "src/core/lib/iomgr/endpoint.h" 43 #include "src/core/lib/iomgr/timer.h" 44 #include "src/core/lib/transport/connectivity_state.h" 45 #include "src/core/lib/transport/transport_impl.h" 46 47 /* streams are kept in various linked lists depending on what things need to 48 happen to them... this enum labels each list */ 49 typedef enum { 50 GRPC_CHTTP2_LIST_WRITABLE, 51 GRPC_CHTTP2_LIST_WRITING, 52 GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT, 53 GRPC_CHTTP2_LIST_STALLED_BY_STREAM, 54 /** streams that are waiting to start because there are too many concurrent 55 streams on the connection */ 56 GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY, 57 STREAM_LIST_COUNT /* must be last */ 58 } grpc_chttp2_stream_list_id; 59 60 typedef enum { 61 GRPC_CHTTP2_WRITE_STATE_IDLE, 62 GRPC_CHTTP2_WRITE_STATE_WRITING, 63 GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, 64 } grpc_chttp2_write_state; 65 66 typedef enum { 67 GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, 68 GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, 69 } grpc_chttp2_optimization_target; 70 71 typedef enum { 72 GRPC_CHTTP2_PCL_INITIATE = 0, 73 GRPC_CHTTP2_PCL_NEXT, 74 GRPC_CHTTP2_PCL_INFLIGHT, 75 GRPC_CHTTP2_PCL_COUNT /* must be last */ 76 } grpc_chttp2_ping_closure_list; 77 78 typedef enum { 79 GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE, 80 GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM, 81 GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE, 82 GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA, 83 GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA, 84 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING, 85 GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS, 86 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT, 87 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM, 88 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API, 89 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, 90 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, 91 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, 92 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, 93 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, 94 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, 95 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING, 96 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED, 97 GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE, 98 GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM, 99 } grpc_chttp2_initiate_write_reason; 100 101 const char* grpc_chttp2_initiate_write_reason_string( 102 grpc_chttp2_initiate_write_reason reason); 103 104 typedef struct { 105 grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; 106 uint64_t inflight_id; 107 } grpc_chttp2_ping_queue; 108 109 typedef struct { 110 int max_pings_without_data; 111 int max_ping_strikes; 112 grpc_millis min_sent_ping_interval_without_data; 113 grpc_millis min_recv_ping_interval_without_data; 114 } grpc_chttp2_repeated_ping_policy; 115 116 typedef struct { 117 grpc_millis last_ping_sent_time; 118 int pings_before_data_required; 119 grpc_timer delayed_ping_timer; 120 bool is_delayed_ping_timer_set; 121 } grpc_chttp2_repeated_ping_state; 122 123 typedef struct { 124 grpc_millis last_ping_recv_time; 125 int ping_strikes; 126 } grpc_chttp2_server_ping_recv_state; 127 128 /* deframer state for the overall http2 stream of bytes */ 129 typedef enum { 130 /* prefix: one entry per http2 connection prefix byte */ 131 GRPC_DTS_CLIENT_PREFIX_0 = 0, 132 GRPC_DTS_CLIENT_PREFIX_1, 133 GRPC_DTS_CLIENT_PREFIX_2, 134 GRPC_DTS_CLIENT_PREFIX_3, 135 GRPC_DTS_CLIENT_PREFIX_4, 136 GRPC_DTS_CLIENT_PREFIX_5, 137 GRPC_DTS_CLIENT_PREFIX_6, 138 GRPC_DTS_CLIENT_PREFIX_7, 139 GRPC_DTS_CLIENT_PREFIX_8, 140 GRPC_DTS_CLIENT_PREFIX_9, 141 GRPC_DTS_CLIENT_PREFIX_10, 142 GRPC_DTS_CLIENT_PREFIX_11, 143 GRPC_DTS_CLIENT_PREFIX_12, 144 GRPC_DTS_CLIENT_PREFIX_13, 145 GRPC_DTS_CLIENT_PREFIX_14, 146 GRPC_DTS_CLIENT_PREFIX_15, 147 GRPC_DTS_CLIENT_PREFIX_16, 148 GRPC_DTS_CLIENT_PREFIX_17, 149 GRPC_DTS_CLIENT_PREFIX_18, 150 GRPC_DTS_CLIENT_PREFIX_19, 151 GRPC_DTS_CLIENT_PREFIX_20, 152 GRPC_DTS_CLIENT_PREFIX_21, 153 GRPC_DTS_CLIENT_PREFIX_22, 154 GRPC_DTS_CLIENT_PREFIX_23, 155 /* frame header byte 0... */ 156 /* must follow from the prefix states */ 157 GRPC_DTS_FH_0, 158 GRPC_DTS_FH_1, 159 GRPC_DTS_FH_2, 160 GRPC_DTS_FH_3, 161 GRPC_DTS_FH_4, 162 GRPC_DTS_FH_5, 163 GRPC_DTS_FH_6, 164 GRPC_DTS_FH_7, 165 /* ... frame header byte 8 */ 166 GRPC_DTS_FH_8, 167 /* inside a http2 frame */ 168 GRPC_DTS_FRAME 169 } grpc_chttp2_deframe_transport_state; 170 171 typedef struct { 172 grpc_chttp2_stream* head; 173 grpc_chttp2_stream* tail; 174 } grpc_chttp2_stream_list; 175 176 typedef struct { 177 grpc_chttp2_stream* next; 178 grpc_chttp2_stream* prev; 179 } grpc_chttp2_stream_link; 180 181 /* We keep several sets of connection wide parameters */ 182 typedef enum { 183 /* The settings our peer has asked for (and we have acked) */ 184 GRPC_PEER_SETTINGS = 0, 185 /* The settings we'd like to have */ 186 GRPC_LOCAL_SETTINGS, 187 /* The settings we've published to our peer */ 188 GRPC_SENT_SETTINGS, 189 /* The settings the peer has acked */ 190 GRPC_ACKED_SETTINGS, 191 GRPC_NUM_SETTING_SETS 192 } grpc_chttp2_setting_set; 193 194 typedef enum { 195 GRPC_CHTTP2_NO_GOAWAY_SEND, 196 GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED, 197 GRPC_CHTTP2_GOAWAY_SENT, 198 } grpc_chttp2_sent_goaway_state; 199 200 typedef struct grpc_chttp2_write_cb { 201 int64_t call_at_byte; 202 grpc_closure* closure; 203 struct grpc_chttp2_write_cb* next; 204 } grpc_chttp2_write_cb; 205 206 namespace grpc_core { 207 208 class Chttp2IncomingByteStream : public ByteStream { 209 public: 210 Chttp2IncomingByteStream(grpc_chttp2_transport* transport, 211 grpc_chttp2_stream* stream, uint32_t frame_size, 212 uint32_t flags); 213 214 void Orphan() override; 215 216 bool Next(size_t max_size_hint, grpc_closure* on_complete) override; 217 grpc_error* Pull(grpc_slice* slice) override; 218 void Shutdown(grpc_error* error) override; 219 220 // TODO(roth): When I converted this class to C++, I wanted to make it 221 // inherit from RefCounted or InternallyRefCounted instead of continuing 222 // to use its own custom ref-counting code. However, that would require 223 // using multiple inheritence, which sucks in general. And to make matters 224 // worse, it causes problems with our New<> and Delete<> wrappers. 225 // Specifically, unless RefCounted is first in the list of parent classes, 226 // it will see a different value of the address of the object than the one 227 // we actually allocated, in which case gpr_free() will be called on a 228 // different address than the one we got from gpr_malloc(), thus causing a 229 // crash. Given the fragility of depending on that, as well as a desire to 230 // avoid multiple inheritence in general, I've decided to leave this 231 // alone for now. We can revisit this once we're able to link against 232 // libc++, at which point we can eliminate New<> and Delete<> and 233 // switch to std::shared_ptr<>. 234 void Ref(); 235 void Unref(); 236 237 void PublishError(grpc_error* error); 238 239 grpc_error* Push(grpc_slice slice, grpc_slice* slice_out); 240 241 grpc_error* Finished(grpc_error* error, bool reset_on_error); 242 remaining_bytes()243 uint32_t remaining_bytes() const { return remaining_bytes_; } 244 245 private: 246 static void NextLocked(void* arg, grpc_error* error_ignored); 247 static void OrphanLocked(void* arg, grpc_error* error_ignored); 248 249 void MaybeCreateStreamDecompressionCtx(); 250 251 grpc_chttp2_transport* transport_; // Immutable. 252 grpc_chttp2_stream* stream_; // Immutable. 253 254 gpr_refcount refs_; 255 256 /* Accessed only by transport thread when stream->pending_byte_stream == false 257 * Accessed only by application thread when stream->pending_byte_stream == 258 * true */ 259 uint32_t remaining_bytes_; 260 261 /* Accessed only by transport thread when stream->pending_byte_stream == false 262 * Accessed only by application thread when stream->pending_byte_stream == 263 * true */ 264 struct { 265 grpc_closure closure; 266 size_t max_size_hint; 267 grpc_closure* on_complete; 268 } next_action_; 269 grpc_closure destroy_action_; 270 }; 271 272 } // namespace grpc_core 273 274 typedef enum { 275 GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, 276 GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, 277 GRPC_CHTTP2_KEEPALIVE_STATE_DYING, 278 GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, 279 } grpc_chttp2_keepalive_state; 280 281 struct grpc_chttp2_transport { 282 grpc_transport base; /* must be first */ 283 gpr_refcount refs; 284 grpc_endpoint* ep; 285 char* peer_string; 286 287 grpc_combiner* combiner; 288 289 grpc_closure* notify_on_receive_settings; 290 291 /** write execution state of the transport */ 292 grpc_chttp2_write_state write_state; 293 /** is this the first write in a series of writes? 294 set when we initiate writing from idle, cleared when we 295 initiate writing from writing+more */ 296 bool is_first_write_in_batch; 297 298 /** is the transport destroying itself? */ 299 uint8_t destroying; 300 /** has the upper layer closed the transport? */ 301 grpc_error* closed_with_error; 302 303 /** is there a read request to the endpoint outstanding? */ 304 uint8_t endpoint_reading; 305 306 grpc_chttp2_optimization_target opt_target; 307 308 /** various lists of streams */ 309 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; 310 311 /** maps stream id to grpc_chttp2_stream objects */ 312 grpc_chttp2_stream_map stream_map; 313 314 grpc_closure write_action_begin_locked; 315 grpc_closure write_action; 316 grpc_closure write_action_end_locked; 317 318 grpc_closure read_action_locked; 319 320 /** incoming read bytes */ 321 grpc_slice_buffer read_buffer; 322 323 /** address to place a newly accepted stream - set and unset by 324 grpc_chttp2_parsing_accept_stream; used by init_stream to 325 publish the accepted server stream */ 326 grpc_chttp2_stream** accepting_stream; 327 328 struct { 329 /* accept stream callback */ 330 void (*accept_stream)(void* user_data, grpc_transport* transport, 331 const void* server_data); 332 void* accept_stream_user_data; 333 334 /** connectivity tracking */ 335 grpc_connectivity_state_tracker state_tracker; 336 } channel_callback; 337 338 /** data to write now */ 339 grpc_slice_buffer outbuf; 340 /** hpack encoding */ 341 grpc_chttp2_hpack_compressor hpack_compressor; 342 /** is this a client? */ 343 bool is_client; 344 345 /** data to write next write */ 346 grpc_slice_buffer qbuf; 347 348 /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? 349 */ 350 uint32_t write_buffer_size; 351 352 /** Set to a grpc_error object if a goaway frame is received. By default, set 353 * to GRPC_ERROR_NONE */ 354 grpc_error* goaway_error; 355 356 grpc_chttp2_sent_goaway_state sent_goaway_state; 357 358 /** are the local settings dirty and need to be sent? */ 359 bool dirtied_local_settings; 360 /** have local settings been sent? */ 361 bool sent_local_settings; 362 /** bitmask of setting indexes to send out */ 363 uint32_t force_send_settings; 364 /** settings values */ 365 uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; 366 367 /** what is the next stream id to be allocated by this peer? 368 copied to next_stream_id in parsing when parsing commences */ 369 uint32_t next_stream_id; 370 371 /** last new stream id */ 372 uint32_t last_new_stream_id; 373 374 /** ping queues for various ping insertion points */ 375 grpc_chttp2_ping_queue ping_queue; 376 grpc_chttp2_repeated_ping_policy ping_policy; 377 grpc_chttp2_repeated_ping_state ping_state; 378 uint64_t ping_ctr; /* unique id for pings */ 379 grpc_closure retry_initiate_ping_locked; 380 381 /** ping acks */ 382 size_t ping_ack_count; 383 size_t ping_ack_capacity; 384 uint64_t* ping_acks; 385 grpc_chttp2_server_ping_recv_state ping_recv_state; 386 387 /** parser for headers */ 388 grpc_chttp2_hpack_parser hpack_parser; 389 /** simple one shot parsers */ 390 union { 391 grpc_chttp2_window_update_parser window_update; 392 grpc_chttp2_settings_parser settings; 393 grpc_chttp2_ping_parser ping; 394 grpc_chttp2_rst_stream_parser rst_stream; 395 } simple; 396 /** parser for goaway frames */ 397 grpc_chttp2_goaway_parser goaway_parser; 398 399 grpc_core::PolymorphicManualConstructor< 400 grpc_core::chttp2::TransportFlowControlBase, 401 grpc_core::chttp2::TransportFlowControl, 402 grpc_core::chttp2::TransportFlowControlDisabled> 403 flow_control; 404 /** initial window change. This is tracked as we parse settings frames from 405 * the remote peer. If there is a positive delta, then we will make all 406 * streams readable since they may have become unstalled */ 407 int64_t initial_window_update = 0; 408 409 /* deframing */ 410 grpc_chttp2_deframe_transport_state deframe_state; 411 uint8_t incoming_frame_type; 412 uint8_t incoming_frame_flags; 413 uint8_t header_eof; 414 bool is_first_frame; 415 uint32_t expect_continuation_stream_id; 416 uint32_t incoming_frame_size; 417 uint32_t incoming_stream_id; 418 419 /* active parser */ 420 void* parser_data; 421 grpc_chttp2_stream* incoming_stream; 422 grpc_error* (*parser)(void* parser_user_data, grpc_chttp2_transport* t, 423 grpc_chttp2_stream* s, grpc_slice slice, int is_last); 424 425 grpc_chttp2_write_cb* write_cb_pool; 426 427 /* bdp estimator */ 428 grpc_closure next_bdp_ping_timer_expired_locked; 429 grpc_closure start_bdp_ping_locked; 430 grpc_closure finish_bdp_ping_locked; 431 432 /* if non-NULL, close the transport with this error when writes are finished 433 */ 434 grpc_error* close_transport_on_writes_finished; 435 436 /* a list of closures to run after writes are finished */ 437 grpc_closure_list run_after_write; 438 439 /* buffer pool state */ 440 /** have we scheduled a benign cleanup? */ 441 bool benign_reclaimer_registered; 442 /** have we scheduled a destructive cleanup? */ 443 bool destructive_reclaimer_registered; 444 /** benign cleanup closure */ 445 grpc_closure benign_reclaimer_locked; 446 /** destructive cleanup closure */ 447 grpc_closure destructive_reclaimer_locked; 448 449 /* next bdp ping timer */ 450 bool have_next_bdp_ping_timer; 451 grpc_timer next_bdp_ping_timer; 452 453 /* keep-alive ping support */ 454 /** Closure to initialize a keepalive ping */ 455 grpc_closure init_keepalive_ping_locked; 456 /** Closure to run when the keepalive ping is sent */ 457 grpc_closure start_keepalive_ping_locked; 458 /** Cousure to run when the keepalive ping ack is received */ 459 grpc_closure finish_keepalive_ping_locked; 460 /** Closrue to run when the keepalive ping timeouts */ 461 grpc_closure keepalive_watchdog_fired_locked; 462 /** timer to initiate ping events */ 463 grpc_timer keepalive_ping_timer; 464 /** watchdog to kill the transport when waiting for the keepalive ping */ 465 grpc_timer keepalive_watchdog_timer; 466 /** time duration in between pings */ 467 grpc_millis keepalive_time; 468 /** grace period for a ping to complete before watchdog kicks in */ 469 grpc_millis keepalive_timeout; 470 /** if keepalive pings are allowed when there's no outstanding streams */ 471 bool keepalive_permit_without_calls; 472 /** keep-alive state machine state */ 473 grpc_chttp2_keepalive_state keepalive_state; 474 }; 475 476 typedef enum { 477 GRPC_METADATA_NOT_PUBLISHED, 478 GRPC_METADATA_SYNTHESIZED_FROM_FAKE, 479 GRPC_METADATA_PUBLISHED_FROM_WIRE, 480 GPRC_METADATA_PUBLISHED_AT_CLOSE 481 } grpc_published_metadata_method; 482 483 struct grpc_chttp2_stream { 484 grpc_chttp2_transport* t; 485 grpc_stream_refcount* refcount; 486 487 grpc_closure destroy_stream; 488 grpc_closure* destroy_stream_arg; 489 490 grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; 491 uint8_t included[STREAM_LIST_COUNT]; 492 493 /** HTTP2 stream id for this stream, or zero if one has not been assigned */ 494 uint32_t id; 495 496 /** things the upper layers would like to send */ 497 grpc_metadata_batch* send_initial_metadata; 498 grpc_closure* send_initial_metadata_finished; 499 grpc_metadata_batch* send_trailing_metadata; 500 grpc_closure* send_trailing_metadata_finished; 501 502 grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message; 503 uint32_t fetched_send_message_length; 504 grpc_slice fetching_slice; 505 int64_t next_message_end_offset; 506 int64_t flow_controlled_bytes_written; 507 int64_t flow_controlled_bytes_flowed; 508 grpc_closure complete_fetch_locked; 509 grpc_closure* fetching_send_message_finished; 510 511 grpc_metadata_batch* recv_initial_metadata; 512 grpc_closure* recv_initial_metadata_ready; 513 bool* trailing_metadata_available; 514 grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message; 515 grpc_closure* recv_message_ready; 516 grpc_metadata_batch* recv_trailing_metadata; 517 grpc_closure* recv_trailing_metadata_finished; 518 519 grpc_transport_stream_stats* collecting_stats; 520 grpc_transport_stream_stats stats; 521 522 /** Is this stream closed for writing. */ 523 bool write_closed; 524 /** Is this stream reading half-closed. */ 525 bool read_closed; 526 /** Are all published incoming byte streams closed. */ 527 bool all_incoming_byte_streams_finished; 528 /** Has this stream seen an error. 529 If true, then pending incoming frames can be thrown away. */ 530 bool seen_error; 531 /** Are we buffering writes on this stream? If yes, we won't become writable 532 until there's enough queued up in the flow_controlled_buffer */ 533 bool write_buffering; 534 /** Has trailing metadata been received. */ 535 bool received_trailing_metadata; 536 537 /** the error that resulted in this stream being read-closed */ 538 grpc_error* read_closed_error; 539 /** the error that resulted in this stream being write-closed */ 540 grpc_error* write_closed_error; 541 542 grpc_published_metadata_method published_metadata[2]; 543 bool final_metadata_requested; 544 545 grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; 546 547 grpc_slice_buffer frame_storage; /* protected by t combiner */ 548 549 /* Accessed only by transport thread when stream->pending_byte_stream == false 550 * Accessed only by application thread when stream->pending_byte_stream == 551 * true */ 552 grpc_slice_buffer unprocessed_incoming_frames_buffer; 553 grpc_closure* on_next; /* protected by t combiner */ 554 bool pending_byte_stream; /* protected by t combiner */ 555 // cached length of buffer to be used by the transport thread in cases where 556 // stream->pending_byte_stream == true. The value is saved before 557 // application threads are allowed to modify 558 // unprocessed_incoming_frames_buffer 559 size_t unprocessed_incoming_frames_buffer_cached_length; 560 grpc_closure reset_byte_stream; 561 grpc_error* byte_stream_error; /* protected by t combiner */ 562 bool received_last_frame; /* protected by t combiner */ 563 564 grpc_millis deadline; 565 566 /** saw some stream level error */ 567 grpc_error* forced_close_error; 568 /** how many header frames have we received? */ 569 uint8_t header_frames_received; 570 /** parsing state for data frames */ 571 /* Accessed only by transport thread when stream->pending_byte_stream == false 572 * Accessed only by application thread when stream->pending_byte_stream == 573 * true */ 574 grpc_chttp2_data_parser data_parser; 575 /** number of bytes received - reset at end of parse thread execution */ 576 int64_t received_bytes; 577 578 bool sent_initial_metadata; 579 bool sent_trailing_metadata; 580 581 grpc_core::PolymorphicManualConstructor< 582 grpc_core::chttp2::StreamFlowControlBase, 583 grpc_core::chttp2::StreamFlowControl, 584 grpc_core::chttp2::StreamFlowControlDisabled> 585 flow_control; 586 587 grpc_slice_buffer flow_controlled_buffer; 588 589 grpc_chttp2_write_cb* on_flow_controlled_cbs; 590 grpc_chttp2_write_cb* on_write_finished_cbs; 591 grpc_chttp2_write_cb* finish_after_write; 592 size_t sending_bytes; 593 594 /* Stream compression method to be used. */ 595 grpc_stream_compression_method stream_compression_method; 596 /* Stream decompression method to be used. */ 597 grpc_stream_compression_method stream_decompression_method; 598 /** Stream compression decompress context */ 599 grpc_stream_compression_context* stream_decompression_ctx; 600 /** Stream compression compress context */ 601 grpc_stream_compression_context* stream_compression_ctx; 602 603 /** Buffer storing data that is compressed but not sent */ 604 grpc_slice_buffer compressed_data_buffer; 605 /** Amount of uncompressed bytes sent out when compressed_data_buffer is 606 * emptied */ 607 size_t uncompressed_data_size; 608 /** Temporary buffer storing decompressed data */ 609 grpc_slice_buffer decompressed_data_buffer; 610 /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed 611 */ 612 bool unprocessed_incoming_frames_decompressed; 613 /** gRPC header bytes that are already decompressed */ 614 size_t decompressed_header_bytes; 615 }; 616 617 /** Transport writing call flow: 618 grpc_chttp2_initiate_write() is called anywhere that we know bytes need to 619 go out on the wire. 620 If no other write has been started, a task is enqueued onto our workqueue. 621 When that task executes, it obtains the global lock, and gathers the data 622 to write. 623 The global lock is dropped and we do the syscall to write. 624 After writing, a follow-up check is made to see if another round of writing 625 should be performed. 626 627 The actual call chain is documented in the implementation of this function. 628 */ 629 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, 630 grpc_chttp2_initiate_write_reason reason); 631 632 typedef struct { 633 /** are we writing? */ 634 bool writing; 635 /** if writing: was it a complete flush (false) or a partial flush (true) */ 636 bool partial; 637 /** did we queue any completions as part of beginning the write */ 638 bool early_results_scheduled; 639 } grpc_chttp2_begin_write_result; 640 641 grpc_chttp2_begin_write_result grpc_chttp2_begin_write( 642 grpc_chttp2_transport* t); 643 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error); 644 645 /** Process one slice of incoming data; return 1 if the connection is still 646 viable after reading, or 0 if the connection should be torn down */ 647 grpc_error* grpc_chttp2_perform_read(grpc_chttp2_transport* t, 648 grpc_slice slice); 649 650 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t, 651 grpc_chttp2_stream* s); 652 /** Get a writable stream 653 returns non-zero if there was a stream available */ 654 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t, 655 grpc_chttp2_stream** s); 656 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t, 657 grpc_chttp2_stream* s); 658 659 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t, 660 grpc_chttp2_stream* s); 661 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t); 662 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t, 663 grpc_chttp2_stream** s); 664 665 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t, 666 grpc_chttp2_stream* s); 667 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t, 668 grpc_chttp2_stream** s); 669 670 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t, 671 grpc_chttp2_stream* s); 672 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t, 673 grpc_chttp2_stream** s); 674 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t, 675 grpc_chttp2_stream* s); 676 677 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t, 678 grpc_chttp2_stream* s); 679 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t, 680 grpc_chttp2_stream** s); 681 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t, 682 grpc_chttp2_stream* s); 683 684 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t, 685 grpc_chttp2_stream* s); 686 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t, 687 grpc_chttp2_stream** s); 688 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t, 689 grpc_chttp2_stream* s); 690 691 /********* Flow Control ***************/ 692 693 // Takes in a flow control action and performs all the needed operations. 694 void grpc_chttp2_act_on_flowctl_action( 695 const grpc_core::chttp2::FlowControlAction& action, 696 grpc_chttp2_transport* t, grpc_chttp2_stream* s); 697 698 /********* End of Flow Control ***************/ 699 700 grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport* t, 701 uint32_t id); 702 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, 703 uint32_t id); 704 705 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, 706 uint32_t goaway_error, 707 grpc_slice goaway_text); 708 709 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t); 710 711 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, 712 grpc_chttp2_stream* s, 713 grpc_closure** pclosure, 714 grpc_error* error, const char* desc); 715 716 #define GRPC_HEADER_SIZE_IN_BYTES 5 717 #define MAX_SIZE_T (~(size_t)0) 718 719 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" 720 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ 721 (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) 722 723 // extern grpc_core::TraceFlag grpc_http_trace; 724 // extern grpc_core::TraceFlag grpc_flowctl_trace; 725 726 #define GRPC_CHTTP2_IF_TRACING(stmt) \ 727 if (!(grpc_http_trace.enabled())) \ 728 ; \ 729 else \ 730 stmt 731 732 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, 733 grpc_chttp2_stream* stream, grpc_error* error); 734 void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t, 735 grpc_chttp2_stream* s, int close_reads, 736 int close_writes, grpc_error* error); 737 void grpc_chttp2_start_writing(grpc_chttp2_transport* t); 738 739 #ifndef NDEBUG 740 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \ 741 grpc_chttp2_stream_ref(stream, reason) 742 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \ 743 grpc_chttp2_stream_unref(stream, reason) 744 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason); 745 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason); 746 #else 747 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream) 748 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \ 749 grpc_chttp2_stream_unref(stream) 750 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s); 751 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s); 752 #endif 753 754 #ifndef NDEBUG 755 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) \ 756 grpc_chttp2_ref_transport(t, r, __FILE__, __LINE__) 757 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) \ 758 grpc_chttp2_unref_transport(t, r, __FILE__, __LINE__) 759 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t, const char* reason, 760 const char* file, int line); 761 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t, const char* reason, 762 const char* file, int line); 763 #else 764 #define GRPC_CHTTP2_REF_TRANSPORT(t, r) grpc_chttp2_ref_transport(t) 765 #define GRPC_CHTTP2_UNREF_TRANSPORT(t, r) grpc_chttp2_unref_transport(t) 766 void grpc_chttp2_unref_transport(grpc_chttp2_transport* t); 767 void grpc_chttp2_ref_transport(grpc_chttp2_transport* t); 768 #endif 769 770 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id); 771 772 /** Add a new ping strike to ping_recv_state.ping_strikes. If 773 ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY 774 with error code ENHANCE_YOUR_CALM and additional debug data resembling 775 "too_many_pings" followed by immediately closing the connection. */ 776 void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t); 777 778 /** add a ref to the stream and add it to the writable list; 779 ref will be dropped in writing.c */ 780 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t, 781 grpc_chttp2_stream* s); 782 783 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s, 784 grpc_error* due_to_error); 785 786 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t, 787 grpc_chttp2_stream* s); 788 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, 789 grpc_chttp2_stream* s); 790 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, 791 grpc_chttp2_stream* s); 792 793 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, 794 grpc_chttp2_stream* s, grpc_error* error); 795 796 /** Set the default keepalive configurations, must only be called at 797 initialization */ 798 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, 799 bool is_client); 800 801 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ 802