1 // Copyright 2021 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "quiche/quic/core/http/web_transport_stream_adapter.h"
6
7 #include <cstddef>
8 #include <string>
9 #include <vector>
10
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "absl/strings/string_view.h"
14 #include "absl/types/span.h"
15 #include "quiche/quic/core/http/web_transport_http3.h"
16 #include "quiche/quic/core/quic_error_codes.h"
17 #include "quiche/quic/core/quic_session.h"
18 #include "quiche/quic/core/quic_stream.h"
19 #include "quiche/quic/core/quic_stream_sequencer.h"
20 #include "quiche/quic/core/quic_types.h"
21 #include "quiche/quic/core/web_transport_interface.h"
22 #include "quiche/quic/platform/api/quic_bug_tracker.h"
23 #include "quiche/quic/platform/api/quic_flags.h"
24 #include "quiche/quic/platform/api/quic_logging.h"
25 #include "quiche/common/quiche_mem_slice_storage.h"
26 #include "quiche/common/quiche_stream.h"
27 #include "quiche/web_transport/web_transport.h"
28
29 namespace quic {
30
WebTransportStreamAdapter(QuicSession * session,QuicStream * stream,QuicStreamSequencer * sequencer)31 WebTransportStreamAdapter::WebTransportStreamAdapter(
32 QuicSession* session, QuicStream* stream, QuicStreamSequencer* sequencer)
33 : session_(session), stream_(stream), sequencer_(sequencer) {}
34
Read(absl::Span<char> buffer)35 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
36 absl::Span<char> buffer) {
37 iovec iov;
38 iov.iov_base = buffer.data();
39 iov.iov_len = buffer.size();
40 const size_t result = sequencer_->Readv(&iov, 1);
41 if (!fin_read_ && sequencer_->IsClosed()) {
42 fin_read_ = true;
43 stream_->OnFinRead();
44 }
45 return ReadResult{result, sequencer_->IsClosed()};
46 }
47
Read(std::string * output)48 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
49 std::string* output) {
50 const size_t old_size = output->size();
51 const size_t bytes_to_read = ReadableBytes();
52 output->resize(old_size + bytes_to_read);
53 ReadResult result =
54 Read(absl::Span<char>(&(*output)[old_size], bytes_to_read));
55 QUICHE_DCHECK_EQ(bytes_to_read, result.bytes_read);
56 output->resize(old_size + result.bytes_read);
57 return result;
58 }
59
Writev(absl::Span<const absl::string_view> data,const quiche::StreamWriteOptions & options)60 absl::Status WebTransportStreamAdapter::Writev(
61 absl::Span<const absl::string_view> data,
62 const quiche::StreamWriteOptions& options) {
63 if (data.empty() && !options.send_fin()) {
64 return absl::InvalidArgumentError(
65 "Writev() called without any data or a FIN");
66 }
67 const absl::Status initial_check_status = CheckBeforeStreamWrite();
68 if (!initial_check_status.ok()) {
69 return initial_check_status;
70 }
71
72 std::vector<iovec> iovecs;
73 size_t total_size = 0;
74 iovecs.resize(data.size());
75 for (size_t i = 0; i < data.size(); i++) {
76 // QuicheMemSliceStorage only reads iovec, thus this is safe.
77 iovecs[i].iov_base = const_cast<char*>(data[i].data());
78 iovecs[i].iov_len = data[i].size();
79 total_size += data[i].size();
80 }
81 quiche::QuicheMemSliceStorage storage(
82 iovecs.data(), iovecs.size(),
83 session_->connection()->helper()->GetStreamSendBufferAllocator(),
84 GetQuicFlag(quic_send_buffer_max_data_slice_size));
85 QuicConsumedData consumed =
86 stream_->WriteMemSlices(storage.ToSpan(), /*fin=*/options.send_fin());
87
88 if (consumed.bytes_consumed == total_size) {
89 return absl::OkStatus();
90 }
91 if (consumed.bytes_consumed == 0) {
92 return absl::UnavailableError("Stream write-blocked");
93 }
94 // WebTransportStream::Write() is an all-or-nothing write API. To achieve
95 // that property, it relies on WriteMemSlices() being an all-or-nothing API.
96 // If WriteMemSlices() fails to provide that guarantee, we have no way to
97 // communicate a partial write to the caller, and thus it's safer to just
98 // close the connection.
99 constexpr absl::string_view kErrorMessage =
100 "WriteMemSlices() unexpectedly partially consumed the input data";
101 QUIC_BUG(WebTransportStreamAdapter partial write)
102 << kErrorMessage << ", provided: " << total_size
103 << ", written: " << consumed.bytes_consumed;
104 stream_->OnUnrecoverableError(QUIC_INTERNAL_ERROR,
105 std::string(kErrorMessage));
106 return absl::InternalError(kErrorMessage);
107 }
108
CheckBeforeStreamWrite() const109 absl::Status WebTransportStreamAdapter::CheckBeforeStreamWrite() const {
110 if (stream_->write_side_closed() || stream_->fin_buffered()) {
111 return absl::FailedPreconditionError("Stream write side is closed");
112 }
113 if (!stream_->CanWriteNewData()) {
114 return absl::UnavailableError("Stream write-blocked");
115 }
116 return absl::OkStatus();
117 }
118
CanWrite() const119 bool WebTransportStreamAdapter::CanWrite() const {
120 return CheckBeforeStreamWrite().ok();
121 }
122
AbruptlyTerminate(absl::Status error)123 void WebTransportStreamAdapter::AbruptlyTerminate(absl::Status error) {
124 QUIC_DLOG(WARNING) << (session_->perspective() == Perspective::IS_CLIENT
125 ? "Client: "
126 : "Server: ")
127 << "Abruptly terminating stream " << stream_->id()
128 << " due to the following error: " << error;
129 ResetDueToInternalError();
130 }
131
ReadableBytes() const132 size_t WebTransportStreamAdapter::ReadableBytes() const {
133 return sequencer_->ReadableBytes();
134 }
135
136 quiche::ReadStream::PeekResult
PeekNextReadableRegion() const137 WebTransportStreamAdapter::PeekNextReadableRegion() const {
138 iovec iov;
139 PeekResult result;
140 if (sequencer_->GetReadableRegion(&iov)) {
141 result.peeked_data =
142 absl::string_view(static_cast<const char*>(iov.iov_base), iov.iov_len);
143 }
144 result.fin_next = sequencer_->IsClosed();
145 result.all_data_received = sequencer_->IsAllDataAvailable();
146 return result;
147 }
148
SkipBytes(size_t bytes)149 bool WebTransportStreamAdapter::SkipBytes(size_t bytes) {
150 if (stream_->read_side_closed()) {
151 // Useful when the stream has been reset in between Peek() and Skip().
152 return true;
153 }
154 sequencer_->MarkConsumed(bytes);
155 if (!fin_read_ && sequencer_->IsClosed()) {
156 fin_read_ = true;
157 stream_->OnFinRead();
158 }
159 return sequencer_->IsClosed();
160 }
161
OnDataAvailable()162 void WebTransportStreamAdapter::OnDataAvailable() {
163 if (visitor_ == nullptr) {
164 return;
165 }
166 const bool fin_readable = sequencer_->IsClosed() && !fin_read_;
167 if (ReadableBytes() == 0 && !fin_readable) {
168 return;
169 }
170 visitor_->OnCanRead();
171 }
172
OnCanWriteNewData()173 void WebTransportStreamAdapter::OnCanWriteNewData() {
174 // Ensure the origin check has been completed, as the stream can be notified
175 // about being writable before that.
176 if (!CanWrite()) {
177 return;
178 }
179 if (visitor_ != nullptr) {
180 visitor_->OnCanWrite();
181 }
182 }
183
ResetWithUserCode(WebTransportStreamError error)184 void WebTransportStreamAdapter::ResetWithUserCode(
185 WebTransportStreamError error) {
186 stream_->ResetWriteSide(QuicResetStreamError(
187 QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
188 }
189
SendStopSending(WebTransportStreamError error)190 void WebTransportStreamAdapter::SendStopSending(WebTransportStreamError error) {
191 stream_->SendStopSending(QuicResetStreamError(
192 QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
193 }
194
195 } // namespace quic
196