1 /*
2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "net/dcsctp/rx/reassembly_queue.h"
11
12 #include <stddef.h>
13
14 #include <algorithm>
15 #include <cstdint>
16 #include <memory>
17 #include <set>
18 #include <string>
19 #include <utility>
20 #include <vector>
21
22 #include "absl/strings/string_view.h"
23 #include "absl/types/optional.h"
24 #include "api/array_view.h"
25 #include "net/dcsctp/common/sequence_numbers.h"
26 #include "net/dcsctp/common/str_join.h"
27 #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
28 #include "net/dcsctp/packet/data.h"
29 #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
30 #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
31 #include "net/dcsctp/public/dcsctp_message.h"
32 #include "net/dcsctp/rx/interleaved_reassembly_streams.h"
33 #include "net/dcsctp/rx/reassembly_streams.h"
34 #include "net/dcsctp/rx/traditional_reassembly_streams.h"
35 #include "rtc_base/logging.h"
36
37 namespace dcsctp {
38 namespace {
CreateStreams(absl::string_view log_prefix,ReassemblyStreams::OnAssembledMessage on_assembled_message,bool use_message_interleaving)39 std::unique_ptr<ReassemblyStreams> CreateStreams(
40 absl::string_view log_prefix,
41 ReassemblyStreams::OnAssembledMessage on_assembled_message,
42 bool use_message_interleaving) {
43 if (use_message_interleaving) {
44 return std::make_unique<InterleavedReassemblyStreams>(
45 log_prefix, std::move(on_assembled_message));
46 }
47 return std::make_unique<TraditionalReassemblyStreams>(
48 log_prefix, std::move(on_assembled_message));
49 }
50 } // namespace
51
ReassemblyQueue(absl::string_view log_prefix,TSN peer_initial_tsn,size_t max_size_bytes,bool use_message_interleaving)52 ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
53 TSN peer_initial_tsn,
54 size_t max_size_bytes,
55 bool use_message_interleaving)
56 : log_prefix_(std::string(log_prefix) + "reasm: "),
57 max_size_bytes_(max_size_bytes),
58 watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
59 last_assembled_tsn_watermark_(
60 tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
61 last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
62 streams_(CreateStreams(
63 log_prefix_,
64 [this](rtc::ArrayView<const UnwrappedTSN> tsns,
65 DcSctpMessage message) {
66 AddReassembledMessage(tsns, std::move(message));
67 },
68 use_message_interleaving)) {}
69
Add(TSN tsn,Data data)70 void ReassemblyQueue::Add(TSN tsn, Data data) {
71 RTC_DCHECK(IsConsistent());
72 RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
73 << ", stream=" << *data.stream_id << ":"
74 << *data.message_id << ":" << *data.fsn << ", type="
75 << (data.is_beginning && data.is_end ? "complete"
76 : data.is_beginning ? "first"
77 : data.is_end ? "last"
78 : "middle");
79
80 UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
81
82 if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
83 delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
84 RTC_DLOG(LS_VERBOSE) << log_prefix_
85 << "Chunk has already been delivered - skipping";
86 return;
87 }
88
89 // If a stream reset has been received with a "sender's last assigned tsn" in
90 // the future, the socket is in "deferred reset processing" mode and must
91 // buffer chunks until it's exited.
92 if (deferred_reset_streams_.has_value() &&
93 unwrapped_tsn >
94 tsn_unwrapper_.Unwrap(
95 deferred_reset_streams_->req.sender_last_assigned_tsn())) {
96 RTC_DLOG(LS_VERBOSE)
97 << log_prefix_ << "Deferring chunk with tsn=" << *tsn
98 << " until cum_ack_tsn="
99 << *deferred_reset_streams_->req.sender_last_assigned_tsn();
100 // https://tools.ietf.org/html/rfc6525#section-5.2.2
101 // "In this mode, any data arriving with a TSN larger than the
102 // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
103 // locally and held until the cumulative acknowledgment point reaches the
104 // Sender's Last Assigned TSN."
105 queued_bytes_ += data.size();
106 deferred_reset_streams_->deferred_chunks.emplace_back(
107 std::make_pair(tsn, std::move(data)));
108 } else {
109 queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
110 }
111
112 // https://tools.ietf.org/html/rfc4960#section-6.9
113 // "Note: If the data receiver runs out of buffer space while still
114 // waiting for more fragments to complete the reassembly of the message, it
115 // should dispatch part of its inbound message through a partial delivery
116 // API (see Section 10), freeing some of its receive buffer space so that
117 // the rest of the message may be received."
118
119 // TODO(boivie): Support EOR flag and partial delivery?
120 RTC_DCHECK(IsConsistent());
121 }
122
ResetStreams(const OutgoingSSNResetRequestParameter & req,TSN cum_tsn_ack)123 ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
124 const OutgoingSSNResetRequestParameter& req,
125 TSN cum_tsn_ack) {
126 RTC_DCHECK(IsConsistent());
127 if (deferred_reset_streams_.has_value()) {
128 // In deferred mode already.
129 return ReconfigurationResponseParameter::Result::kInProgress;
130 } else if (req.request_sequence_number() <=
131 last_completed_reset_req_seq_nbr_) {
132 // Already performed at some time previously.
133 return ReconfigurationResponseParameter::Result::kSuccessPerformed;
134 }
135
136 UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
137 UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
138
139 // https://tools.ietf.org/html/rfc6525#section-5.2.2
140 // "If the Sender's Last Assigned TSN is greater than the
141 // cumulative acknowledgment point, then the endpoint MUST enter "deferred
142 // reset processing"."
143 if (sla_tsn > unwrapped_cum_tsn_ack) {
144 RTC_DLOG(LS_VERBOSE)
145 << log_prefix_
146 << "Entering deferred reset processing mode until cum_tsn_ack="
147 << *req.sender_last_assigned_tsn();
148 deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
149 return ReconfigurationResponseParameter::Result::kInProgress;
150 }
151
152 // https://tools.ietf.org/html/rfc6525#section-5.2.2
153 // "... streams MUST be reset to 0 as the next expected SSN."
154 streams_->ResetStreams(req.stream_ids());
155 last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
156 RTC_DCHECK(IsConsistent());
157 return ReconfigurationResponseParameter::Result::kSuccessPerformed;
158 }
159
MaybeResetStreamsDeferred(TSN cum_ack_tsn)160 bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
161 RTC_DCHECK(IsConsistent());
162 if (deferred_reset_streams_.has_value()) {
163 UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
164 UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
165 deferred_reset_streams_->req.sender_last_assigned_tsn());
166 if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
167 RTC_DLOG(LS_VERBOSE) << log_prefix_
168 << "Leaving deferred reset processing with tsn="
169 << *cum_ack_tsn << ", feeding back "
170 << deferred_reset_streams_->deferred_chunks.size()
171 << " chunks";
172 // https://tools.ietf.org/html/rfc6525#section-5.2.2
173 // "... streams MUST be reset to 0 as the next expected SSN."
174 streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
175 std::vector<std::pair<TSN, Data>> deferred_chunks =
176 std::move(deferred_reset_streams_->deferred_chunks);
177 // The response will not be sent now, but as a reply to the retried
178 // request, which will come as "in progress" has been sent prior.
179 last_completed_reset_req_seq_nbr_ =
180 deferred_reset_streams_->req.request_sequence_number();
181 deferred_reset_streams_ = absl::nullopt;
182
183 // https://tools.ietf.org/html/rfc6525#section-5.2.2
184 // "Any queued TSNs (queued at step E2) MUST now be released and processed
185 // normally."
186 for (auto& [tsn, data] : deferred_chunks) {
187 queued_bytes_ -= data.size();
188 Add(tsn, std::move(data));
189 }
190
191 RTC_DCHECK(IsConsistent());
192 return true;
193 } else {
194 RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
195 << *cum_ack_tsn;
196 }
197 }
198
199 return false;
200 }
201
FlushMessages()202 std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
203 std::vector<DcSctpMessage> ret;
204 reassembled_messages_.swap(ret);
205 return ret;
206 }
207
AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,DcSctpMessage message)208 void ReassemblyQueue::AddReassembledMessage(
209 rtc::ArrayView<const UnwrappedTSN> tsns,
210 DcSctpMessage message) {
211 RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
212 << StrJoin(tsns, ",",
213 [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
214 sb << *tsn.Wrap();
215 })
216 << "], message; stream_id=" << *message.stream_id()
217 << ", ppid=" << *message.ppid()
218 << ", payload=" << message.payload().size() << " bytes";
219
220 for (const UnwrappedTSN tsn : tsns) {
221 if (tsn <= last_assembled_tsn_watermark_) {
222 // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
223 // invalid SSNs, allowing ordered messages to stay in the queue that
224 // should've been discarded.
225 RTC_DLOG(LS_VERBOSE)
226 << log_prefix_
227 << "Message is built from fragments already seen - skipping";
228 return;
229 } else if (tsn == last_assembled_tsn_watermark_.next_value()) {
230 // Update watermark, or insert into delivered_tsns_
231 last_assembled_tsn_watermark_.Increment();
232 } else {
233 delivered_tsns_.insert(tsn);
234 }
235 }
236
237 // With new TSNs in delivered_tsns, gaps might be filled.
238 MaybeMoveLastAssembledWatermarkFurther();
239
240 reassembled_messages_.emplace_back(std::move(message));
241 }
242
MaybeMoveLastAssembledWatermarkFurther()243 void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
244 // `delivered_tsns_` contain TSNS when there is a gap between ranges of
245 // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
246 // that list, because if so, it can be moved.
247 while (!delivered_tsns_.empty() &&
248 *delivered_tsns_.begin() ==
249 last_assembled_tsn_watermark_.next_value()) {
250 last_assembled_tsn_watermark_.Increment();
251 delivered_tsns_.erase(delivered_tsns_.begin());
252 }
253 }
254
Handle(const AnyForwardTsnChunk & forward_tsn)255 void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
256 RTC_DCHECK(IsConsistent());
257 UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
258
259 last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
260 delivered_tsns_.erase(delivered_tsns_.begin(),
261 delivered_tsns_.upper_bound(tsn));
262
263 MaybeMoveLastAssembledWatermarkFurther();
264
265 queued_bytes_ -=
266 streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
267 RTC_DCHECK(IsConsistent());
268 }
269
IsConsistent() const270 bool ReassemblyQueue::IsConsistent() const {
271 // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
272 // adjacent.
273 if (!delivered_tsns_.empty() &&
274 last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
275 return false;
276 }
277
278 // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
279 // enforced in this class. This comparison will still trigger if queued_bytes_
280 // became "negative".
281 return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
282 }
283
GetHandoverReadiness() const284 HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
285 HandoverReadinessStatus status = streams_->GetHandoverReadiness();
286 if (!delivered_tsns_.empty()) {
287 status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
288 }
289 if (deferred_reset_streams_.has_value()) {
290 status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
291 }
292 return status;
293 }
294
AddHandoverState(DcSctpSocketHandoverState & state)295 void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
296 state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
297 state.rx.last_completed_deferred_reset_req_sn =
298 last_completed_reset_req_seq_nbr_.value();
299 streams_->AddHandoverState(state);
300 }
301
RestoreFromState(const DcSctpSocketHandoverState & state)302 void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
303 // Validate that the component is in pristine state.
304 RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
305
306 last_assembled_tsn_watermark_ =
307 tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
308 last_completed_reset_req_seq_nbr_ =
309 ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
310 streams_->RestoreFromState(state);
311 }
312 } // namespace dcsctp
313