1 // Copyright (c) 2021 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 6 #define QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 7 8 #include <string> 9 10 #include "absl/status/status.h" 11 #include "quiche/quic/core/web_transport_interface.h" 12 #include "quiche/quic/platform/api/quic_logging.h" 13 #include "quiche/common/platform/api/quiche_logging.h" 14 #include "quiche/common/platform/api/quiche_mem_slice.h" 15 #include "quiche/common/quiche_circular_deque.h" 16 #include "quiche/common/quiche_stream.h" 17 #include "quiche/common/simple_buffer_allocator.h" 18 #include "quiche/spdy/core/http2_header_block.h" 19 20 namespace quic { 21 22 // Discards any incoming data. 23 class WebTransportDiscardVisitor : public WebTransportStreamVisitor { 24 public: WebTransportDiscardVisitor(WebTransportStream * stream)25 WebTransportDiscardVisitor(WebTransportStream* stream) : stream_(stream) {} 26 OnCanRead()27 void OnCanRead() override { 28 std::string buffer; 29 WebTransportStream::ReadResult result = stream_->Read(&buffer); 30 QUIC_DVLOG(2) << "Read " << result.bytes_read 31 << " bytes from WebTransport stream " 32 << stream_->GetStreamId() << ", fin: " << result.fin; 33 } 34 OnCanWrite()35 void OnCanWrite() override {} 36 OnResetStreamReceived(WebTransportStreamError)37 void OnResetStreamReceived(WebTransportStreamError /*error*/) override {} OnStopSendingReceived(WebTransportStreamError)38 void OnStopSendingReceived(WebTransportStreamError /*error*/) override {} OnWriteSideInDataRecvdState()39 void OnWriteSideInDataRecvdState() override {} 40 41 private: 42 WebTransportStream* stream_; 43 }; 44 45 // Echoes any incoming data back on the same stream. 46 class WebTransportBidirectionalEchoVisitor : public WebTransportStreamVisitor { 47 public: WebTransportBidirectionalEchoVisitor(WebTransportStream * stream)48 WebTransportBidirectionalEchoVisitor(WebTransportStream* stream) 49 : stream_(stream) {} 50 OnCanRead()51 void OnCanRead() override { 52 WebTransportStream::ReadResult result = stream_->Read(&buffer_); 53 QUIC_DVLOG(1) << "Attempted reading on WebTransport bidirectional stream " 54 << stream_->GetStreamId() 55 << ", bytes read: " << result.bytes_read; 56 if (result.fin) { 57 send_fin_ = true; 58 } 59 OnCanWrite(); 60 } 61 OnCanWrite()62 void OnCanWrite() override { 63 if (stop_sending_received_) { 64 return; 65 } 66 67 if (!buffer_.empty()) { 68 absl::Status status = quiche::WriteIntoStream(*stream_, buffer_); 69 QUIC_DVLOG(1) << "Attempted writing on WebTransport bidirectional stream " 70 << stream_->GetStreamId() << ", success: " << status; 71 if (!status.ok()) { 72 return; 73 } 74 75 buffer_ = ""; 76 } 77 78 if (send_fin_ && !fin_sent_) { 79 absl::Status status = quiche::SendFinOnStream(*stream_); 80 if (status.ok()) { 81 fin_sent_ = true; 82 } 83 } 84 } 85 OnResetStreamReceived(WebTransportStreamError)86 void OnResetStreamReceived(WebTransportStreamError /*error*/) override { 87 // Send FIN in response to a stream reset. We want to test that we can 88 // operate one side of the stream cleanly while the other is reset, thus 89 // replying with a FIN rather than a RESET_STREAM is more appropriate here. 90 send_fin_ = true; 91 OnCanWrite(); 92 } OnStopSendingReceived(WebTransportStreamError)93 void OnStopSendingReceived(WebTransportStreamError /*error*/) override { 94 stop_sending_received_ = true; 95 } OnWriteSideInDataRecvdState()96 void OnWriteSideInDataRecvdState() override {} 97 98 protected: stream()99 WebTransportStream* stream() { return stream_; } 100 101 private: 102 WebTransportStream* stream_; 103 std::string buffer_; 104 bool send_fin_ = false; 105 bool fin_sent_ = false; 106 bool stop_sending_received_ = false; 107 }; 108 109 // Buffers all of the data and calls |callback| with the entirety of the stream 110 // data. 111 class WebTransportUnidirectionalEchoReadVisitor 112 : public WebTransportStreamVisitor { 113 public: 114 using Callback = std::function<void(const std::string&)>; 115 WebTransportUnidirectionalEchoReadVisitor(WebTransportStream * stream,Callback callback)116 WebTransportUnidirectionalEchoReadVisitor(WebTransportStream* stream, 117 Callback callback) 118 : stream_(stream), callback_(std::move(callback)) {} 119 OnCanRead()120 void OnCanRead() override { 121 WebTransportStream::ReadResult result = stream_->Read(&buffer_); 122 QUIC_DVLOG(1) << "Attempted reading on WebTransport unidirectional stream " 123 << stream_->GetStreamId() 124 << ", bytes read: " << result.bytes_read; 125 if (result.fin) { 126 QUIC_DVLOG(1) << "Finished receiving data on a WebTransport stream " 127 << stream_->GetStreamId() << ", queueing up the echo"; 128 callback_(buffer_); 129 } 130 } 131 OnCanWrite()132 void OnCanWrite() override { QUICHE_NOTREACHED(); } 133 OnResetStreamReceived(WebTransportStreamError)134 void OnResetStreamReceived(WebTransportStreamError /*error*/) override {} OnStopSendingReceived(WebTransportStreamError)135 void OnStopSendingReceived(WebTransportStreamError /*error*/) override {} OnWriteSideInDataRecvdState()136 void OnWriteSideInDataRecvdState() override {} 137 138 private: 139 WebTransportStream* stream_; 140 std::string buffer_; 141 Callback callback_; 142 }; 143 144 // Sends supplied data. 145 class WebTransportUnidirectionalEchoWriteVisitor 146 : public WebTransportStreamVisitor { 147 public: WebTransportUnidirectionalEchoWriteVisitor(WebTransportStream * stream,const std::string & data)148 WebTransportUnidirectionalEchoWriteVisitor(WebTransportStream* stream, 149 const std::string& data) 150 : stream_(stream), data_(data) {} 151 OnCanRead()152 void OnCanRead() override { QUICHE_NOTREACHED(); } OnCanWrite()153 void OnCanWrite() override { 154 if (data_.empty()) { 155 return; 156 } 157 absl::Status write_status = quiche::WriteIntoStream(*stream_, data_); 158 if (!write_status.ok()) { 159 QUICHE_DLOG_IF(WARNING, !absl::IsUnavailable(write_status)) 160 << "Failed to write into stream: " << write_status; 161 return; 162 } 163 data_ = ""; 164 absl::Status fin_status = quiche::SendFinOnStream(*stream_); 165 QUICHE_DVLOG(1) 166 << "WebTransportUnidirectionalEchoWriteVisitor finished sending data."; 167 QUICHE_DCHECK(fin_status.ok()); 168 } 169 OnResetStreamReceived(WebTransportStreamError)170 void OnResetStreamReceived(WebTransportStreamError /*error*/) override {} OnStopSendingReceived(WebTransportStreamError)171 void OnStopSendingReceived(WebTransportStreamError /*error*/) override {} OnWriteSideInDataRecvdState()172 void OnWriteSideInDataRecvdState() override {} 173 174 private: 175 WebTransportStream* stream_; 176 std::string data_; 177 }; 178 179 // A session visitor which sets unidirectional or bidirectional stream visitors 180 // to echo. 181 class EchoWebTransportSessionVisitor : public WebTransportVisitor { 182 public: EchoWebTransportSessionVisitor(WebTransportSession * session)183 EchoWebTransportSessionVisitor(WebTransportSession* session) 184 : session_(session) {} 185 OnSessionReady(const spdy::Http2HeaderBlock &)186 void OnSessionReady(const spdy::Http2HeaderBlock&) override { 187 if (session_->CanOpenNextOutgoingBidirectionalStream()) { 188 OnCanCreateNewOutgoingBidirectionalStream(); 189 } 190 } 191 OnSessionClosed(WebTransportSessionError,const std::string &)192 void OnSessionClosed(WebTransportSessionError /*error_code*/, 193 const std::string& /*error_message*/) override {} 194 OnIncomingBidirectionalStreamAvailable()195 void OnIncomingBidirectionalStreamAvailable() override { 196 while (true) { 197 WebTransportStream* stream = 198 session_->AcceptIncomingBidirectionalStream(); 199 if (stream == nullptr) { 200 return; 201 } 202 QUIC_DVLOG(1) 203 << "EchoWebTransportSessionVisitor received a bidirectional stream " 204 << stream->GetStreamId(); 205 stream->SetVisitor( 206 std::make_unique<WebTransportBidirectionalEchoVisitor>(stream)); 207 stream->visitor()->OnCanRead(); 208 } 209 } 210 OnIncomingUnidirectionalStreamAvailable()211 void OnIncomingUnidirectionalStreamAvailable() override { 212 while (true) { 213 WebTransportStream* stream = 214 session_->AcceptIncomingUnidirectionalStream(); 215 if (stream == nullptr) { 216 return; 217 } 218 QUIC_DVLOG(1) 219 << "EchoWebTransportSessionVisitor received a unidirectional stream"; 220 stream->SetVisitor( 221 std::make_unique<WebTransportUnidirectionalEchoReadVisitor>( 222 stream, [this](const std::string& data) { 223 streams_to_echo_back_.push_back(data); 224 TrySendingUnidirectionalStreams(); 225 })); 226 stream->visitor()->OnCanRead(); 227 } 228 } 229 OnDatagramReceived(absl::string_view datagram)230 void OnDatagramReceived(absl::string_view datagram) override { 231 session_->SendOrQueueDatagram(datagram); 232 } 233 OnCanCreateNewOutgoingBidirectionalStream()234 void OnCanCreateNewOutgoingBidirectionalStream() override { 235 if (!echo_stream_opened_) { 236 WebTransportStream* stream = session_->OpenOutgoingBidirectionalStream(); 237 stream->SetVisitor( 238 std::make_unique<WebTransportBidirectionalEchoVisitor>(stream)); 239 echo_stream_opened_ = true; 240 } 241 } OnCanCreateNewOutgoingUnidirectionalStream()242 void OnCanCreateNewOutgoingUnidirectionalStream() override { 243 TrySendingUnidirectionalStreams(); 244 } 245 TrySendingUnidirectionalStreams()246 void TrySendingUnidirectionalStreams() { 247 while (!streams_to_echo_back_.empty() && 248 session_->CanOpenNextOutgoingUnidirectionalStream()) { 249 QUIC_DVLOG(1) 250 << "EchoWebTransportServer echoed a unidirectional stream back"; 251 WebTransportStream* stream = session_->OpenOutgoingUnidirectionalStream(); 252 stream->SetVisitor( 253 std::make_unique<WebTransportUnidirectionalEchoWriteVisitor>( 254 stream, streams_to_echo_back_.front())); 255 streams_to_echo_back_.pop_front(); 256 stream->visitor()->OnCanWrite(); 257 } 258 } 259 260 private: 261 WebTransportSession* session_; 262 quiche::SimpleBufferAllocator allocator_; 263 bool echo_stream_opened_ = false; 264 265 quiche::QuicheCircularDeque<std::string> streams_to_echo_back_; 266 }; 267 268 } // namespace quic 269 270 #endif // QUICHE_QUIC_TOOLS_WEB_TRANSPORT_TEST_VISITORS_H_ 271