diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/chttp2_transport.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/chttp2_transport.cc --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/chttp2_transport.cc 2021-10-20 04:14:40.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/chttp2_transport.cc 2024-07-31 17:54:56.767196600 +0800 @@ -35,6 +35,7 @@ #include "src/core/ext/transport/chttp2/transport/frame_data.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/varint.h" +#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/debug/stats.h" @@ -378,6 +379,9 @@ if (value >= 0) { queue_setting_update(t, settings_map[j].setting_id, static_cast(value)); + if (settings_map[j].setting_id == GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) { + t->max_concurrent_streams_policy.SetTarget(value); + } } } break; diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/internal.h third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/internal.h --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/internal.h 2021-10-20 04:14:40.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/internal.h 2024-07-31 17:16:44.925139300 +0800 @@ -36,6 +36,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" @@ -387,6 +388,8 @@ uint64_t ping_ctr = 0; /* unique id for pings */ grpc_closure retry_initiate_ping_locked; + grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; + /** ping acks */ size_t ping_ack_count = 0; size_t ping_ack_capacity = 0; @@ -482,6 +485,8 @@ /** grace period for a ping to complete before watchdog kicks in */ grpc_millis keepalive_timeout; /** if keepalive pings are allowed when there's no outstanding streams */ + /// number of stream objects currently allocated by this transport + std::atomic streams_allocated{0}; bool keepalive_permit_without_calls = false; /** If start_keepalive_ping_locked has been called */ bool keepalive_ping_started = false; diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc 1970-01-01 08:00:00.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc 2024-07-31 17:16:44.925139300 +0800 @@ -0,0 +1,44 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" + +#include + +#include + +namespace grpc_core { + +void Chttp2MaxConcurrentStreamsPolicy::AddDemerit() { + ++new_demerits_; + ++unacked_demerits_; +} + +void Chttp2MaxConcurrentStreamsPolicy::FlushedSettings() { + sent_demerits_ += std::exchange(new_demerits_, 0); +} + +void Chttp2MaxConcurrentStreamsPolicy::AckLastSend() { + GPR_ASSERT(unacked_demerits_ >= sent_demerits_); + unacked_demerits_ -= std::exchange(sent_demerits_, 0); +} + +uint32_t Chttp2MaxConcurrentStreamsPolicy::AdvertiseValue() const { + if (target_ < unacked_demerits_) return 0; + return target_ - unacked_demerits_; +} + +} // namespace grpc_core diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h 1970-01-01 08:00:00.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h 2024-07-31 17:16:44.925139300 +0800 @@ -0,0 +1,67 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H +#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H + +#include + +#include +#include + +namespace grpc_core { + +class Chttp2MaxConcurrentStreamsPolicy { + public: + // Set the target number of concurrent streams. + // If everything is idle we should advertise this number. + void SetTarget(uint32_t target) { target_ = target; } + + // Add one demerit to the current target. + // We need to do one full settings round trip after this to clear this + // demerit. + // It will reduce our advertised max concurrent streams by one. + void AddDemerit(); + + // Notify the policy that we've sent a settings frame. + // Newly added demerits since the last settings frame was sent will be cleared + // once that settings frame is acknowledged. + void FlushedSettings(); + + // Notify the policy that we've received an acknowledgement for the last + // settings frame we sent. + void AckLastSend(); + + // Returns what we should advertise as max concurrent streams. + uint32_t AdvertiseValue() const; + + private: + uint32_t target_ = std::numeric_limits::max(); + // Demerit flow: + // When we add a demerit, we add to both new & unacked. + // When we flush settings, we move new to sent. + // When we ack settings, we remove what we sent from unacked. + // eg: + // we add 10 demerits - now new=10, sent=0, unacked=10 + // we send settings - now new=0, sent=10, unacked=10 + // we add 5 demerits - now new=5, sent=10, unacked=15 + // we get the settings ack - now new=5, sent=0, unacked=5 + uint32_t new_demerits_ = 0; + uint32_t sent_demerits_ = 0; + uint32_t unacked_demerits_ = 0; +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/parsing.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/parsing.cc --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/parsing.cc 2021-10-20 04:14:40.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/parsing.cc 2024-07-31 17:16:44.925139300 +0800 @@ -27,6 +27,7 @@ #include #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/slice/slice_utils.h" @@ -636,6 +637,17 @@ t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded"); + } else if (GPR_UNLIKELY( + t->streams_allocated.load(std::memory_order_relaxed) > + t->max_concurrent_streams_policy.AdvertiseValue())) { + // We have more streams allocated than we'd like, so apply some pushback + // by refusing this stream. + ++t->num_pending_induced_frames; + grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create( + t->incoming_stream_id, + GRPC_HTTP2_REFUSED_STREAM, nullptr)); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); + return init_header_skip_frame_parser(t, priority_type); } t->last_new_stream_id = t->incoming_stream_id; s = t->incoming_stream = @@ -756,6 +768,7 @@ return err; } if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { + t->max_concurrent_streams_policy.AckLastSend(); memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS], GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); t->hpack_parser.hpack_table()->SetMaxBytes( @@ -765,6 +778,9 @@ } t->parser = grpc_chttp2_settings_parser_parse; t->parser_data = &t->simple.settings; + if (!t->is_client) { + t->max_concurrent_streams_policy.AddDemerit(); + } return GRPC_ERROR_NONE; } diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/writing.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/writing.cc --- grpc-1.41.1/src/core/ext/transport/chttp2/transport/writing.cc 2021-10-20 04:14:40.000000000 +0800 +++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/writing.cc 2024-07-31 17:16:44.925139300 +0800 @@ -25,6 +25,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/ext/transport/chttp2/transport/context_list.h" #include "src/core/ext/transport/chttp2/transport/internal.h" +#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" @@ -217,7 +218,15 @@ } void FlushSettings() { - if (t_->dirtied_local_settings && !t_->sent_local_settings) { + const bool dirty = + t_->dirtied_local_settings || + t_->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] != + t_->max_concurrent_streams_policy.AdvertiseValue(); + if (dirty && !t_->sent_local_settings) { + t_->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = + t_->max_concurrent_streams_policy.AdvertiseValue(); grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_settings_create( t_->settings[GRPC_SENT_SETTINGS], @@ -226,6 +235,7 @@ t_->force_send_settings = false; t_->dirtied_local_settings = false; t_->sent_local_settings = true; + t_->max_concurrent_streams_policy.FlushedSettings(); GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(); } } diff -Naur grpc-1.41.1/src/python/grpcio/grpc_core_dependencies.py third_party_grpc_sxy/src/python/grpcio/grpc_core_dependencies.py --- grpc-1.41.1/src/python/grpcio/grpc_core_dependencies.py 2021-10-20 04:14:40.000000000 +0800 +++ third_party_grpc_sxy/src/python/grpcio/grpc_core_dependencies.py 2024-07-31 17:16:44.925139300 +0800 @@ -121,6 +121,7 @@ 'src/core/ext/transport/chttp2/transport/http2_settings.cc', 'src/core/ext/transport/chttp2/transport/huffsyms.cc', 'src/core/ext/transport/chttp2/transport/incoming_metadata.cc', + 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 'src/core/ext/transport/chttp2/transport/stream_map.cc',