1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
#include "quiche/quic/core/io/quic_poll_event_loop.h"

#include <poll.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <limits>
#include <memory>

#include "absl/types/span.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
18
19 namespace quic {
20
21 namespace {
22
// Integer type that poll(2) uses for the `events` field of `pollfd`.
using PollMask = decltype(::pollfd::events);
24
GetPollMask(QuicSocketEventMask event_mask)25 PollMask GetPollMask(QuicSocketEventMask event_mask) {
26 return ((event_mask & kSocketEventReadable) ? POLLIN : 0) |
27 ((event_mask & kSocketEventWritable) ? POLLOUT : 0) |
28 ((event_mask & kSocketEventError) ? POLLERR : 0);
29 }
30
GetEventMask(PollMask poll_mask)31 QuicSocketEventMask GetEventMask(PollMask poll_mask) {
32 return ((poll_mask & POLLIN) ? kSocketEventReadable : 0) |
33 ((poll_mask & POLLOUT) ? kSocketEventWritable : 0) |
34 ((poll_mask & POLLERR) ? kSocketEventError : 0);
35 }
36
37 } // namespace
38
QuicPollEventLoop(QuicClock * clock)39 QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) {}
40
RegisterSocket(QuicUdpSocketFd fd,QuicSocketEventMask events,QuicSocketEventListener * listener)41 bool QuicPollEventLoop::RegisterSocket(QuicUdpSocketFd fd,
42 QuicSocketEventMask events,
43 QuicSocketEventListener* listener) {
44 auto [it, success] =
45 registrations_.insert({fd, std::make_shared<Registration>()});
46 if (!success) {
47 return false;
48 }
49 Registration& registration = *it->second;
50 registration.events = events;
51 registration.listener = listener;
52 return true;
53 }
54
UnregisterSocket(QuicUdpSocketFd fd)55 bool QuicPollEventLoop::UnregisterSocket(QuicUdpSocketFd fd) {
56 return registrations_.erase(fd);
57 }
58
RearmSocket(QuicUdpSocketFd fd,QuicSocketEventMask events)59 bool QuicPollEventLoop::RearmSocket(QuicUdpSocketFd fd,
60 QuicSocketEventMask events) {
61 auto it = registrations_.find(fd);
62 if (it == registrations_.end()) {
63 return false;
64 }
65 it->second->events |= events;
66 return true;
67 }
68
ArtificiallyNotifyEvent(QuicUdpSocketFd fd,QuicSocketEventMask events)69 bool QuicPollEventLoop::ArtificiallyNotifyEvent(QuicUdpSocketFd fd,
70 QuicSocketEventMask events) {
71 auto it = registrations_.find(fd);
72 if (it == registrations_.end()) {
73 return false;
74 }
75 has_artificial_events_pending_ = true;
76 it->second->artificially_notify_at_next_iteration |= events;
77 return true;
78 }
79
RunEventLoopOnce(QuicTime::Delta default_timeout)80 void QuicPollEventLoop::RunEventLoopOnce(QuicTime::Delta default_timeout) {
81 const QuicTime start_time = clock_->Now();
82 ProcessAlarmsUpTo(start_time);
83
84 QuicTime::Delta timeout = ComputePollTimeout(start_time, default_timeout);
85 ProcessIoEvents(start_time, timeout);
86
87 const QuicTime end_time = clock_->Now();
88 ProcessAlarmsUpTo(end_time);
89 }
90
ComputePollTimeout(QuicTime now,QuicTime::Delta default_timeout) const91 QuicTime::Delta QuicPollEventLoop::ComputePollTimeout(
92 QuicTime now, QuicTime::Delta default_timeout) const {
93 default_timeout = std::max(default_timeout, QuicTime::Delta::Zero());
94 if (has_artificial_events_pending_) {
95 return QuicTime::Delta::Zero();
96 }
97 if (alarms_.empty()) {
98 return default_timeout;
99 }
100 QuicTime end_time = std::min(now + default_timeout, alarms_.begin()->first);
101 if (end_time < now) {
102 // We only run a single pass of processing alarm callbacks per
103 // RunEventLoopOnce() call. If an alarm schedules another alarm in the past
104 // while in the callback, this will happen.
105 return QuicTime::Delta::Zero();
106 }
107 return end_time - now;
108 }
109
PollWithRetries(absl::Span<pollfd> fds,QuicTime start_time,QuicTime::Delta timeout)110 int QuicPollEventLoop::PollWithRetries(absl::Span<pollfd> fds,
111 QuicTime start_time,
112 QuicTime::Delta timeout) {
113 const QuicTime timeout_at = start_time + timeout;
114 int poll_result;
115 for (;;) {
116 float timeout_ms = std::ceil(timeout.ToMicroseconds() / 1000.f);
117 poll_result =
118 PollSyscall(fds.data(), fds.size(), static_cast<int>(timeout_ms));
119
120 // Retry if EINTR happens.
121 bool is_eintr = poll_result < 0 && errno == EINTR;
122 if (!is_eintr) {
123 break;
124 }
125 QuicTime now = clock_->Now();
126 if (now >= timeout_at) {
127 break;
128 }
129 timeout = timeout_at - now;
130 }
131 return poll_result;
132 }
133
ProcessIoEvents(QuicTime start_time,QuicTime::Delta timeout)134 void QuicPollEventLoop::ProcessIoEvents(QuicTime start_time,
135 QuicTime::Delta timeout) {
136 // Set up the pollfd[] array.
137 const size_t registration_count = registrations_.size();
138 auto pollfds = std::make_unique<pollfd[]>(registration_count);
139 size_t i = 0;
140 for (auto& [fd, registration] : registrations_) {
141 QUICHE_CHECK_LT(
142 i, registration_count); // Crash instead of out-of-bounds access.
143 pollfds[i].fd = fd;
144 pollfds[i].events = GetPollMask(registration->events);
145 pollfds[i].revents = 0;
146 ++i;
147 }
148
149 // Actually run poll(2).
150 int poll_result =
151 PollWithRetries(absl::Span<pollfd>(pollfds.get(), registration_count),
152 start_time, timeout);
153 if (poll_result == 0 && !has_artificial_events_pending_) {
154 return;
155 }
156
157 // Prepare the list of all callbacks to be called, while resetting all events,
158 // since we're operating in the level-triggered mode.
159 std::vector<ReadyListEntry> ready_list;
160 ready_list.reserve(registration_count);
161 for (i = 0; i < registration_count; i++) {
162 DispatchIoEvent(ready_list, pollfds[i].fd, pollfds[i].revents);
163 }
164 has_artificial_events_pending_ = false;
165
166 // Actually call all of the callbacks.
167 RunReadyCallbacks(ready_list);
168 }
169
DispatchIoEvent(std::vector<ReadyListEntry> & ready_list,QuicUdpSocketFd fd,PollMask mask)170 void QuicPollEventLoop::DispatchIoEvent(std::vector<ReadyListEntry>& ready_list,
171 QuicUdpSocketFd fd, PollMask mask) {
172 auto it = registrations_.find(fd);
173 if (it == registrations_.end()) {
174 QUIC_BUG(poll returned an unregistered fd) << fd;
175 return;
176 }
177 Registration& registration = *it->second;
178
179 mask |= GetPollMask(registration.artificially_notify_at_next_iteration);
180 registration.artificially_notify_at_next_iteration = QuicSocketEventMask();
181
182 // poll() always returns certain classes of events even if not requested.
183 mask &= GetPollMask(registration.events);
184 if (!mask) {
185 return;
186 }
187
188 ready_list.push_back(ReadyListEntry{fd, it->second, GetEventMask(mask)});
189 registration.events &= ~GetEventMask(mask);
190 }
191
RunReadyCallbacks(std::vector<ReadyListEntry> & ready_list)192 void QuicPollEventLoop::RunReadyCallbacks(
193 std::vector<ReadyListEntry>& ready_list) {
194 for (ReadyListEntry& entry : ready_list) {
195 std::shared_ptr<Registration> registration = entry.registration.lock();
196 if (!registration) {
197 // The socket has been unregistered from within one of the callbacks.
198 continue;
199 }
200 registration->listener->OnSocketEvent(this, entry.fd, entry.events);
201 }
202 ready_list.clear();
203 }
204
ProcessAlarmsUpTo(QuicTime time)205 void QuicPollEventLoop::ProcessAlarmsUpTo(QuicTime time) {
206 // Determine which alarm callbacks needs to be run.
207 std::vector<std::weak_ptr<Alarm*>> alarms_to_call;
208 while (!alarms_.empty() && alarms_.begin()->first <= time) {
209 auto& [deadline, schedule_handle_weak] = *alarms_.begin();
210 alarms_to_call.push_back(std::move(schedule_handle_weak));
211 alarms_.erase(alarms_.begin());
212 }
213 // Actually run those callbacks.
214 for (std::weak_ptr<Alarm*>& schedule_handle_weak : alarms_to_call) {
215 std::shared_ptr<Alarm*> schedule_handle = schedule_handle_weak.lock();
216 if (!schedule_handle) {
217 // The alarm has been cancelled and might not even exist anymore.
218 continue;
219 }
220 (*schedule_handle)->DoFire();
221 }
222 // Clean up all of the alarms in the front that have been cancelled.
223 while (!alarms_.empty()) {
224 if (alarms_.begin()->second.expired()) {
225 alarms_.erase(alarms_.begin());
226 } else {
227 break;
228 }
229 }
230 }
231
CreateAlarm(QuicAlarm::Delegate * delegate)232 QuicAlarm* QuicPollEventLoop::AlarmFactory::CreateAlarm(
233 QuicAlarm::Delegate* delegate) {
234 return new Alarm(loop_, QuicArenaScopedPtr<QuicAlarm::Delegate>(delegate));
235 }
236
CreateAlarm(QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,QuicConnectionArena * arena)237 QuicArenaScopedPtr<QuicAlarm> QuicPollEventLoop::AlarmFactory::CreateAlarm(
238 QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,
239 QuicConnectionArena* arena) {
240 if (arena != nullptr) {
241 return arena->New<Alarm>(loop_, std::move(delegate));
242 }
243 return QuicArenaScopedPtr<QuicAlarm>(new Alarm(loop_, std::move(delegate)));
244 }
245
Alarm(QuicPollEventLoop * loop,QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)246 QuicPollEventLoop::Alarm::Alarm(
247 QuicPollEventLoop* loop, QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)
248 : QuicAlarm(std::move(delegate)), loop_(loop) {}
249
SetImpl()250 void QuicPollEventLoop::Alarm::SetImpl() {
251 current_schedule_handle_ = std::make_shared<Alarm*>(this);
252 loop_->alarms_.insert({deadline(), current_schedule_handle_});
253 }
254
CancelImpl()255 void QuicPollEventLoop::Alarm::CancelImpl() {
256 current_schedule_handle_.reset();
257 }
258
CreateAlarmFactory()259 std::unique_ptr<QuicAlarmFactory> QuicPollEventLoop::CreateAlarmFactory() {
260 return std::make_unique<AlarmFactory>(this);
261 }
262
263 } // namespace quic
264