• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2021 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_
6 #define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_
7 
8 #include <string>
9 
10 #include "absl/status/status.h"
11 #include "quiche/quic/core/web_transport_interface.h"
12 #include "quiche/quic/platform/api/quic_logging.h"
13 #include "quiche/common/platform/api/quiche_logging.h"
14 #include "quiche/common/platform/api/quiche_mem_slice.h"
15 #include "quiche/common/quiche_circular_deque.h"
16 #include "quiche/common/quiche_stream.h"
17 #include "quiche/common/simple_buffer_allocator.h"
18 #include "quiche/spdy/core/http2_header_block.h"
19 
20 namespace quic {
21 
22 // Discards any incoming data.
23 class WebTransportDiscardVisitor : public WebTransportStreamVisitor {
24  public:
WebTransportDiscardVisitor(WebTransportStream * stream)25   WebTransportDiscardVisitor(WebTransportStream* stream) : stream_(stream) {}
26 
OnCanRead()27   void OnCanRead() override {
28     std::string buffer;
29     WebTransportStream::ReadResult result = stream_->Read(&buffer);
30     QUIC_DVLOG(2) << "Read " << result.bytes_read
31                   << " bytes from WebTransport stream "
32                   << stream_->GetStreamId() << ", fin: " << result.fin;
33   }
34 
OnCanWrite()35   void OnCanWrite() override {}
36 
OnResetStreamReceived(WebTransportStreamError)37   void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
OnStopSendingReceived(WebTransportStreamError)38   void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
OnWriteSideInDataRecvdState()39   void OnWriteSideInDataRecvdState() override {}
40 
41  private:
42   WebTransportStream* stream_;
43 };
44 
45 // Echoes any incoming data back on the same stream.
46 class WebTransportBidirectionalEchoVisitor : public WebTransportStreamVisitor {
47  public:
WebTransportBidirectionalEchoVisitor(WebTransportStream * stream)48   WebTransportBidirectionalEchoVisitor(WebTransportStream* stream)
49       : stream_(stream) {}
50 
OnCanRead()51   void OnCanRead() override {
52     WebTransportStream::ReadResult result = stream_->Read(&buffer_);
53     QUIC_DVLOG(1) << "Attempted reading on WebTransport bidirectional stream "
54                   << stream_->GetStreamId()
55                   << ", bytes read: " << result.bytes_read;
56     if (result.fin) {
57       send_fin_ = true;
58     }
59     OnCanWrite();
60   }
61 
OnCanWrite()62   void OnCanWrite() override {
63     if (stop_sending_received_) {
64       return;
65     }
66 
67     if (!buffer_.empty()) {
68       absl::Status status = quiche::WriteIntoStream(*stream_, buffer_);
69       QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream "
70                     << stream_->GetStreamId() << ", success: " << status;
71       if (!status.ok()) {
72         return;
73       }
74 
75       buffer_ = "";
76     }
77 
78     if (send_fin_ && !fin_sent_) {
79       absl::Status status = quiche::SendFinOnStream(*stream_);
80       if (status.ok()) {
81         fin_sent_ = true;
82       }
83     }
84   }
85 
OnResetStreamReceived(WebTransportStreamError)86   void OnResetStreamReceived(WebTransportStreamError /*error*/) override {
87     // Send FIN in response to a stream reset.  We want to test that we can
88     // operate one side of the stream cleanly while the other is reset, thus
89     // replying with a FIN rather than a RESET_STREAM is more appropriate here.
90     send_fin_ = true;
91     OnCanWrite();
92   }
OnStopSendingReceived(WebTransportStreamError)93   void OnStopSendingReceived(WebTransportStreamError /*error*/) override {
94     stop_sending_received_ = true;
95   }
OnWriteSideInDataRecvdState()96   void OnWriteSideInDataRecvdState() override {}
97 
98  protected:
stream()99   WebTransportStream* stream() { return stream_; }
100 
101  private:
102   WebTransportStream* stream_;
103   std::string buffer_;
104   bool send_fin_ = false;
105   bool fin_sent_ = false;
106   bool stop_sending_received_ = false;
107 };
108 
109 // Buffers all of the data and calls |callback| with the entirety of the stream
110 // data.
111 class WebTransportUnidirectionalEchoReadVisitor
112     : public WebTransportStreamVisitor {
113  public:
114   using Callback = std::function<void(const std::string&)>;
115 
WebTransportUnidirectionalEchoReadVisitor(WebTransportStream * stream,Callback callback)116   WebTransportUnidirectionalEchoReadVisitor(WebTransportStream* stream,
117                                             Callback callback)
118       : stream_(stream), callback_(std::move(callback)) {}
119 
OnCanRead()120   void OnCanRead() override {
121     WebTransportStream::ReadResult result = stream_->Read(&buffer_);
122     QUIC_DVLOG(1) << "Attempted reading on WebTransport unidirectional stream "
123                   << stream_->GetStreamId()
124                   << ", bytes read: " << result.bytes_read;
125     if (result.fin) {
126       QUIC_DVLOG(1) << "Finished receiving data on a WebTransport stream "
127                     << stream_->GetStreamId() << ", queueing up the echo";
128       callback_(buffer_);
129     }
130   }
131 
OnCanWrite()132   void OnCanWrite() override { QUICHE_NOTREACHED(); }
133 
OnResetStreamReceived(WebTransportStreamError)134   void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
OnStopSendingReceived(WebTransportStreamError)135   void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
OnWriteSideInDataRecvdState()136   void OnWriteSideInDataRecvdState() override {}
137 
138  private:
139   WebTransportStream* stream_;
140   std::string buffer_;
141   Callback callback_;
142 };
143 
144 // Sends supplied data.
145 class WebTransportUnidirectionalEchoWriteVisitor
146     : public WebTransportStreamVisitor {
147  public:
WebTransportUnidirectionalEchoWriteVisitor(WebTransportStream * stream,const std::string & data)148   WebTransportUnidirectionalEchoWriteVisitor(WebTransportStream* stream,
149                                              const std::string& data)
150       : stream_(stream), data_(data) {}
151 
OnCanRead()152   void OnCanRead() override { QUICHE_NOTREACHED(); }
OnCanWrite()153   void OnCanWrite() override {
154     if (data_.empty()) {
155       return;
156     }
157     absl::Status write_status = quiche::WriteIntoStream(*stream_, data_);
158     if (!write_status.ok()) {
159       QUICHE_DLOG_IF(WARNING, !absl::IsUnavailable(write_status))
160           << "Failed to write into stream: " << write_status;
161       return;
162     }
163     data_ = "";
164     absl::Status fin_status = quiche::SendFinOnStream(*stream_);
165     QUICHE_DVLOG(1)
166         << "WebTransportUnidirectionalEchoWriteVisitor finished sending data.";
167     QUICHE_DCHECK(fin_status.ok());
168   }
169 
OnResetStreamReceived(WebTransportStreamError)170   void OnResetStreamReceived(WebTransportStreamError /*error*/) override {}
OnStopSendingReceived(WebTransportStreamError)171   void OnStopSendingReceived(WebTransportStreamError /*error*/) override {}
OnWriteSideInDataRecvdState()172   void OnWriteSideInDataRecvdState() override {}
173 
174  private:
175   WebTransportStream* stream_;
176   std::string data_;
177 };
178 
179 // A session visitor which sets unidirectional or bidirectional stream visitors
180 // to echo.
181 class EchoWebTransportSessionVisitor : public WebTransportVisitor {
182  public:
EchoWebTransportSessionVisitor(WebTransportSession * session)183   EchoWebTransportSessionVisitor(WebTransportSession* session)
184       : session_(session) {}
185 
OnSessionReady(const spdy::Http2HeaderBlock &)186   void OnSessionReady(const spdy::Http2HeaderBlock&) override {
187     if (session_->CanOpenNextOutgoingBidirectionalStream()) {
188       OnCanCreateNewOutgoingBidirectionalStream();
189     }
190   }
191 
OnSessionClosed(WebTransportSessionError,const std::string &)192   void OnSessionClosed(WebTransportSessionError /*error_code*/,
193                        const std::string& /*error_message*/) override {}
194 
OnIncomingBidirectionalStreamAvailable()195   void OnIncomingBidirectionalStreamAvailable() override {
196     while (true) {
197       WebTransportStream* stream =
198           session_->AcceptIncomingBidirectionalStream();
199       if (stream == nullptr) {
200         return;
201       }
202       QUIC_DVLOG(1)
203           << "EchoWebTransportSessionVisitor received a bidirectional stream "
204           << stream->GetStreamId();
205       stream->SetVisitor(
206           std::make_unique<WebTransportBidirectionalEchoVisitor>(stream));
207       stream->visitor()->OnCanRead();
208     }
209   }
210 
OnIncomingUnidirectionalStreamAvailable()211   void OnIncomingUnidirectionalStreamAvailable() override {
212     while (true) {
213       WebTransportStream* stream =
214           session_->AcceptIncomingUnidirectionalStream();
215       if (stream == nullptr) {
216         return;
217       }
218       QUIC_DVLOG(1)
219           << "EchoWebTransportSessionVisitor received a unidirectional stream";
220       stream->SetVisitor(
221           std::make_unique<WebTransportUnidirectionalEchoReadVisitor>(
222               stream, [this](const std::string& data) {
223                 streams_to_echo_back_.push_back(data);
224                 TrySendingUnidirectionalStreams();
225               }));
226       stream->visitor()->OnCanRead();
227     }
228   }
229 
OnDatagramReceived(absl::string_view datagram)230   void OnDatagramReceived(absl::string_view datagram) override {
231     session_->SendOrQueueDatagram(datagram);
232   }
233 
OnCanCreateNewOutgoingBidirectionalStream()234   void OnCanCreateNewOutgoingBidirectionalStream() override {
235     if (!echo_stream_opened_) {
236       WebTransportStream* stream = session_->OpenOutgoingBidirectionalStream();
237       stream->SetVisitor(
238           std::make_unique<WebTransportBidirectionalEchoVisitor>(stream));
239       echo_stream_opened_ = true;
240     }
241   }
OnCanCreateNewOutgoingUnidirectionalStream()242   void OnCanCreateNewOutgoingUnidirectionalStream() override {
243     TrySendingUnidirectionalStreams();
244   }
245 
TrySendingUnidirectionalStreams()246   void TrySendingUnidirectionalStreams() {
247     while (!streams_to_echo_back_.empty() &&
248            session_->CanOpenNextOutgoingUnidirectionalStream()) {
249       QUIC_DVLOG(1)
250           << "EchoWebTransportServer echoed a unidirectional stream back";
251       WebTransportStream* stream = session_->OpenOutgoingUnidirectionalStream();
252       stream->SetVisitor(
253           std::make_unique<WebTransportUnidirectionalEchoWriteVisitor>(
254               stream, streams_to_echo_back_.front()));
255       streams_to_echo_back_.pop_front();
256       stream->visitor()->OnCanWrite();
257     }
258   }
259 
260  private:
261   WebTransportSession* session_;
262   quiche::SimpleBufferAllocator allocator_;
263   bool echo_stream_opened_ = false;
264 
265   quiche::QuicheCircularDeque<std::string> streams_to_echo_back_;
266 };
267 
268 }  // namespace quic
269 
270 #endif  // QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_
271