• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/io/quic_poll_event_loop.h"
6 
7 #include <poll.h>
8 
9 #include <algorithm>
10 #include <cerrno>
11 #include <memory>
12 
13 #include "absl/types/span.h"
14 #include "quiche/quic/core/io/quic_event_loop.h"
15 #include "quiche/quic/core/quic_alarm.h"
16 #include "quiche/quic/core/quic_time.h"
17 #include "quiche/quic/platform/api/quic_bug_tracker.h"
18 
19 namespace quic {
20 
21 namespace {
22 
23 using PollMask = decltype(::pollfd().events);
24 
GetPollMask(QuicSocketEventMask event_mask)25 PollMask GetPollMask(QuicSocketEventMask event_mask) {
26   return ((event_mask & kSocketEventReadable) ? POLLIN : 0) |
27          ((event_mask & kSocketEventWritable) ? POLLOUT : 0) |
28          ((event_mask & kSocketEventError) ? POLLERR : 0);
29 }
30 
GetEventMask(PollMask poll_mask)31 QuicSocketEventMask GetEventMask(PollMask poll_mask) {
32   return ((poll_mask & POLLIN) ? kSocketEventReadable : 0) |
33          ((poll_mask & POLLOUT) ? kSocketEventWritable : 0) |
34          ((poll_mask & POLLERR) ? kSocketEventError : 0);
35 }
36 
37 }  // namespace
38 
QuicPollEventLoop(QuicClock * clock)39 QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) {}
40 
RegisterSocket(QuicUdpSocketFd fd,QuicSocketEventMask events,QuicSocketEventListener * listener)41 bool QuicPollEventLoop::RegisterSocket(QuicUdpSocketFd fd,
42                                        QuicSocketEventMask events,
43                                        QuicSocketEventListener* listener) {
44   auto [it, success] =
45       registrations_.insert({fd, std::make_shared<Registration>()});
46   if (!success) {
47     return false;
48   }
49   Registration& registration = *it->second;
50   registration.events = events;
51   registration.listener = listener;
52   return true;
53 }
54 
UnregisterSocket(QuicUdpSocketFd fd)55 bool QuicPollEventLoop::UnregisterSocket(QuicUdpSocketFd fd) {
56   return registrations_.erase(fd);
57 }
58 
RearmSocket(QuicUdpSocketFd fd,QuicSocketEventMask events)59 bool QuicPollEventLoop::RearmSocket(QuicUdpSocketFd fd,
60                                     QuicSocketEventMask events) {
61   auto it = registrations_.find(fd);
62   if (it == registrations_.end()) {
63     return false;
64   }
65   it->second->events |= events;
66   return true;
67 }
68 
ArtificiallyNotifyEvent(QuicUdpSocketFd fd,QuicSocketEventMask events)69 bool QuicPollEventLoop::ArtificiallyNotifyEvent(QuicUdpSocketFd fd,
70                                                 QuicSocketEventMask events) {
71   auto it = registrations_.find(fd);
72   if (it == registrations_.end()) {
73     return false;
74   }
75   has_artificial_events_pending_ = true;
76   it->second->artificially_notify_at_next_iteration |= events;
77   return true;
78 }
79 
RunEventLoopOnce(QuicTime::Delta default_timeout)80 void QuicPollEventLoop::RunEventLoopOnce(QuicTime::Delta default_timeout) {
81   const QuicTime start_time = clock_->Now();
82   ProcessAlarmsUpTo(start_time);
83 
84   QuicTime::Delta timeout = ComputePollTimeout(start_time, default_timeout);
85   ProcessIoEvents(start_time, timeout);
86 
87   const QuicTime end_time = clock_->Now();
88   ProcessAlarmsUpTo(end_time);
89 }
90 
ComputePollTimeout(QuicTime now,QuicTime::Delta default_timeout) const91 QuicTime::Delta QuicPollEventLoop::ComputePollTimeout(
92     QuicTime now, QuicTime::Delta default_timeout) const {
93   default_timeout = std::max(default_timeout, QuicTime::Delta::Zero());
94   if (has_artificial_events_pending_) {
95     return QuicTime::Delta::Zero();
96   }
97   if (alarms_.empty()) {
98     return default_timeout;
99   }
100   QuicTime end_time = std::min(now + default_timeout, alarms_.begin()->first);
101   if (end_time < now) {
102     // We only run a single pass of processing alarm callbacks per
103     // RunEventLoopOnce() call.  If an alarm schedules another alarm in the past
104     // while in the callback, this will happen.
105     return QuicTime::Delta::Zero();
106   }
107   return end_time - now;
108 }
109 
PollWithRetries(absl::Span<pollfd> fds,QuicTime start_time,QuicTime::Delta timeout)110 int QuicPollEventLoop::PollWithRetries(absl::Span<pollfd> fds,
111                                        QuicTime start_time,
112                                        QuicTime::Delta timeout) {
113   const QuicTime timeout_at = start_time + timeout;
114   int poll_result;
115   for (;;) {
116     float timeout_ms = std::ceil(timeout.ToMicroseconds() / 1000.f);
117     poll_result =
118         PollSyscall(fds.data(), fds.size(), static_cast<int>(timeout_ms));
119 
120     // Retry if EINTR happens.
121     bool is_eintr = poll_result < 0 && errno == EINTR;
122     if (!is_eintr) {
123       break;
124     }
125     QuicTime now = clock_->Now();
126     if (now >= timeout_at) {
127       break;
128     }
129     timeout = timeout_at - now;
130   }
131   return poll_result;
132 }
133 
ProcessIoEvents(QuicTime start_time,QuicTime::Delta timeout)134 void QuicPollEventLoop::ProcessIoEvents(QuicTime start_time,
135                                         QuicTime::Delta timeout) {
136   // Set up the pollfd[] array.
137   const size_t registration_count = registrations_.size();
138   auto pollfds = std::make_unique<pollfd[]>(registration_count);
139   size_t i = 0;
140   for (auto& [fd, registration] : registrations_) {
141     QUICHE_CHECK_LT(
142         i, registration_count);  // Crash instead of out-of-bounds access.
143     pollfds[i].fd = fd;
144     pollfds[i].events = GetPollMask(registration->events);
145     pollfds[i].revents = 0;
146     ++i;
147   }
148 
149   // Actually run poll(2).
150   int poll_result =
151       PollWithRetries(absl::Span<pollfd>(pollfds.get(), registration_count),
152                       start_time, timeout);
153   if (poll_result == 0 && !has_artificial_events_pending_) {
154     return;
155   }
156 
157   // Prepare the list of all callbacks to be called, while resetting all events,
158   // since we're operating in the level-triggered mode.
159   std::vector<ReadyListEntry> ready_list;
160   ready_list.reserve(registration_count);
161   for (i = 0; i < registration_count; i++) {
162     DispatchIoEvent(ready_list, pollfds[i].fd, pollfds[i].revents);
163   }
164   has_artificial_events_pending_ = false;
165 
166   // Actually call all of the callbacks.
167   RunReadyCallbacks(ready_list);
168 }
169 
DispatchIoEvent(std::vector<ReadyListEntry> & ready_list,QuicUdpSocketFd fd,PollMask mask)170 void QuicPollEventLoop::DispatchIoEvent(std::vector<ReadyListEntry>& ready_list,
171                                         QuicUdpSocketFd fd, PollMask mask) {
172   auto it = registrations_.find(fd);
173   if (it == registrations_.end()) {
174     QUIC_BUG(poll returned an unregistered fd) << fd;
175     return;
176   }
177   Registration& registration = *it->second;
178 
179   mask |= GetPollMask(registration.artificially_notify_at_next_iteration);
180   registration.artificially_notify_at_next_iteration = QuicSocketEventMask();
181 
182   // poll() always returns certain classes of events even if not requested.
183   mask &= GetPollMask(registration.events);
184   if (!mask) {
185     return;
186   }
187 
188   ready_list.push_back(ReadyListEntry{fd, it->second, GetEventMask(mask)});
189   registration.events &= ~GetEventMask(mask);
190 }
191 
RunReadyCallbacks(std::vector<ReadyListEntry> & ready_list)192 void QuicPollEventLoop::RunReadyCallbacks(
193     std::vector<ReadyListEntry>& ready_list) {
194   for (ReadyListEntry& entry : ready_list) {
195     std::shared_ptr<Registration> registration = entry.registration.lock();
196     if (!registration) {
197       // The socket has been unregistered from within one of the callbacks.
198       continue;
199     }
200     registration->listener->OnSocketEvent(this, entry.fd, entry.events);
201   }
202   ready_list.clear();
203 }
204 
ProcessAlarmsUpTo(QuicTime time)205 void QuicPollEventLoop::ProcessAlarmsUpTo(QuicTime time) {
206   // Determine which alarm callbacks needs to be run.
207   std::vector<std::weak_ptr<Alarm*>> alarms_to_call;
208   while (!alarms_.empty() && alarms_.begin()->first <= time) {
209     auto& [deadline, schedule_handle_weak] = *alarms_.begin();
210     alarms_to_call.push_back(std::move(schedule_handle_weak));
211     alarms_.erase(alarms_.begin());
212   }
213   // Actually run those callbacks.
214   for (std::weak_ptr<Alarm*>& schedule_handle_weak : alarms_to_call) {
215     std::shared_ptr<Alarm*> schedule_handle = schedule_handle_weak.lock();
216     if (!schedule_handle) {
217       // The alarm has been cancelled and might not even exist anymore.
218       continue;
219     }
220     (*schedule_handle)->DoFire();
221   }
222   // Clean up all of the alarms in the front that have been cancelled.
223   while (!alarms_.empty()) {
224     if (alarms_.begin()->second.expired()) {
225       alarms_.erase(alarms_.begin());
226     } else {
227       break;
228     }
229   }
230 }
231 
CreateAlarm(QuicAlarm::Delegate * delegate)232 QuicAlarm* QuicPollEventLoop::AlarmFactory::CreateAlarm(
233     QuicAlarm::Delegate* delegate) {
234   return new Alarm(loop_, QuicArenaScopedPtr<QuicAlarm::Delegate>(delegate));
235 }
236 
CreateAlarm(QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,QuicConnectionArena * arena)237 QuicArenaScopedPtr<QuicAlarm> QuicPollEventLoop::AlarmFactory::CreateAlarm(
238     QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,
239     QuicConnectionArena* arena) {
240   if (arena != nullptr) {
241     return arena->New<Alarm>(loop_, std::move(delegate));
242   }
243   return QuicArenaScopedPtr<QuicAlarm>(new Alarm(loop_, std::move(delegate)));
244 }
245 
Alarm(QuicPollEventLoop * loop,QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)246 QuicPollEventLoop::Alarm::Alarm(
247     QuicPollEventLoop* loop, QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)
248     : QuicAlarm(std::move(delegate)), loop_(loop) {}
249 
SetImpl()250 void QuicPollEventLoop::Alarm::SetImpl() {
251   current_schedule_handle_ = std::make_shared<Alarm*>(this);
252   loop_->alarms_.insert({deadline(), current_schedule_handle_});
253 }
254 
CancelImpl()255 void QuicPollEventLoop::Alarm::CancelImpl() {
256   current_schedule_handle_.reset();
257 }
258 
CreateAlarmFactory()259 std::unique_ptr<QuicAlarmFactory> QuicPollEventLoop::CreateAlarmFactory() {
260   return std::make_unique<AlarmFactory>(this);
261 }
262 
263 }  // namespace quic
264