• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/quic_datagram_queue.h"
6 
7 #include "absl/types/span.h"
8 #include "quiche/quic/core/quic_constants.h"
9 #include "quiche/quic/core/quic_session.h"
10 #include "quiche/quic/core/quic_time.h"
11 #include "quiche/quic/core/quic_types.h"
12 
13 namespace quic {
14 
// Multiplier applied to the connection's minimum RTT by GetMaxTimeInQueue()
// when no explicit maximum queueing time has been configured.
constexpr float kExpiryInMinRtts = 1.25f;
// Multiplier applied to kAlarmGranularity by GetMaxTimeInQueue(); the product
// is the lower bound on the default queueing time.
constexpr float kMinPacingWindows = 4.0f;
17 
QuicDatagramQueue(QuicSession * session)18 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session)
19     : QuicDatagramQueue(session, nullptr) {}
20 
QuicDatagramQueue(QuicSession * session,std::unique_ptr<Observer> observer)21 QuicDatagramQueue::QuicDatagramQueue(QuicSession* session,
22                                      std::unique_ptr<Observer> observer)
23     : session_(session),
24       clock_(session->connection()->clock()),
25       observer_(std::move(observer)),
26       force_flush_(false) {}
27 
SendOrQueueDatagram(quiche::QuicheMemSlice datagram)28 MessageStatus QuicDatagramQueue::SendOrQueueDatagram(
29     quiche::QuicheMemSlice datagram) {
30   // If the queue is non-empty, always queue the daragram.  This ensures that
31   // the datagrams are sent in the same order that they were sent by the
32   // application.
33   if (queue_.empty()) {
34     MessageResult result = session_->SendMessage(absl::MakeSpan(&datagram, 1),
35                                                  /*flush=*/force_flush_);
36     if (result.status != MESSAGE_STATUS_BLOCKED) {
37       if (observer_) {
38         observer_->OnDatagramProcessed(result.status);
39       }
40       return result.status;
41     }
42   }
43 
44   queue_.emplace_back(Datagram{std::move(datagram),
45                                clock_->ApproximateNow() + GetMaxTimeInQueue()});
46   return MESSAGE_STATUS_BLOCKED;
47 }
48 
TrySendingNextDatagram()49 absl::optional<MessageStatus> QuicDatagramQueue::TrySendingNextDatagram() {
50   RemoveExpiredDatagrams();
51   if (queue_.empty()) {
52     return absl::nullopt;
53   }
54 
55   MessageResult result =
56       session_->SendMessage(absl::MakeSpan(&queue_.front().datagram, 1));
57   if (result.status != MESSAGE_STATUS_BLOCKED) {
58     queue_.pop_front();
59     if (observer_) {
60       observer_->OnDatagramProcessed(result.status);
61     }
62   }
63   return result.status;
64 }
65 
SendDatagrams()66 size_t QuicDatagramQueue::SendDatagrams() {
67   size_t num_datagrams = 0;
68   for (;;) {
69     absl::optional<MessageStatus> status = TrySendingNextDatagram();
70     if (!status.has_value()) {
71       break;
72     }
73     if (*status == MESSAGE_STATUS_BLOCKED) {
74       break;
75     }
76     num_datagrams++;
77   }
78   return num_datagrams;
79 }
80 
GetMaxTimeInQueue() const81 QuicTime::Delta QuicDatagramQueue::GetMaxTimeInQueue() const {
82   if (!max_time_in_queue_.IsZero()) {
83     return max_time_in_queue_;
84   }
85 
86   const QuicTime::Delta min_rtt =
87       session_->connection()->sent_packet_manager().GetRttStats()->min_rtt();
88   return std::max(kExpiryInMinRtts * min_rtt,
89                   kMinPacingWindows * kAlarmGranularity);
90 }
91 
RemoveExpiredDatagrams()92 void QuicDatagramQueue::RemoveExpiredDatagrams() {
93   QuicTime now = clock_->ApproximateNow();
94   while (!queue_.empty() && queue_.front().expiry <= now) {
95     queue_.pop_front();
96     if (observer_) {
97       observer_->OnDatagramProcessed(absl::nullopt);
98     }
99   }
100 }
101 
102 }  // namespace quic
103