• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/rx/reassembly_queue.h"
11 
12 #include <stddef.h>
13 
14 #include <algorithm>
15 #include <cstdint>
16 #include <memory>
17 #include <set>
18 #include <string>
19 #include <utility>
20 #include <vector>
21 
22 #include "absl/strings/string_view.h"
23 #include "absl/types/optional.h"
24 #include "api/array_view.h"
25 #include "net/dcsctp/common/sequence_numbers.h"
26 #include "net/dcsctp/common/str_join.h"
27 #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
28 #include "net/dcsctp/packet/data.h"
29 #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
30 #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
31 #include "net/dcsctp/public/dcsctp_message.h"
32 #include "net/dcsctp/rx/interleaved_reassembly_streams.h"
33 #include "net/dcsctp/rx/reassembly_streams.h"
34 #include "net/dcsctp/rx/traditional_reassembly_streams.h"
35 #include "rtc_base/logging.h"
36 
37 namespace dcsctp {
38 namespace {
CreateStreams(absl::string_view log_prefix,ReassemblyStreams::OnAssembledMessage on_assembled_message,bool use_message_interleaving)39 std::unique_ptr<ReassemblyStreams> CreateStreams(
40     absl::string_view log_prefix,
41     ReassemblyStreams::OnAssembledMessage on_assembled_message,
42     bool use_message_interleaving) {
43   if (use_message_interleaving) {
44     return std::make_unique<InterleavedReassemblyStreams>(
45         log_prefix, std::move(on_assembled_message));
46   }
47   return std::make_unique<TraditionalReassemblyStreams>(
48       log_prefix, std::move(on_assembled_message));
49 }
50 }  // namespace
51 
ReassemblyQueue(absl::string_view log_prefix,TSN peer_initial_tsn,size_t max_size_bytes,bool use_message_interleaving)52 ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
53                                  TSN peer_initial_tsn,
54                                  size_t max_size_bytes,
55                                  bool use_message_interleaving)
56     : log_prefix_(std::string(log_prefix) + "reasm: "),
57       max_size_bytes_(max_size_bytes),
58       watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
59       last_assembled_tsn_watermark_(
60           tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
61       last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
62       streams_(CreateStreams(
63           log_prefix_,
64           [this](rtc::ArrayView<const UnwrappedTSN> tsns,
65                  DcSctpMessage message) {
66             AddReassembledMessage(tsns, std::move(message));
67           },
68           use_message_interleaving)) {}
69 
Add(TSN tsn,Data data)70 void ReassemblyQueue::Add(TSN tsn, Data data) {
71   RTC_DCHECK(IsConsistent());
72   RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
73                        << ", stream=" << *data.stream_id << ":"
74                        << *data.message_id << ":" << *data.fsn << ", type="
75                        << (data.is_beginning && data.is_end ? "complete"
76                            : data.is_beginning              ? "first"
77                            : data.is_end                    ? "last"
78                                                             : "middle");
79 
80   UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
81 
82   if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
83       delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
84     RTC_DLOG(LS_VERBOSE) << log_prefix_
85                          << "Chunk has already been delivered - skipping";
86     return;
87   }
88 
89   // If a stream reset has been received with a "sender's last assigned tsn" in
90   // the future, the socket is in "deferred reset processing" mode and must
91   // buffer chunks until it's exited.
92   if (deferred_reset_streams_.has_value() &&
93       unwrapped_tsn >
94           tsn_unwrapper_.Unwrap(
95               deferred_reset_streams_->req.sender_last_assigned_tsn())) {
96     RTC_DLOG(LS_VERBOSE)
97         << log_prefix_ << "Deferring chunk with tsn=" << *tsn
98         << " until cum_ack_tsn="
99         << *deferred_reset_streams_->req.sender_last_assigned_tsn();
100     // https://tools.ietf.org/html/rfc6525#section-5.2.2
101     // "In this mode, any data arriving with a TSN larger than the
102     // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
103     // locally and held until the cumulative acknowledgment point reaches the
104     // Sender's Last Assigned TSN."
105     queued_bytes_ += data.size();
106     deferred_reset_streams_->deferred_chunks.emplace_back(
107         std::make_pair(tsn, std::move(data)));
108   } else {
109     queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
110   }
111 
112   // https://tools.ietf.org/html/rfc4960#section-6.9
113   // "Note: If the data receiver runs out of buffer space while still
114   // waiting for more fragments to complete the reassembly of the message, it
115   // should dispatch part of its inbound message through a partial delivery
116   // API (see Section 10), freeing some of its receive buffer space so that
117   // the rest of the message may be received."
118 
119   // TODO(boivie): Support EOR flag and partial delivery?
120   RTC_DCHECK(IsConsistent());
121 }
122 
ResetStreams(const OutgoingSSNResetRequestParameter & req,TSN cum_tsn_ack)123 ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
124     const OutgoingSSNResetRequestParameter& req,
125     TSN cum_tsn_ack) {
126   RTC_DCHECK(IsConsistent());
127   if (deferred_reset_streams_.has_value()) {
128     // In deferred mode already.
129     return ReconfigurationResponseParameter::Result::kInProgress;
130   } else if (req.request_sequence_number() <=
131              last_completed_reset_req_seq_nbr_) {
132     // Already performed at some time previously.
133     return ReconfigurationResponseParameter::Result::kSuccessPerformed;
134   }
135 
136   UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
137   UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
138 
139   // https://tools.ietf.org/html/rfc6525#section-5.2.2
140   // "If the Sender's Last Assigned TSN is greater than the
141   // cumulative acknowledgment point, then the endpoint MUST enter "deferred
142   // reset processing"."
143   if (sla_tsn > unwrapped_cum_tsn_ack) {
144     RTC_DLOG(LS_VERBOSE)
145         << log_prefix_
146         << "Entering deferred reset processing mode until cum_tsn_ack="
147         << *req.sender_last_assigned_tsn();
148     deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
149     return ReconfigurationResponseParameter::Result::kInProgress;
150   }
151 
152   // https://tools.ietf.org/html/rfc6525#section-5.2.2
153   // "... streams MUST be reset to 0 as the next expected SSN."
154   streams_->ResetStreams(req.stream_ids());
155   last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
156   RTC_DCHECK(IsConsistent());
157   return ReconfigurationResponseParameter::Result::kSuccessPerformed;
158 }
159 
MaybeResetStreamsDeferred(TSN cum_ack_tsn)160 bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
161   RTC_DCHECK(IsConsistent());
162   if (deferred_reset_streams_.has_value()) {
163     UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
164     UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
165         deferred_reset_streams_->req.sender_last_assigned_tsn());
166     if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
167       RTC_DLOG(LS_VERBOSE) << log_prefix_
168                            << "Leaving deferred reset processing with tsn="
169                            << *cum_ack_tsn << ", feeding back "
170                            << deferred_reset_streams_->deferred_chunks.size()
171                            << " chunks";
172       // https://tools.ietf.org/html/rfc6525#section-5.2.2
173       // "... streams MUST be reset to 0 as the next expected SSN."
174       streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
175       std::vector<std::pair<TSN, Data>> deferred_chunks =
176           std::move(deferred_reset_streams_->deferred_chunks);
177       // The response will not be sent now, but as a reply to the retried
178       // request, which will come as "in progress" has been sent prior.
179       last_completed_reset_req_seq_nbr_ =
180           deferred_reset_streams_->req.request_sequence_number();
181       deferred_reset_streams_ = absl::nullopt;
182 
183       // https://tools.ietf.org/html/rfc6525#section-5.2.2
184       // "Any queued TSNs (queued at step E2) MUST now be released and processed
185       // normally."
186       for (auto& [tsn, data] : deferred_chunks) {
187         queued_bytes_ -= data.size();
188         Add(tsn, std::move(data));
189       }
190 
191       RTC_DCHECK(IsConsistent());
192       return true;
193     } else {
194       RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
195                            << *cum_ack_tsn;
196     }
197   }
198 
199   return false;
200 }
201 
FlushMessages()202 std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
203   std::vector<DcSctpMessage> ret;
204   reassembled_messages_.swap(ret);
205   return ret;
206 }
207 
AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,DcSctpMessage message)208 void ReassemblyQueue::AddReassembledMessage(
209     rtc::ArrayView<const UnwrappedTSN> tsns,
210     DcSctpMessage message) {
211   RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
212                        << StrJoin(tsns, ",",
213                                   [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
214                                     sb << *tsn.Wrap();
215                                   })
216                        << "], message; stream_id=" << *message.stream_id()
217                        << ", ppid=" << *message.ppid()
218                        << ", payload=" << message.payload().size() << " bytes";
219 
220   for (const UnwrappedTSN tsn : tsns) {
221     if (tsn <= last_assembled_tsn_watermark_) {
222       // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
223       // invalid SSNs, allowing ordered messages to stay in the queue that
224       // should've been discarded.
225       RTC_DLOG(LS_VERBOSE)
226           << log_prefix_
227           << "Message is built from fragments already seen - skipping";
228       return;
229     } else if (tsn == last_assembled_tsn_watermark_.next_value()) {
230       // Update watermark, or insert into delivered_tsns_
231       last_assembled_tsn_watermark_.Increment();
232     } else {
233       delivered_tsns_.insert(tsn);
234     }
235   }
236 
237   // With new TSNs in delivered_tsns, gaps might be filled.
238   MaybeMoveLastAssembledWatermarkFurther();
239 
240   reassembled_messages_.emplace_back(std::move(message));
241 }
242 
MaybeMoveLastAssembledWatermarkFurther()243 void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
244   // `delivered_tsns_` contain TSNS when there is a gap between ranges of
245   // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
246   // that list, because if so, it can be moved.
247   while (!delivered_tsns_.empty() &&
248          *delivered_tsns_.begin() ==
249              last_assembled_tsn_watermark_.next_value()) {
250     last_assembled_tsn_watermark_.Increment();
251     delivered_tsns_.erase(delivered_tsns_.begin());
252   }
253 }
254 
Handle(const AnyForwardTsnChunk & forward_tsn)255 void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
256   RTC_DCHECK(IsConsistent());
257   UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
258 
259   last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
260   delivered_tsns_.erase(delivered_tsns_.begin(),
261                         delivered_tsns_.upper_bound(tsn));
262 
263   MaybeMoveLastAssembledWatermarkFurther();
264 
265   queued_bytes_ -=
266       streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
267   RTC_DCHECK(IsConsistent());
268 }
269 
IsConsistent() const270 bool ReassemblyQueue::IsConsistent() const {
271   // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
272   // adjacent.
273   if (!delivered_tsns_.empty() &&
274       last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin()) {
275     return false;
276   }
277 
278   // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
279   // enforced in this class. This comparison will still trigger if queued_bytes_
280   // became "negative".
281   return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
282 }
283 
GetHandoverReadiness() const284 HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
285   HandoverReadinessStatus status = streams_->GetHandoverReadiness();
286   if (!delivered_tsns_.empty()) {
287     status.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
288   }
289   if (deferred_reset_streams_.has_value()) {
290     status.Add(HandoverUnreadinessReason::kStreamResetDeferred);
291   }
292   return status;
293 }
294 
AddHandoverState(DcSctpSocketHandoverState & state)295 void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
296   state.rx.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
297   state.rx.last_completed_deferred_reset_req_sn =
298       last_completed_reset_req_seq_nbr_.value();
299   streams_->AddHandoverState(state);
300 }
301 
RestoreFromState(const DcSctpSocketHandoverState & state)302 void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
303   // Validate that the component is in pristine state.
304   RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));
305 
306   last_assembled_tsn_watermark_ =
307       tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
308   last_completed_reset_req_seq_nbr_ =
309       ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
310   streams_->RestoreFromState(state);
311 }
312 }  // namespace dcsctp
313