1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
16
17 #include <grpc/support/port_platform.h>
18
19 #include <utility>
20
21 #include "absl/log/check.h"
22 #include "absl/meta/type_traits.h"
23 #include "absl/random/distributions.h"
24
25 namespace grpc_core {
26
OnPing(Callback on_start,Callback on_ack)27 void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
28 on_start_.emplace_back(std::move(on_start));
29 on_ack_.emplace_back(std::move(on_ack));
30 ping_requested_ = true;
31 }
32
OnPingAck(Callback on_ack)33 void Chttp2PingCallbacks::OnPingAck(Callback on_ack) {
34 auto it = inflight_.find(most_recent_inflight_);
35 if (it != inflight_.end()) {
36 it->second.on_ack.emplace_back(std::move(on_ack));
37 return;
38 }
39 ping_requested_ = true;
40 on_ack_.emplace_back(std::move(on_ack));
41 }
42
StartPing(absl::BitGenRef bitgen)43 uint64_t Chttp2PingCallbacks::StartPing(absl::BitGenRef bitgen) {
44 uint64_t id;
45 do {
46 id = absl::Uniform<uint64_t>(bitgen);
47 } while (inflight_.contains(id));
48 CallbackVec cbs = std::move(on_start_);
49 CallbackVec().swap(on_start_);
50 InflightPing inflight;
51 inflight.on_ack.swap(on_ack_);
52 started_new_ping_without_setting_timeout_ = true;
53 inflight_.emplace(id, std::move(inflight));
54 most_recent_inflight_ = id;
55 ping_requested_ = false;
56 for (auto& cb : cbs) {
57 cb();
58 }
59 return id;
60 }
61
AckPing(uint64_t id,grpc_event_engine::experimental::EventEngine * event_engine)62 bool Chttp2PingCallbacks::AckPing(
63 uint64_t id, grpc_event_engine::experimental::EventEngine* event_engine) {
64 auto ping = inflight_.extract(id);
65 if (ping.empty()) return false;
66 if (ping.mapped().on_timeout !=
67 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
68 event_engine->Cancel(ping.mapped().on_timeout);
69 }
70 for (auto& cb : ping.mapped().on_ack) {
71 cb();
72 }
73 return true;
74 }
75
CancelAll(grpc_event_engine::experimental::EventEngine * event_engine)76 void Chttp2PingCallbacks::CancelAll(
77 grpc_event_engine::experimental::EventEngine* event_engine) {
78 CallbackVec().swap(on_start_);
79 CallbackVec().swap(on_ack_);
80 for (auto& cbs : inflight_) {
81 CallbackVec().swap(cbs.second.on_ack);
82 if (cbs.second.on_timeout !=
83 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
84 event_engine->Cancel(std::exchange(
85 cbs.second.on_timeout,
86 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
87 }
88 }
89 ping_requested_ = false;
90 }
91
OnPingTimeout(Duration ping_timeout,grpc_event_engine::experimental::EventEngine * event_engine,Callback callback)92 absl::optional<uint64_t> Chttp2PingCallbacks::OnPingTimeout(
93 Duration ping_timeout,
94 grpc_event_engine::experimental::EventEngine* event_engine,
95 Callback callback) {
96 CHECK(started_new_ping_without_setting_timeout_);
97 started_new_ping_without_setting_timeout_ = false;
98 auto it = inflight_.find(most_recent_inflight_);
99 if (it == inflight_.end()) return absl::nullopt;
100 it->second.on_timeout =
101 event_engine->RunAfter(ping_timeout, std::move(callback));
102 return most_recent_inflight_;
103 }
104
105 } // namespace grpc_core
106