• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <utility>
20 
21 #include "absl/log/check.h"
22 #include "absl/meta/type_traits.h"
23 #include "absl/random/distributions.h"
24 
25 namespace grpc_core {
26 
OnPing(Callback on_start,Callback on_ack)27 void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
28   on_start_.emplace_back(std::move(on_start));
29   on_ack_.emplace_back(std::move(on_ack));
30   ping_requested_ = true;
31 }
32 
OnPingAck(Callback on_ack)33 void Chttp2PingCallbacks::OnPingAck(Callback on_ack) {
34   auto it = inflight_.find(most_recent_inflight_);
35   if (it != inflight_.end()) {
36     it->second.on_ack.emplace_back(std::move(on_ack));
37     return;
38   }
39   ping_requested_ = true;
40   on_ack_.emplace_back(std::move(on_ack));
41 }
42 
StartPing(absl::BitGenRef bitgen)43 uint64_t Chttp2PingCallbacks::StartPing(absl::BitGenRef bitgen) {
44   uint64_t id;
45   do {
46     id = absl::Uniform<uint64_t>(bitgen);
47   } while (inflight_.contains(id));
48   CallbackVec cbs = std::move(on_start_);
49   CallbackVec().swap(on_start_);
50   InflightPing inflight;
51   inflight.on_ack.swap(on_ack_);
52   started_new_ping_without_setting_timeout_ = true;
53   inflight_.emplace(id, std::move(inflight));
54   most_recent_inflight_ = id;
55   ping_requested_ = false;
56   for (auto& cb : cbs) {
57     cb();
58   }
59   return id;
60 }
61 
AckPing(uint64_t id,grpc_event_engine::experimental::EventEngine * event_engine)62 bool Chttp2PingCallbacks::AckPing(
63     uint64_t id, grpc_event_engine::experimental::EventEngine* event_engine) {
64   auto ping = inflight_.extract(id);
65   if (ping.empty()) return false;
66   if (ping.mapped().on_timeout !=
67       grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
68     event_engine->Cancel(ping.mapped().on_timeout);
69   }
70   for (auto& cb : ping.mapped().on_ack) {
71     cb();
72   }
73   return true;
74 }
75 
CancelAll(grpc_event_engine::experimental::EventEngine * event_engine)76 void Chttp2PingCallbacks::CancelAll(
77     grpc_event_engine::experimental::EventEngine* event_engine) {
78   CallbackVec().swap(on_start_);
79   CallbackVec().swap(on_ack_);
80   for (auto& cbs : inflight_) {
81     CallbackVec().swap(cbs.second.on_ack);
82     if (cbs.second.on_timeout !=
83         grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
84       event_engine->Cancel(std::exchange(
85           cbs.second.on_timeout,
86           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
87     }
88   }
89   ping_requested_ = false;
90 }
91 
OnPingTimeout(Duration ping_timeout,grpc_event_engine::experimental::EventEngine * event_engine,Callback callback)92 absl::optional<uint64_t> Chttp2PingCallbacks::OnPingTimeout(
93     Duration ping_timeout,
94     grpc_event_engine::experimental::EventEngine* event_engine,
95     Callback callback) {
96   CHECK(started_new_ping_without_setting_timeout_);
97   started_new_ping_without_setting_timeout_ = false;
98   auto it = inflight_.find(most_recent_inflight_);
99   if (it == inflight_.end()) return absl::nullopt;
100   it->second.on_timeout =
101       event_engine->RunAfter(ping_timeout, std::move(callback));
102   return most_recent_inflight_;
103 }
104 
105 }  // namespace grpc_core
106