• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/batch_writer/quic_batch_writer_base.h"
6 
7 #include <cstdint>
8 
9 #include "quiche/quic/platform/api/quic_export.h"
10 #include "quiche/quic/platform/api/quic_flags.h"
11 #include "quiche/quic/platform/api/quic_server_stats.h"
12 
13 namespace quic {
14 
QuicBatchWriterBase(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)15 QuicBatchWriterBase::QuicBatchWriterBase(
16     std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)
17     : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {}
18 
WritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options)19 WriteResult QuicBatchWriterBase::WritePacket(
20     const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
21     const QuicSocketAddress& peer_address, PerPacketOptions* options) {
22   const WriteResult result =
23       InternalWritePacket(buffer, buf_len, self_address, peer_address, options);
24 
25   if (IsWriteBlockedStatus(result.status)) {
26     write_blocked_ = true;
27   }
28 
29   return result;
30 }
31 
InternalWritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options)32 WriteResult QuicBatchWriterBase::InternalWritePacket(
33     const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
34     const QuicSocketAddress& peer_address, PerPacketOptions* options) {
35   if (buf_len > kMaxOutgoingPacketSize) {
36     return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
37   }
38 
39   ReleaseTime release_time{0, QuicTime::Delta::Zero()};
40   if (SupportsReleaseTime()) {
41     release_time = GetReleaseTime(options);
42     if (release_time.release_time_offset >= QuicTime::Delta::Zero()) {
43       QUIC_SERVER_HISTOGRAM_TIMES(
44           "batch_writer_positive_release_time_offset",
45           release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
46           "Duration from ideal release time to actual "
47           "release time, in microseconds.");
48     } else {
49       QUIC_SERVER_HISTOGRAM_TIMES(
50           "batch_writer_negative_release_time_offset",
51           -release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
52           "Duration from actual release time to ideal "
53           "release time, in microseconds.");
54     }
55   }
56 
57   const CanBatchResult can_batch_result =
58       CanBatch(buffer, buf_len, self_address, peer_address, options,
59                release_time.actual_release_time);
60 
61   bool buffered = false;
62   bool flush = can_batch_result.must_flush;
63 
64   if (can_batch_result.can_batch) {
65     QuicBatchWriterBuffer::PushResult push_result =
66         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
67                                          peer_address, options,
68                                          release_time.actual_release_time);
69     if (push_result.succeeded) {
70       buffered = true;
71       // If there's no space left after the packet is buffered, force a flush.
72       flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr);
73     } else {
74       // If there's no space without this packet, force a flush.
75       flush = true;
76     }
77   }
78 
79   if (!flush) {
80     WriteResult result(WRITE_STATUS_OK, 0);
81     result.send_time_offset = release_time.release_time_offset;
82     return result;
83   }
84 
85   size_t num_buffered_packets = buffered_writes().size();
86   const FlushImplResult flush_result = CheckedFlush();
87   WriteResult result = flush_result.write_result;
88   QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
89                 << " out of " << num_buffered_packets
90                 << " packets. WriteResult=" << result;
91 
92   if (result.status != WRITE_STATUS_OK) {
93     if (IsWriteBlockedStatus(result.status)) {
94       return WriteResult(
95           buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED : WRITE_STATUS_BLOCKED,
96           result.error_code);
97     }
98 
99     // Drop all packets, including the one being written.
100     size_t dropped_packets =
101         buffered ? buffered_writes().size() : buffered_writes().size() + 1;
102 
103     batch_buffer().Clear();
104     result.dropped_packets =
105         dropped_packets > std::numeric_limits<uint16_t>::max()
106             ? std::numeric_limits<uint16_t>::max()
107             : static_cast<uint16_t>(dropped_packets);
108     return result;
109   }
110 
111   if (!buffered) {
112     QuicBatchWriterBuffer::PushResult push_result =
113         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
114                                          peer_address, options,
115                                          release_time.actual_release_time);
116     buffered = push_result.succeeded;
117 
118     // Since buffered_writes has been emptied, this write must have been
119     // buffered successfully.
120     QUIC_BUG_IF(quic_bug_10826_1, !buffered)
121         << "Failed to push to an empty batch buffer."
122         << "  self_addr:" << self_address.ToString()
123         << ", peer_addr:" << peer_address.ToString() << ", buf_len:" << buf_len;
124   }
125 
126   result.send_time_offset = release_time.release_time_offset;
127   return result;
128 }
129 
CheckedFlush()130 QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() {
131   if (buffered_writes().empty()) {
132     return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0),
133                            /*num_packets_sent=*/0, /*bytes_written=*/0};
134   }
135 
136   const FlushImplResult flush_result = FlushImpl();
137 
138   // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is
139   // WRITE_STATUS_OK and batch_buffer is empty.
140   QUICHE_DCHECK(flush_result.write_result.status != WRITE_STATUS_OK ||
141                 buffered_writes().empty());
142 
143   // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED.
144   QUICHE_DCHECK(flush_result.write_result.status !=
145                 WRITE_STATUS_BLOCKED_DATA_BUFFERED);
146 
147   return flush_result;
148 }
149 
Flush()150 WriteResult QuicBatchWriterBase::Flush() {
151   size_t num_buffered_packets = buffered_writes().size();
152   FlushImplResult flush_result = CheckedFlush();
153   QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent
154                 << " out of " << num_buffered_packets
155                 << " packets. WriteResult=" << flush_result.write_result;
156 
157   if (IsWriteError(flush_result.write_result.status)) {
158     if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) {
159       flush_result.write_result.dropped_packets =
160           std::numeric_limits<uint16_t>::max();
161     } else {
162       flush_result.write_result.dropped_packets =
163           static_cast<uint16_t>(buffered_writes().size());
164     }
165     // Treat all errors as non-retryable fatal errors. Drop all buffered packets
166     // to avoid sending them and getting the same error again.
167     batch_buffer().Clear();
168   }
169 
170   if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) {
171     write_blocked_ = true;
172   }
173   return flush_result.write_result;
174 }
175 
176 }  // namespace quic
177