1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/quic_datagram_queue.h"
6
7 #include "absl/types/span.h"
8 #include "quiche/quic/core/quic_constants.h"
9 #include "quiche/quic/core/quic_session.h"
10 #include "quiche/quic/core/quic_time.h"
11 #include "quiche/quic/core/quic_types.h"
12
13 namespace quic {
14
15 constexpr float kExpiryInMinRtts = 1.25;
16 constexpr float kMinPacingWindows = 4;
17
QuicDatagramQueue(QuicSession * session)18 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
19 : QuicDatagramQueue(session, nullptr) {}
20
QuicDatagramQueue(QuicSession * session,std::unique_ptr<Observer> observer)21 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session,
22 std::unique_ptr<Observer> observer)
23 : session_(session),
24 clock_(session->connection()->clock()),
25 observer_(std::move(observer)),
26 force_flush_(false) {}
27
SendOrQueueDatagram(quiche::QuicheMemSlice datagram)28 MessageStatus QuicDatagramQueue::SendOrQueueDatagram(
29 quiche::QuicheMemSlice datagram) {
30 // If the queue is non-empty, always queue the daragram. This ensures that
31 // the datagrams are sent in the same order that they were sent by the
32 // application.
33 if (queue_.empty()) {
34 MessageResult result = session_->SendMessage(absl::MakeSpan(&datagram, 1),
35 /*flush=*/force_flush_);
36 if (result.status != MESSAGE_STATUS_BLOCKED) {
37 if (observer_) {
38 observer_->OnDatagramProcessed(result.status);
39 }
40 return result.status;
41 }
42 }
43
44 queue_.emplace_back(Datagram{std::move(datagram),
45 clock_->ApproximateNow() + GetMaxTimeInQueue()});
46 return MESSAGE_STATUS_BLOCKED;
47 }
48
TrySendingNextDatagram()49 absl::optional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() {
50 RemoveExpiredDatagrams();
51 if (queue_.empty()) {
52 return absl::nullopt;
53 }
54
55 MessageResult result =
56 session_->SendMessage(absl::MakeSpan(&queue_.front().datagram, 1));
57 if (result.status != MESSAGE_STATUS_BLOCKED) {
58 queue_.pop_front();
59 if (observer_) {
60 observer_->OnDatagramProcessed(result.status);
61 }
62 }
63 return result.status;
64 }
65
SendDatagrams()66 size_t QuicDatagramQueue::SendDatagrams() {
67 size_t num_datagrams = 0;
68 for (;;) {
69 absl::optional<MessageStatus> status = TrySendingNextDatagram();
70 if (!status.has_value()) {
71 break;
72 }
73 if (*status == MESSAGE_STATUS_BLOCKED) {
74 break;
75 }
76 num_datagrams++;
77 }
78 return num_datagrams;
79 }
80
GetMaxTimeInQueue() const81 QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const {
82 if (!max_time_in_queue_.IsZero()) {
83 return max_time_in_queue_;
84 }
85
86 const QuicTime::Delta min_rtt =
87 session_->connection()->sent_packet_manager().GetRttStats()->min_rtt();
88 return std::max(kExpiryInMinRtts * min_rtt,
89 kMinPacingWindows * kAlarmGranularity);
90 }
91
RemoveExpiredDatagrams()92 void QuicDatagramQueue::RemoveExpiredDatagrams() {
93 QuicTime now = clock_->ApproximateNow();
94 while (!queue_.empty() && queue_.front().expiry <= now) {
95 queue_.pop_front();
96 if (observer_) {
97 observer_->OnDatagramProcessed(absl::nullopt);
98 }
99 }
100 }
101
102 } // namespace quic
103