1 // Copyright (c) 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/batch_writer/quic_batch_writer_base.h"
6
7 #include <cstdint>
8
9 #include "quiche/quic/platform/api/quic_export.h"
10 #include "quiche/quic/platform/api/quic_flags.h"
11 #include "quiche/quic/platform/api/quic_server_stats.h"
12
13 namespace quic {
14
QuicBatchWriterBase(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)15 QuicBatchWriterBase::QuicBatchWriterBase(
16 std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)
17 : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {}
18
WritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options)19 WriteResult QuicBatchWriterBase::WritePacket(
20 const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
21 const QuicSocketAddress& peer_address, PerPacketOptions* options) {
22 const WriteResult result =
23 InternalWritePacket(buffer, buf_len, self_address, peer_address, options);
24
25 if (IsWriteBlockedStatus(result.status)) {
26 write_blocked_ = true;
27 }
28
29 return result;
30 }
31
InternalWritePacket(const char * buffer,size_t buf_len,const QuicIpAddress & self_address,const QuicSocketAddress & peer_address,PerPacketOptions * options)32 WriteResult QuicBatchWriterBase::InternalWritePacket(
33 const char* buffer, size_t buf_len, const QuicIpAddress& self_address,
34 const QuicSocketAddress& peer_address, PerPacketOptions* options) {
35 if (buf_len > kMaxOutgoingPacketSize) {
36 return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
37 }
38
39 ReleaseTime release_time{0, QuicTime::Delta::Zero()};
40 if (SupportsReleaseTime()) {
41 release_time = GetReleaseTime(options);
42 if (release_time.release_time_offset >= QuicTime::Delta::Zero()) {
43 QUIC_SERVER_HISTOGRAM_TIMES(
44 "batch_writer_positive_release_time_offset",
45 release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
46 "Duration from ideal release time to actual "
47 "release time, in microseconds.");
48 } else {
49 QUIC_SERVER_HISTOGRAM_TIMES(
50 "batch_writer_negative_release_time_offset",
51 -release_time.release_time_offset.ToMicroseconds(), 1, 100000, 50,
52 "Duration from actual release time to ideal "
53 "release time, in microseconds.");
54 }
55 }
56
57 const CanBatchResult can_batch_result =
58 CanBatch(buffer, buf_len, self_address, peer_address, options,
59 release_time.actual_release_time);
60
61 bool buffered = false;
62 bool flush = can_batch_result.must_flush;
63
64 if (can_batch_result.can_batch) {
65 QuicBatchWriterBuffer::PushResult push_result =
66 batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
67 peer_address, options,
68 release_time.actual_release_time);
69 if (push_result.succeeded) {
70 buffered = true;
71 // If there's no space left after the packet is buffered, force a flush.
72 flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr);
73 } else {
74 // If there's no space without this packet, force a flush.
75 flush = true;
76 }
77 }
78
79 if (!flush) {
80 WriteResult result(WRITE_STATUS_OK, 0);
81 result.send_time_offset = release_time.release_time_offset;
82 return result;
83 }
84
85 size_t num_buffered_packets = buffered_writes().size();
86 const FlushImplResult flush_result = CheckedFlush();
87 WriteResult result = flush_result.write_result;
88 QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
89 << " out of " << num_buffered_packets
90 << " packets. WriteResult=" << result;
91
92 if (result.status != WRITE_STATUS_OK) {
93 if (IsWriteBlockedStatus(result.status)) {
94 return WriteResult(
95 buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED : WRITE_STATUS_BLOCKED,
96 result.error_code);
97 }
98
99 // Drop all packets, including the one being written.
100 size_t dropped_packets =
101 buffered ? buffered_writes().size() : buffered_writes().size() + 1;
102
103 batch_buffer().Clear();
104 result.dropped_packets =
105 dropped_packets > std::numeric_limits<uint16_t>::max()
106 ? std::numeric_limits<uint16_t>::max()
107 : static_cast<uint16_t>(dropped_packets);
108 return result;
109 }
110
111 if (!buffered) {
112 QuicBatchWriterBuffer::PushResult push_result =
113 batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
114 peer_address, options,
115 release_time.actual_release_time);
116 buffered = push_result.succeeded;
117
118 // Since buffered_writes has been emptied, this write must have been
119 // buffered successfully.
120 QUIC_BUG_IF(quic_bug_10826_1, !buffered)
121 << "Failed to push to an empty batch buffer."
122 << " self_addr:" << self_address.ToString()
123 << ", peer_addr:" << peer_address.ToString() << ", buf_len:" << buf_len;
124 }
125
126 result.send_time_offset = release_time.release_time_offset;
127 return result;
128 }
129
CheckedFlush()130 QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() {
131 if (buffered_writes().empty()) {
132 return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0),
133 /*num_packets_sent=*/0, /*bytes_written=*/0};
134 }
135
136 const FlushImplResult flush_result = FlushImpl();
137
138 // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is
139 // WRITE_STATUS_OK and batch_buffer is empty.
140 QUICHE_DCHECK(flush_result.write_result.status != WRITE_STATUS_OK ||
141 buffered_writes().empty());
142
143 // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED.
144 QUICHE_DCHECK(flush_result.write_result.status !=
145 WRITE_STATUS_BLOCKED_DATA_BUFFERED);
146
147 return flush_result;
148 }
149
Flush()150 WriteResult QuicBatchWriterBase::Flush() {
151 size_t num_buffered_packets = buffered_writes().size();
152 FlushImplResult flush_result = CheckedFlush();
153 QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent
154 << " out of " << num_buffered_packets
155 << " packets. WriteResult=" << flush_result.write_result;
156
157 if (IsWriteError(flush_result.write_result.status)) {
158 if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) {
159 flush_result.write_result.dropped_packets =
160 std::numeric_limits<uint16_t>::max();
161 } else {
162 flush_result.write_result.dropped_packets =
163 static_cast<uint16_t>(buffered_writes().size());
164 }
165 // Treat all errors as non-retryable fatal errors. Drop all buffered packets
166 // to avoid sending them and getting the same error again.
167 batch_buffer().Clear();
168 }
169
170 if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) {
171 write_blocked_ = true;
172 }
173 return flush_result.write_result;
174 }
175
176 } // namespace quic
177