1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/event_engine/memory_allocator.h>
24 #include <grpc/grpc.h>
25 #include <grpc/slice.h>
26 #include <grpc/support/port_platform.h>
27 #include <grpc/support/time.h>
28 #include <stddef.h>
29 #include <stdint.h>
30
31 #include <atomic>
32 #include <memory>
33 #include <utility>
34
35 #include "absl/container/flat_hash_map.h"
36 #include "absl/random/random.h"
37 #include "absl/status/status.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/types/optional.h"
40 #include "absl/types/variant.h"
41 #include "src/core/channelz/channelz.h"
42 #include "src/core/ext/transport/chttp2/transport/call_tracer_wrapper.h"
43 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
44 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
45 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_security.h"
49 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
50 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
51 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
52 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
53 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
54 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
55 #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
56 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
57 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
58 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
59 #include "src/core/lib/channel/channel_args.h"
60 #include "src/core/lib/debug/trace.h"
61 #include "src/core/lib/iomgr/closure.h"
62 #include "src/core/lib/iomgr/combiner.h"
63 #include "src/core/lib/iomgr/endpoint.h"
64 #include "src/core/lib/iomgr/error.h"
65 #include "src/core/lib/iomgr/iomgr_fwd.h"
66 #include "src/core/lib/resource_quota/arena.h"
67 #include "src/core/lib/resource_quota/memory_quota.h"
68 #include "src/core/lib/slice/slice.h"
69 #include "src/core/lib/slice/slice_buffer.h"
70 #include "src/core/lib/surface/init_internally.h"
71 #include "src/core/lib/transport/connectivity_state.h"
72 #include "src/core/lib/transport/metadata_batch.h"
73 #include "src/core/lib/transport/transport.h"
74 #include "src/core/lib/transport/transport_framing_endpoint_extension.h"
75 #include "src/core/telemetry/call_tracer.h"
76 #include "src/core/telemetry/tcp_tracer.h"
77 #include "src/core/util/bitset.h"
78 #include "src/core/util/debug_location.h"
79 #include "src/core/util/ref_counted.h"
80 #include "src/core/util/ref_counted_ptr.h"
81 #include "src/core/util/time.h"
82
// Closure-barrier flag (low bits): the barrier may be guarding a write that
// was issued from a pollset, so the closure must not run until the write is
// known to have been scheduled.
#define CLOSURE_BARRIER_MAY_COVER_WRITE 0x1
// Unit of the barrier's reference count. The count occupies the high-order
// bits, leaving the low 16 bits free for flags such as the one above.
#define CLOSURE_BARRIER_FIRST_REF_BIT (0x10000)
90
// Names each of the intrusive stream lists the transport maintains (one head
// per id in grpc_chttp2_transport::lists). A stream tracks its membership per
// list, so it may sit on several lists at once.
enum grpc_chttp2_stream_list_id {
  // Being on either of these two lists holds an explicit ref on the stream.
  GRPC_CHTTP2_LIST_WRITABLE = 0,
  GRPC_CHTTP2_LIST_WRITING = 1,
  // These lists hold no additional ref: a stream must be unlinked from them
  // when it is removed.
  GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT = 2,
  GRPC_CHTTP2_LIST_STALLED_BY_STREAM = 3,
  /// Streams that cannot start yet because the connection already has too
  /// many concurrent streams.
  GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY = 4,
  // Number of lists above; keep this enumerator last.
  STREAM_LIST_COUNT = 5
};
107
// Write state machine of the transport (see grpc_chttp2_transport::write_state,
// which starts at GRPC_CHTTP2_WRITE_STATE_IDLE).
enum grpc_chttp2_write_state {
  GRPC_CHTTP2_WRITE_STATE_IDLE = 0,
  GRPC_CHTTP2_WRITE_STATE_WRITING = 1,
  GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE = 2,
};
113
// What the transport should optimize for.
enum grpc_chttp2_optimization_target {
  GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY = 0,
  GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT = 1,
};
118
// Identifies a ping closure list; GRPC_CHTTP2_PCL_COUNT is the number of lists
// and must remain the final enumerator.
enum grpc_chttp2_ping_closure_list {
  GRPC_CHTTP2_PCL_INITIATE = 0,
  GRPC_CHTTP2_PCL_NEXT = 1,
  GRPC_CHTTP2_PCL_INFLIGHT = 2,
  GRPC_CHTTP2_PCL_COUNT = 3
};
125
// Why a write was requested; passed to grpc_chttp2_initiate_write().
enum grpc_chttp2_initiate_write_reason {
  GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE = 0,
  GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM = 1,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE = 2,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA = 3,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA = 4,
  GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING = 5,
  GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS = 6,
  GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT = 7,
  GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM = 8,
  GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API = 9,
  GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL = 10,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL = 11,
  GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS = 12,
  GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK = 13,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING = 14,
  GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE = 15,
  GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING = 16,
  GRPC_CHTTP2_INITIATE_WRITE_BDP_PING = 17,
  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING = 18,
  GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED = 19,
  GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE = 20,
  GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM = 21,
};

