• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/http/web_transport_stream_adapter.h"
6 
7 #include "absl/status/status.h"
8 #include "quiche/quic/core/http/web_transport_http3.h"
9 #include "quiche/quic/core/quic_error_codes.h"
10 #include "quiche/quic/core/quic_types.h"
11 #include "quiche/common/platform/api/quiche_mem_slice.h"
12 #include "quiche/common/quiche_buffer_allocator.h"
13 #include "quiche/common/quiche_mem_slice_storage.h"
14 #include "quiche/web_transport/web_transport.h"
15 
16 namespace quic {
17 
WebTransportStreamAdapter(QuicSession * session,QuicStream * stream,QuicStreamSequencer * sequencer)18 WebTransportStreamAdapter::WebTransportStreamAdapter(
19     QuicSession* session, QuicStream* stream, QuicStreamSequencer* sequencer)
20     : session_(session), stream_(stream), sequencer_(sequencer) {}
21 
Read(absl::Span<char> buffer)22 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
23     absl::Span<char> buffer) {
24   iovec iov;
25   iov.iov_base = buffer.data();
26   iov.iov_len = buffer.size();
27   const size_t result = sequencer_->Readv(&iov, 1);
28   if (!fin_read_ && sequencer_->IsClosed()) {
29     fin_read_ = true;
30     stream_->OnFinRead();
31   }
32   return ReadResult{result, sequencer_->IsClosed()};
33 }
34 
Read(std::string * output)35 WebTransportStream::ReadResult WebTransportStreamAdapter::Read(
36     std::string* output) {
37   const size_t old_size = output->size();
38   const size_t bytes_to_read = ReadableBytes();
39   output->resize(old_size + bytes_to_read);
40   ReadResult result =
41       Read(absl::Span<char>(&(*output)[old_size], bytes_to_read));
42   QUICHE_DCHECK_EQ(bytes_to_read, result.bytes_read);
43   output->resize(old_size + result.bytes_read);
44   return result;
45 }
46 
Writev(absl::Span<const absl::string_view> data,const quiche::StreamWriteOptions & options)47 absl::Status WebTransportStreamAdapter::Writev(
48     absl::Span<const absl::string_view> data,
49     const quiche::StreamWriteOptions& options) {
50   if (data.empty() && !options.send_fin()) {
51     return absl::InvalidArgumentError(
52         "Writev() called without any data or a FIN");
53   }
54   const absl::Status initial_check_status = CheckBeforeStreamWrite();
55   if (!initial_check_status.ok()) {
56     return initial_check_status;
57   }
58 
59   std::vector<iovec> iovecs;
60   size_t total_size = 0;
61   iovecs.resize(data.size());
62   for (size_t i = 0; i < data.size(); i++) {
63     // QuicheMemSliceStorage only reads iovec, thus this is safe.
64     iovecs[i].iov_base = const_cast<char*>(data[i].data());
65     iovecs[i].iov_len = data[i].size();
66     total_size += data[i].size();
67   }
68   quiche::QuicheMemSliceStorage storage(
69       iovecs.data(), iovecs.size(),
70       session_->connection()->helper()->GetStreamSendBufferAllocator(),
71       GetQuicFlag(quic_send_buffer_max_data_slice_size));
72   QuicConsumedData consumed =
73       stream_->WriteMemSlices(storage.ToSpan(), /*fin=*/options.send_fin());
74 
75   if (consumed.bytes_consumed == total_size) {
76     return absl::OkStatus();
77   }
78   if (consumed.bytes_consumed == 0) {
79     return absl::UnavailableError("Stream write-blocked");
80   }
81   // WebTransportStream::Write() is an all-or-nothing write API.  To achieve
82   // that property, it relies on WriteMemSlices() being an all-or-nothing API.
83   // If WriteMemSlices() fails to provide that guarantee, we have no way to
84   // communicate a partial write to the caller, and thus it's safer to just
85   // close the connection.
86   constexpr absl::string_view kErrorMessage =
87       "WriteMemSlices() unexpectedly partially consumed the input data";
88   QUIC_BUG(WebTransportStreamAdapter partial write)
89       << kErrorMessage << ", provided: " << total_size
90       << ", written: " << consumed.bytes_consumed;
91   stream_->OnUnrecoverableError(QUIC_INTERNAL_ERROR,
92                                 std::string(kErrorMessage));
93   return absl::InternalError(kErrorMessage);
94 }
95 
CheckBeforeStreamWrite() const96 absl::Status WebTransportStreamAdapter::CheckBeforeStreamWrite() const {
97   if (stream_->write_side_closed() || stream_->fin_buffered()) {
98     return absl::FailedPreconditionError("Stream write side is closed");
99   }
100   if (!stream_->CanWriteNewData()) {
101     return absl::UnavailableError("Stream write-blocked");
102   }
103   return absl::OkStatus();
104 }
105 
CanWrite() const106 bool WebTransportStreamAdapter::CanWrite() const {
107   return CheckBeforeStreamWrite().ok();
108 }
109 
AbruptlyTerminate(absl::Status error)110 void WebTransportStreamAdapter::AbruptlyTerminate(absl::Status error) {
111   QUIC_DLOG(WARNING) << (session_->perspective() == Perspective::IS_CLIENT
112                              ? "Client: "
113                              : "Server: ")
114                      << "Abruptly terminating stream " << stream_->id()
115                      << " due to the following error: " << error;
116   ResetDueToInternalError();
117 }
118 
ReadableBytes() const119 size_t WebTransportStreamAdapter::ReadableBytes() const {
120   return sequencer_->ReadableBytes();
121 }
122 
OnDataAvailable()123 void WebTransportStreamAdapter::OnDataAvailable() {
124   if (visitor_ == nullptr) {
125     return;
126   }
127   const bool fin_readable = sequencer_->IsClosed() && !fin_read_;
128   if (ReadableBytes() == 0 && !fin_readable) {
129     return;
130   }
131   visitor_->OnCanRead();
132 }
133 
OnCanWriteNewData()134 void WebTransportStreamAdapter::OnCanWriteNewData() {
135   // Ensure the origin check has been completed, as the stream can be notified
136   // about being writable before that.
137   if (!CanWrite()) {
138     return;
139   }
140   if (visitor_ != nullptr) {
141     visitor_->OnCanWrite();
142   }
143 }
144 
ResetWithUserCode(WebTransportStreamError error)145 void WebTransportStreamAdapter::ResetWithUserCode(
146     WebTransportStreamError error) {
147   stream_->ResetWriteSide(QuicResetStreamError(
148       QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
149 }
150 
SendStopSending(WebTransportStreamError error)151 void WebTransportStreamAdapter::SendStopSending(WebTransportStreamError error) {
152   stream_->SendStopSending(QuicResetStreamError(
153       QUIC_STREAM_CANCELLED, WebTransportErrorToHttp3(error)));
154 }
155 
156 }  // namespace quic
157