// Human-readable name for `reason` (defined in the transport implementation).
const char* grpc_chttp2_initiate_write_reason_string(
    grpc_chttp2_initiate_write_reason reason);
153
// Deframer state for the connection's incoming HTTP/2 byte stream.
enum grpc_chttp2_deframe_transport_state {
  // One state per byte of the 24-byte HTTP/2 client connection prefix.
  GRPC_DTS_CLIENT_PREFIX_0 = 0,
  GRPC_DTS_CLIENT_PREFIX_1,
  GRPC_DTS_CLIENT_PREFIX_2,
  GRPC_DTS_CLIENT_PREFIX_3,
  GRPC_DTS_CLIENT_PREFIX_4,
  GRPC_DTS_CLIENT_PREFIX_5,
  GRPC_DTS_CLIENT_PREFIX_6,
  GRPC_DTS_CLIENT_PREFIX_7,
  GRPC_DTS_CLIENT_PREFIX_8,
  GRPC_DTS_CLIENT_PREFIX_9,
  GRPC_DTS_CLIENT_PREFIX_10,
  GRPC_DTS_CLIENT_PREFIX_11,
  GRPC_DTS_CLIENT_PREFIX_12,
  GRPC_DTS_CLIENT_PREFIX_13,
  GRPC_DTS_CLIENT_PREFIX_14,
  GRPC_DTS_CLIENT_PREFIX_15,
  GRPC_DTS_CLIENT_PREFIX_16,
  GRPC_DTS_CLIENT_PREFIX_17,
  GRPC_DTS_CLIENT_PREFIX_18,
  GRPC_DTS_CLIENT_PREFIX_19,
  GRPC_DTS_CLIENT_PREFIX_20,
  GRPC_DTS_CLIENT_PREFIX_21,
  GRPC_DTS_CLIENT_PREFIX_22,
  GRPC_DTS_CLIENT_PREFIX_23,
  // Frame header bytes 0 through 8. GRPC_DTS_FH_0 must come directly after the
  // last prefix state, so its value is pinned here.
  GRPC_DTS_FH_0 = 24,
  GRPC_DTS_FH_1,
  GRPC_DTS_FH_2,
  GRPC_DTS_FH_3,
  GRPC_DTS_FH_4,
  GRPC_DTS_FH_5,
  GRPC_DTS_FH_6,
  GRPC_DTS_FH_7,
  GRPC_DTS_FH_8,
  // Inside the payload of an HTTP/2 frame.
  GRPC_DTS_FRAME = 33
};
196
// Head/tail of one intrusive doubly linked list of streams. The transport
// keeps one of these per grpc_chttp2_stream_list_id (grpc_chttp2_transport::
// lists); both pointers are nullptr when the list is empty.
struct grpc_chttp2_stream_list {
  grpc_chttp2_stream* head;
  grpc_chttp2_stream* tail;
};
// Per-list neighbour pointers embedded in each stream
// (grpc_chttp2_stream::links, one entry per grpc_chttp2_stream_list_id).
// NOTE(review): these are not initialized here; membership is tracked
// separately (grpc_chttp2_stream::included), so presumably they are only read
// while the stream is on the corresponding list.
struct grpc_chttp2_stream_link {
  grpc_chttp2_stream* next;
  grpc_chttp2_stream* prev;
};
205
// Progress of the GOAWAY this transport sends to its peer (tracked in
// grpc_chttp2_transport::sent_goaway_state, initially NO_GOAWAY_SEND).
enum grpc_chttp2_sent_goaway_state {
  GRPC_CHTTP2_NO_GOAWAY_SEND = 0,
  GRPC_CHTTP2_GRACEFUL_GOAWAY = 1,
  GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED = 2,
  GRPC_CHTTP2_FINAL_GOAWAY_SENT = 3,
};
212
213 typedef struct grpc_chttp2_write_cb {
214 int64_t call_at_byte;
215 grpc_closure* closure;
216 struct grpc_chttp2_write_cb* next;
217 } grpc_chttp2_write_cb;
218
// Keepalive state machine state (grpc_chttp2_transport::keepalive_state).
enum grpc_chttp2_keepalive_state {
  GRPC_CHTTP2_KEEPALIVE_STATE_WAITING = 0,
  GRPC_CHTTP2_KEEPALIVE_STATE_PINGING = 1,
  GRPC_CHTTP2_KEEPALIVE_STATE_DYING = 2,
  GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED = 3,
};
225
226 struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
227 public grpc_core::KeepsGrpcInitialized {
228 grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
229 grpc_core::OrphanablePtr<grpc_endpoint> endpoint,
230 bool is_client);
231 ~grpc_chttp2_transport() override;
232
233 void Orphan() override;
234
Reffinal235 grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() {
236 return grpc_core::FilterStackTransport::RefAsSubclass<
237 grpc_chttp2_transport>();
238 }
239
240 size_t SizeOfStream() const override;
241 bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override;
242 void PerformStreamOp(grpc_stream* gs,
243 grpc_transport_stream_op_batch* op) override;
244 void DestroyStream(grpc_stream* gs,
245 grpc_closure* then_schedule_closure) override;
246
filter_stack_transportfinal247 grpc_core::FilterStackTransport* filter_stack_transport() override {
248 return this;
249 }
client_transportfinal250 grpc_core::ClientTransport* client_transport() override { return nullptr; }
server_transportfinal251 grpc_core::ServerTransport* server_transport() override { return nullptr; }
252
253 absl::string_view GetTransportName() const override;
254 void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount,
255 const void* server_data, grpc_core::Arena* arena) override;
256 void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override;
257 void SetPollsetSet(grpc_stream* stream,
258 grpc_pollset_set* pollset_set) override;
259 void PerformOp(grpc_transport_op* op) override;
260 // Callback for transport framing endpoint extension to send security frames
261 // received directly from the endpoint on wire.
262 void WriteSecurityFrame(grpc_core::SliceBuffer* data);
263 void WriteSecurityFrameLocked(grpc_core::SliceBuffer* data);
264
265 grpc_core::OrphanablePtr<grpc_endpoint> ep;
266 grpc_core::Mutex ep_destroy_mu; // Guards endpoint destruction only.
267
268 grpc_core::Slice peer_string;
269
270 grpc_core::TransportFramingEndpointExtension*
271 transport_framing_endpoint_extension = nullptr;
272
273 grpc_core::MemoryOwner memory_owner;
274 const grpc_core::MemoryAllocator::Reservation self_reservation;
275 grpc_core::ReclamationSweep active_reclamation;
276
277 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
278 grpc_core::Combiner* combiner;
279 absl::BitGen bitgen;
280
281 // On the client side, when the transport is first created, the
282 // endpoint will already have been added to this pollset_set, and it
283 // needs to stay there until the notify_on_receive_settings callback
284 // is invoked. After that, the polling will be coordinated via the
285 // bind_pollset_set transport op, sent by the subchannel when it
286 // starts a connectivity watch.
287 grpc_pollset_set* interested_parties_until_recv_settings = nullptr;
288
289 grpc_closure* notify_on_receive_settings = nullptr;
290 grpc_closure* notify_on_close = nullptr;
291
292 /// has the upper layer closed the transport?
293 grpc_error_handle closed_with_error;
294
295 /// various lists of streams
296 grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
297
298 /// maps stream id to grpc_chttp2_stream objects
299 absl::flat_hash_map<uint32_t, grpc_chttp2_stream*> stream_map;
300 // Count of streams that should be counted against max concurrent streams but
301 // are not in stream_map (due to tarpitting).
302 size_t extra_streams = 0;
303
304 class RemovedStreamHandle {
305 public:
306 RemovedStreamHandle() = default;
RemovedStreamHandlefinal307 explicit RemovedStreamHandle(
308 grpc_core::RefCountedPtr<grpc_chttp2_transport> t)
309 : transport_(std::move(t)) {
310 ++transport_->extra_streams;
311 }
~RemovedStreamHandlefinal312 ~RemovedStreamHandle() {
313 if (transport_ != nullptr) {
314 --transport_->extra_streams;
315 }
316 }
317 RemovedStreamHandle(const RemovedStreamHandle&) = delete;
318 RemovedStreamHandle& operator=(const RemovedStreamHandle&) = delete;
319 RemovedStreamHandle(RemovedStreamHandle&&) = default;
320 RemovedStreamHandle& operator=(RemovedStreamHandle&&) = default;
321
322 private:
323 grpc_core::RefCountedPtr<grpc_chttp2_transport> transport_;
324 };
325
326 grpc_closure write_action_begin_locked;
327 grpc_closure write_action_end_locked;
328
329 grpc_closure read_action_locked;
330
331 /// incoming read bytes
332 grpc_slice_buffer read_buffer;
333
334 /// address to place a newly accepted stream - set and unset by
335 /// grpc_chttp2_parsing_accept_stream; used by init_stream to
336 /// publish the accepted server stream
337 grpc_chttp2_stream** accepting_stream = nullptr;
338
339 // accept stream callback
340 void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport,
341 const void* server_data);
342 // registered_method_matcher_cb is called before invoking the recv initial
343 // metadata callback.
344 void (*registered_method_matcher_cb)(
345 void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
346 void* accept_stream_cb_user_data;
347
348 /// connectivity tracking
349 grpc_core::ConnectivityStateTracker state_tracker;
350
351 /// data to write now
352 grpc_core::SliceBuffer outbuf;
353 /// hpack encoding
354 grpc_core::HPackCompressor hpack_compressor;
355
356 /// data to write next write
357 grpc_slice_buffer qbuf;
358
359 size_t max_requests_per_read;
360
361 /// Set to a grpc_error object if a goaway frame is received. By default, set
362 /// to absl::OkStatus()
363 grpc_error_handle goaway_error;
364
365 grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
366
367 /// settings values
368 grpc_core::Http2SettingsManager settings;
369
370 grpc_event_engine::experimental::EventEngine::TaskHandle
371 settings_ack_watchdog =
372 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
373
374 /// what is the next stream id to be allocated by this peer?
375 /// copied to next_stream_id in parsing when parsing commences
376 uint32_t next_stream_id = 0;
377
378 /// last new stream id
379 uint32_t last_new_stream_id = 0;
380
381 /// Number of incoming streams allowed before a settings ACK is required
382 uint32_t num_incoming_streams_before_settings_ack = 0;
383
384 /// ping queues for various ping insertion points
385 grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
386 grpc_core::Chttp2PingRatePolicy ping_rate_policy;
387 grpc_core::Chttp2PingCallbacks ping_callbacks;
388 grpc_event_engine::experimental::EventEngine::TaskHandle
389 delayed_ping_timer_handle =
390 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
391 grpc_closure retry_initiate_ping_locked;
392
393 /// ping acks
394 size_t ping_ack_count = 0;
395 size_t ping_ack_capacity = 0;
396 uint64_t* ping_acks = nullptr;
397
398 /// parser for headers
399 grpc_core::HPackParser hpack_parser;
400 /// simple one shot parsers
401 union {
402 grpc_chttp2_window_update_parser window_update;
403 grpc_chttp2_settings_parser settings;
404 grpc_chttp2_ping_parser ping;
405 grpc_chttp2_rst_stream_parser rst_stream;
406 } simple;
407 /// parser for goaway frames
408 grpc_chttp2_goaway_parser goaway_parser;
409 // parser for secure frames
410 grpc_chttp2_security_frame_parser security_frame_parser;
411
412 grpc_core::chttp2::TransportFlowControl flow_control;
413 /// initial window change. This is tracked as we parse settings frames from
414 /// the remote peer. If there is a positive delta, then we will make all
415 /// streams readable since they may have become unstalled
416 int64_t initial_window_update = 0;
417
418 // deframing
419 grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
420 uint8_t incoming_frame_type = 0;
421 uint8_t incoming_frame_flags = 0;
422 uint8_t header_eof = 0;
423 bool is_first_frame = true;
424 uint32_t expect_continuation_stream_id = 0;
425 uint32_t incoming_frame_size = 0;
426
427 int min_tarpit_duration_ms;
428 int max_tarpit_duration_ms;
429 bool allow_tarpit;
430
431 grpc_chttp2_stream* incoming_stream = nullptr;
432 // active parser
433 struct Parser {
434 const char* name;
435 grpc_error_handle (*parser)(void* parser_user_data,
436 grpc_chttp2_transport* t, grpc_chttp2_stream* s,
437 const grpc_slice& slice, int is_last);
438 void* user_data = nullptr;
439 };
440 Parser parser;
441
442 grpc_chttp2_write_cb* write_cb_pool = nullptr;
443
444 // bdp estimator
445 grpc_closure next_bdp_ping_timer_expired_locked;
446 grpc_closure start_bdp_ping_locked;
447 grpc_closure finish_bdp_ping_locked;
448
449 // if non-NULL, close the transport with this error when writes are finished
450 grpc_error_handle close_transport_on_writes_finished;
451
452 // a list of closures to run after writes are finished
453 grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
454
455 // buffer pool state
456 /// benign cleanup closure
457 grpc_closure benign_reclaimer_locked;
458 /// destructive cleanup closure
459 grpc_closure destructive_reclaimer_locked;
460
461 // next bdp ping timer handle
462 grpc_event_engine::experimental::EventEngine::TaskHandle
463 next_bdp_ping_timer_handle =
464 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
465
466 // keep-alive ping support
467 /// Closure to initialize a keepalive ping
468 grpc_closure init_keepalive_ping_locked;
469 /// Closure to run when the keepalive ping ack is received
470 grpc_closure finish_keepalive_ping_locked;
471 /// timer to initiate ping events
472 grpc_event_engine::experimental::EventEngine::TaskHandle
473 keepalive_ping_timer_handle =
474 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
475 ;
476 /// time duration in between pings
477 grpc_core::Duration keepalive_time;
478 /// grace period to wait for data after sending a ping before keepalives
479 /// timeout
480 grpc_core::Duration keepalive_timeout;
481 /// number of stream objects currently allocated by this transport
482 std::atomic<size_t> streams_allocated{0};
483 /// keep-alive state machine state
484 grpc_chttp2_keepalive_state keepalive_state;
485 // Soft limit on max header size.
486 uint32_t max_header_list_size_soft_limit = 0;
487 grpc_core::ContextList* context_list = nullptr;
488 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
489 uint32_t num_messages_in_next_write = 0;
490 /// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
491 /// RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
492 /// DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
493 /// only continue reading when we are able to write to the socket again,
494 /// thereby reducing the number of induced frames.
495 uint32_t num_pending_induced_frames = 0;
496 uint32_t incoming_stream_id = 0;
497
498 /// grace period after sending a ping to wait for the ping ack
499 grpc_core::Duration ping_timeout;
500 grpc_event_engine::experimental::EventEngine::TaskHandle
501 keepalive_ping_timeout_handle =
502 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
503 /// grace period before settings timeout expires
504 grpc_core::Duration settings_timeout;
505
506 /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
507 uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
508
509 /// write execution state of the transport
510 grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
511
512 /// policy for how much data we're willing to put into one http2 write
513 grpc_core::Chttp2WriteSizePolicy write_size_policy;
514
515 bool reading_paused_on_pending_induced_frames = false;
516 /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
517 /// the peer
518 bool enable_preferred_rx_crypto_frame_advertisement = false;
519 /// Set to non zero if closures associated with the transport may be
520 /// covering a write in a pollset. Such closures cannot be scheduled until
521 /// we can prove that the write got scheduled.
522 uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;
523
524 /// have we scheduled a benign cleanup?
525 bool benign_reclaimer_registered = false;
526 /// have we scheduled a destructive cleanup?
527 bool destructive_reclaimer_registered = false;
528
529 /// if keepalive pings are allowed when there's no outstanding streams
530 bool keepalive_permit_without_calls = false;
531
532 // bdp estimator
533 bool bdp_ping_blocked =
534 false; // Is the BDP blocked due to not receiving any data?
535
536 /// is the transport destroying itself?
537 uint8_t destroying = false;
538
539 /// is this a client?
540 bool is_client;
541
542 /// If start_bdp_ping_locked has been called
543 bool bdp_ping_started = false;
544 // True if pings should be acked
545 bool ack_pings = true;
546 /// True if the keepalive system wants to see some data incoming
547 bool keepalive_incoming_data_wanted = false;
548 /// True if we count stream allocation (instead of HTTP2 concurrency) for
549 /// MAX_CONCURRENT_STREAMS
550 bool max_concurrent_streams_overload_protection = false;
551 bool max_concurrent_streams_reject_on_client = false;
552
553 // What percentage of rst_stream frames on the server should cause a ping
554 // frame to be generated.
555 uint8_t ping_on_rst_stream_percent;
556
557 // The last time a transport window update was received.
558 grpc_core::Timestamp last_window_update_time =
559 grpc_core::Timestamp::InfPast();
560
561 GPR_NO_UNIQUE_ADDRESS grpc_core::latent_see::Flow write_flow;
562 };
563
// How a stream's metadata was published upward; stored per stream in
// grpc_chttp2_stream::published_metadata[2] (presumably [0] = initial,
// [1] = trailing — confirm at the use sites).
enum grpc_published_metadata_method {
  GRPC_METADATA_NOT_PUBLISHED = 0,
  GRPC_METADATA_SYNTHESIZED_FROM_FAKE = 1,
  GRPC_METADATA_PUBLISHED_FROM_WIRE = 2,
  GRPC_METADATA_PUBLISHED_AT_CLOSE = 3
};
570
571 struct grpc_chttp2_stream {
572 grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
573 const void* server_data, grpc_core::Arena* arena);
574 ~grpc_chttp2_stream();
575
576 const grpc_core::RefCountedPtr<grpc_chttp2_transport> t;
577 grpc_stream_refcount* refcount;
578 grpc_core::Arena* const arena;
579
580 grpc_closure destroy_stream;
581 grpc_closure* destroy_stream_arg;
582
583 grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
584
585 /// HTTP2 stream id for this stream, or zero if one has not been assigned
586 uint32_t id = 0;
587
588 /// things the upper layers would like to send
589 grpc_metadata_batch* send_initial_metadata = nullptr;
590 grpc_closure* send_initial_metadata_finished = nullptr;
591 grpc_metadata_batch* send_trailing_metadata = nullptr;
592 // TODO(yashykt): Find a better name for the below field and others in this
593 // struct to betteer distinguish inputs, return values, and
594 // internal state.
595 // sent_trailing_metadata_op allows the transport to fill in to the upper
596 // layer whether this stream was able to send its trailing metadata (used for
597 // detecting cancellation on the server-side)..
598 bool* sent_trailing_metadata_op = nullptr;
599 grpc_closure* send_trailing_metadata_finished = nullptr;
600
601 int64_t next_message_end_offset;
602 int64_t flow_controlled_bytes_written = 0;
603 int64_t flow_controlled_bytes_flowed = 0;
604 grpc_closure* send_message_finished = nullptr;
605
606 grpc_metadata_batch* recv_initial_metadata;
607 grpc_closure* recv_initial_metadata_ready = nullptr;
608 bool* trailing_metadata_available = nullptr;
609 absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
610 uint32_t* recv_message_flags = nullptr;
611 bool* call_failed_before_recv_message = nullptr;
612 grpc_closure* recv_message_ready = nullptr;
613 grpc_metadata_batch* recv_trailing_metadata;
614 grpc_closure* recv_trailing_metadata_finished = nullptr;
615
616 grpc_transport_stream_stats* collecting_stats = nullptr;
617 grpc_transport_stream_stats stats = grpc_transport_stream_stats();
618
619 /// Is this stream closed for writing.
620 bool write_closed = false;
621 /// Is this stream reading half-closed.
622 bool read_closed = false;
623 /// Are all published incoming byte streams closed.
624 bool all_incoming_byte_streams_finished = false;
625 /// Has this stream seen an error.
626 /// If true, then pending incoming frames can be thrown away.
627 bool seen_error = false;
628 /// Are we buffering writes on this stream? If yes, we won't become writable
629 /// until there's enough queued up in the flow_controlled_buffer
630 bool write_buffering = false;
631
632 // have we sent or received the EOS bit?
633 bool eos_received = false;
634 bool eos_sent = false;
635
636 grpc_core::BitSet<STREAM_LIST_COUNT> included;
637
638 /// the error that resulted in this stream being read-closed
639 grpc_error_handle read_closed_error;
640 /// the error that resulted in this stream being write-closed
641 grpc_error_handle write_closed_error;
642
643 grpc_published_metadata_method published_metadata[2] = {};
644
645 grpc_metadata_batch initial_metadata_buffer;
646 grpc_metadata_batch trailing_metadata_buffer;
647
648 grpc_slice_buffer frame_storage; // protected by t combiner
649
650 grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
651
652 /// number of bytes received - reset at end of parse thread execution
653 int64_t received_bytes = 0;
654
655 grpc_core::chttp2::StreamFlowControl flow_control;
656
657 grpc_slice_buffer flow_controlled_buffer;
658
659 grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
660 grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
661 grpc_chttp2_write_cb* finish_after_write = nullptr;
662 size_t sending_bytes = 0;
663
664 /// Byte counter for number of bytes written
665 size_t byte_counter = 0;
666
667 /// Number of times written
668 int64_t write_counter = 0;
669
670 grpc_core::Chttp2CallTracerWrapper call_tracer_wrapper;
671
672 /// Only set when enabled.
673 // TODO(roth): Remove this when the call_tracer_in_transport
674 // experiment finishes rolling out.
675 grpc_core::CallTracerAnnotationInterface* call_tracer = nullptr;
676
677 /// Only set when enabled.
678 std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer;
679
680 // time this stream was created
681 gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
682
683 bool parsed_trailers_only = false;
684
685 bool final_metadata_requested = false;
686 bool received_last_frame = false; // protected by t combiner
687
688 /// how many header frames have we received?
689 uint8_t header_frames_received = 0;
690
691 bool sent_initial_metadata = false;
692 bool sent_trailing_metadata = false;
693
694 /// Whether the bytes needs to be traced using Fathom
695 bool traced = false;
696
697 // The last time a stream window update was received.
698 grpc_core::Timestamp last_window_update_time =
699 grpc_core::Timestamp::InfPast();
700 };
701
// Channel arg: ping timeout in milliseconds.
#define GRPC_ARG_PING_TIMEOUT_MS "grpc.http2.ping_timeout_ms"

// EXPERIMENTAL channel arg: reject requests on the client while it is over the
// peer's max concurrent streams, so that they can be retried elsewhere.
#define GRPC_ARG_MAX_CONCURRENT_STREAMS_REJECT_ON_CLIENT \
  "grpc.http.max_concurrent_streams_reject_on_client"

// EXPERIMENTAL channel arg guarding a server against too many requests: a
// stream keeps counting against MAX_CONCURRENT_STREAMS until it has been
// deallocated.
#define GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION \
  "grpc.http.overload_protection"
714
/// Transport writing call flow:
/// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
/// go out on the wire.
/// If no other write has been started, a task is enqueued onto our workqueue.
/// When that task executes, it obtains the global lock, and gathers the data
/// to write.
/// The global lock is dropped and we do the syscall to write.
/// After writing, a follow-up check is made to see if another round of writing
/// should be performed.
/// NOTE(review): "workqueue"/"global lock" are older terms; the transport now
/// carries a `combiner` and a `write_state` (IDLE / WRITING /
/// WRITING_WITH_MORE) — confirm this description against the implementation.
///
/// The actual call chain is documented in the implementation of this function.
///
/// @param t      transport that has bytes to send
/// @param reason why the write is requested (see
///               grpc_chttp2_initiate_write_reason_string for its name)
void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
                                grpc_chttp2_initiate_write_reason reason);
729
/// Outcome of grpc_chttp2_begin_write(). The fields have no default
/// initializers, so producers must set all three.
struct grpc_chttp2_begin_write_result {
  /// are we writing?
  bool writing;
  /// if writing: was it a complete flush (false) or a partial flush (true)
  bool partial;
  /// did we queue any completions as part of beginning the write
  bool early_results_scheduled;
};
/// Starts a write round on `t` and reports what it did.
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
    grpc_chttp2_transport* t);
/// Finishes the write round started by grpc_chttp2_begin_write(); `error` is
/// presumably the result of the endpoint write — confirm in writing code.
void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
741
/// Process one slice of incoming data
/// Returns:
/// - a count of parsed bytes in the event of a partial read: the caller should
///   offload responsibilities to another thread to continue parsing.
/// - or a status in the case of a completed read
/// `requests_started` is taken by non-const reference; presumably it is
/// updated with the number of requests started while parsing (compare
/// grpc_chttp2_transport::max_requests_per_read) — confirm in the parser.
absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
    grpc_chttp2_transport* t, const grpc_slice& slice,
    size_t& requests_started);
750
//******** Flow Control **************

// Takes in a flow control action and performs all the needed operations.
// `s` is the stream the action applies to (its nullability is not visible
// here — NOTE(review): confirm whether transport-only actions pass nullptr).
void grpc_chttp2_act_on_flowctl_action(
    const grpc_core::chttp2::FlowControlAction& action,
    grpc_chttp2_transport* t, grpc_chttp2_stream* s);

//******** End of Flow Control **************
759
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)760 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
761 grpc_chttp2_transport* t, uint32_t id) {
762 auto it = t->stream_map.find(id);
763 if (it == t->stream_map.end()) return nullptr;
764 return it->second;
765 }
/// Accepts a new peer-initiated stream with HTTP/2 id `id`. Works together
/// with grpc_chttp2_transport::accepting_stream, which this function sets and
/// unsets. NOTE(review): whether it can return nullptr is not visible here.
grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
                                                      uint32_t id);

/// Records a GOAWAY received from the peer (error code, last stream id and
/// debug text).
void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
                                     uint32_t goaway_error,
                                     uint32_t last_stream_id,
                                     absl::string_view goaway_text);

/// Switches `t`'s active frame parser to one that skips the current frame.
void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);

/// Completes one step of the closure barrier referenced by `*pclosure`
/// (see CLOSURE_BARRIER_* flags). `desc` and `whence` identify the caller.
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
                                       grpc_closure** pclosure,
                                       grpc_error_handle error,
                                       const char* desc,
                                       grpc_core::DebugLocation whence = {});

// Timer callbacks; each takes ownership of a ref to the transport.
void grpc_chttp2_keepalive_timeout(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
void grpc_chttp2_ping_timeout(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);

void grpc_chttp2_settings_timeout(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
789
/// Size in bytes of the gRPC message header (presumably 1 flag byte plus a
/// 4-byte length — confirm against the framing code).
#define GRPC_HEADER_SIZE_IN_BYTES 5
/// Largest value representable by size_t (all bits set). Uses a named cast
/// rather than the C-style `(size_t)0`; the value and type are unchanged.
#define MAX_SIZE_T (~static_cast<size_t>(0))

/// HTTP/2 client connection preface.
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
/// Length of the preface excluding the terminating NUL (24 bytes).
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
  (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)

/// Logs at `severity` only when the `http` trace flag is enabled. LOG_IF must
/// be visible wherever this macro is expanded.
#define GRPC_CHTTP2_IF_TRACING(severity) \
  LOG_IF(severity, GRPC_TRACE_FLAG_ENABLED(http))
799
/// Reports `error` as the status of `stream` to the upper layer. The name
/// suggests the status is synthesized locally rather than received from the
/// wire (compare GRPC_METADATA_SYNTHESIZED_FROM_FAKE) — NOTE(review): confirm.
void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
                             grpc_chttp2_stream* stream,
                             grpc_error_handle error);
/// Marks `s` closed for reading and/or writing (flags are int booleans).
/// Returns a RemovedStreamHandle; when non-empty it keeps the stream counted
/// in t->extra_streams until the handle is destroyed.
grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
    grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
    int close_writes, grpc_error_handle error);
void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
807
// Stream ref/unref helpers. In debug builds the `reason` string is forwarded
// to the ref functions; with NDEBUG defined it is dropped at the macro level
// and the reason-less overloads are declared instead.
#ifndef NDEBUG
#define GRPC_CHTTP2_STREAM_REF(stream, reason) \
  grpc_chttp2_stream_ref(stream, reason)
#define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
  grpc_chttp2_stream_unref(stream, reason)
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
#else
#define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
#define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
  grpc_chttp2_stream_unref(stream)
void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
#endif
822
/// Handles an ack from the peer for the ping with opaque id `id`.
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);

/// Sends GOAWAY with error code ENHANCE_YOUR_CALM and additional debug data
/// resembling "too_many_pings" followed by immediately closing the connection.
void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t);

/// Resets ping clock. Should be called when flushing window updates,
/// initial/trailing metadata or data frames. For a server, it resets the number
/// of ping strikes and the last_ping_recv_time. For a ping sender, it resets
/// pings_before_data_required.
void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);

/// add a ref to the stream and add it to the writable list;
/// ref will be dropped in the writing code (historically writing.c;
/// NOTE(review): presumably writing.cc today)
void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
                                      grpc_chttp2_stream* s);

/// Cancels `s` with `due_to_error`; `tarpit` selects whether the cancellation
/// may be delayed (see the transport's allow_tarpit / *_tarpit_duration_ms).
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
                               grpc_error_handle due_to_error, bool tarpit);

// Each of these completes the corresponding pending recv op on `s` if it can
// be completed now.
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
                                                      grpc_chttp2_stream* s);
void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
                                             grpc_chttp2_stream* s);
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
                                                       grpc_chttp2_stream* s);

/// Fails the pending write callbacks of `s` with `error`.
void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
                                     grpc_chttp2_stream* s,
                                     grpc_error_handle error);

/// Set the default keepalive configurations, must only be called at
/// initialization
void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
                                               bool is_client);
void grpc_chttp2_config_default_keepalive_args(
    const grpc_core::ChannelArgs& channel_args, bool is_client);

void grpc_chttp2_retry_initiate_ping(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);

// NOTE(review): unlike its neighbours this function lacks the grpc_chttp2_
// prefix; renaming it would require updating all callers.
void schedule_bdp_ping_locked(
    grpc_core::RefCountedPtr<grpc_chttp2_transport> t);

uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t);
868
869 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
870