• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/io/event_loop_connecting_client_socket.h"
6 
7 #include <limits>
8 #include <string>
9 #include <utility>
10 
11 #include "absl/status/status.h"
12 #include "absl/status/statusor.h"
13 #include "absl/strings/string_view.h"
14 #include "absl/types/span.h"
15 #include "absl/types/variant.h"
16 #include "quiche/quic/core/io/quic_event_loop.h"
17 #include "quiche/quic/core/io/socket.h"
18 #include "quiche/quic/platform/api/quic_socket_address.h"
19 #include "quiche/common/platform/api/quiche_logging.h"
20 #include "quiche/common/platform/api/quiche_mem_slice.h"
21 
22 namespace quic {
23 
EventLoopConnectingClientSocket(socket_api::SocketProtocol protocol,const quic::QuicSocketAddress & peer_address,QuicByteCount receive_buffer_size,QuicByteCount send_buffer_size,QuicEventLoop * event_loop,quiche::QuicheBufferAllocator * buffer_allocator,AsyncVisitor * async_visitor)24 EventLoopConnectingClientSocket::EventLoopConnectingClientSocket(
25     socket_api::SocketProtocol protocol,
26     const quic::QuicSocketAddress& peer_address,
27     QuicByteCount receive_buffer_size, QuicByteCount send_buffer_size,
28     QuicEventLoop* event_loop, quiche::QuicheBufferAllocator* buffer_allocator,
29     AsyncVisitor* async_visitor)
30     : protocol_(protocol),
31       peer_address_(peer_address),
32       receive_buffer_size_(receive_buffer_size),
33       send_buffer_size_(send_buffer_size),
34       event_loop_(event_loop),
35       buffer_allocator_(buffer_allocator),
36       async_visitor_(async_visitor) {
37   QUICHE_DCHECK(event_loop_);
38   QUICHE_DCHECK(buffer_allocator_);
39 }
40 
~EventLoopConnectingClientSocket()41 EventLoopConnectingClientSocket::~EventLoopConnectingClientSocket() {
42   // Connected socket must be closed via Disconnect() before destruction. Cannot
43   // safely recover if state indicates caller may be expecting async callbacks.
44   QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
45   QUICHE_DCHECK(!receive_max_size_.has_value());
46   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
47   if (descriptor_ != kInvalidSocketFd) {
48     QUICHE_BUG(quic_event_loop_connecting_socket_invalid_destruction)
49         << "Must call Disconnect() on connected socket before destruction.";
50     Close();
51   }
52 
53   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
54   QUICHE_DCHECK(send_remaining_.empty());
55 }
56 
ConnectBlocking()57 absl::Status EventLoopConnectingClientSocket::ConnectBlocking() {
58   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
59   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
60   QUICHE_DCHECK(!receive_max_size_.has_value());
61   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
62 
63   absl::Status status = Open();
64   if (!status.ok()) {
65     return status;
66   }
67 
68   status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
69   if (!status.ok()) {
70     QUICHE_LOG_FIRST_N(WARNING, 100)
71         << "Failed to set socket to address: " << peer_address_.ToString()
72         << " as blocking for connect with error: " << status;
73     Close();
74     return status;
75   }
76 
77   status = DoInitialConnect();
78 
79   if (absl::IsUnavailable(status)) {
80     QUICHE_LOG_FIRST_N(ERROR, 100)
81         << "Non-blocking connect to should-be blocking socket to address:"
82         << peer_address_.ToString() << ".";
83     Close();
84     connect_status_ = ConnectStatus::kNotConnected;
85     return status;
86   } else if (!status.ok()) {
87     // DoInitialConnect() closes the socket on failures.
88     QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
89     QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
90     return status;
91   }
92 
93   status = socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
94   if (!status.ok()) {
95     QUICHE_LOG_FIRST_N(WARNING, 100)
96         << "Failed to return socket to address: " << peer_address_.ToString()
97         << " to non-blocking after connect with error: " << status;
98     Close();
99     connect_status_ = ConnectStatus::kNotConnected;
100   }
101 
102   QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
103   return status;
104 }
105 
ConnectAsync()106 void EventLoopConnectingClientSocket::ConnectAsync() {
107   QUICHE_DCHECK(async_visitor_);
108   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
109   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
110   QUICHE_DCHECK(!receive_max_size_.has_value());
111   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
112 
113   absl::Status status = Open();
114   if (!status.ok()) {
115     async_visitor_->ConnectComplete(status);
116     return;
117   }
118 
119   FinishOrRearmAsyncConnect(DoInitialConnect());
120 }
121 
Disconnect()122 void EventLoopConnectingClientSocket::Disconnect() {
123   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
124   QUICHE_DCHECK(connect_status_ != ConnectStatus::kNotConnected);
125 
126   Close();
127   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
128 
129   // Reset all state before invoking any callbacks.
130   bool require_connect_callback = connect_status_ == ConnectStatus::kConnecting;
131   connect_status_ = ConnectStatus::kNotConnected;
132   bool require_receive_callback = receive_max_size_.has_value();
133   receive_max_size_.reset();
134   bool require_send_callback =
135       !absl::holds_alternative<absl::monostate>(send_data_);
136   send_data_ = absl::monostate();
137   send_remaining_ = "";
138 
139   if (require_connect_callback) {
140     QUICHE_DCHECK(async_visitor_);
141     async_visitor_->ConnectComplete(absl::CancelledError());
142   }
143   if (require_receive_callback) {
144     QUICHE_DCHECK(async_visitor_);
145     async_visitor_->ReceiveComplete(absl::CancelledError());
146   }
147   if (require_send_callback) {
148     QUICHE_DCHECK(async_visitor_);
149     async_visitor_->SendComplete(absl::CancelledError());
150   }
151 }
152 
153 absl::StatusOr<QuicSocketAddress>
GetLocalAddress()154 EventLoopConnectingClientSocket::GetLocalAddress() {
155   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
156   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
157 
158   return socket_api::GetSocketAddress(descriptor_);
159 }
160 
161 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveBlocking(QuicByteCount max_size)162 EventLoopConnectingClientSocket::ReceiveBlocking(QuicByteCount max_size) {
163   QUICHE_DCHECK_GT(max_size, 0u);
164   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
165   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
166   QUICHE_DCHECK(!receive_max_size_.has_value());
167 
168   absl::Status status =
169       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
170   if (!status.ok()) {
171     QUICHE_LOG_FIRST_N(WARNING, 100)
172         << "Failed to set socket to address: " << peer_address_.ToString()
173         << " as blocking for receive with error: " << status;
174     return status;
175   }
176 
177   receive_max_size_ = max_size;
178   absl::StatusOr<quiche::QuicheMemSlice> buffer = ReceiveInternal();
179 
180   if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
181     QUICHE_LOG_FIRST_N(ERROR, 100)
182         << "Non-blocking receive from should-be blocking socket to address:"
183         << peer_address_.ToString() << ".";
184     receive_max_size_.reset();
185   } else {
186     QUICHE_DCHECK(!receive_max_size_.has_value());
187   }
188 
189   absl::Status set_non_blocking_status =
190       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
191   if (!set_non_blocking_status.ok()) {
192     QUICHE_LOG_FIRST_N(WARNING, 100)
193         << "Failed to return socket to address: " << peer_address_.ToString()
194         << " to non-blocking after receive with error: "
195         << set_non_blocking_status;
196     return set_non_blocking_status;
197   }
198 
199   return buffer;
200 }
201 
ReceiveAsync(QuicByteCount max_size)202 void EventLoopConnectingClientSocket::ReceiveAsync(QuicByteCount max_size) {
203   QUICHE_DCHECK(async_visitor_);
204   QUICHE_DCHECK_GT(max_size, 0u);
205   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
206   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
207   QUICHE_DCHECK(!receive_max_size_.has_value());
208 
209   receive_max_size_ = max_size;
210 
211   FinishOrRearmAsyncReceive(ReceiveInternal());
212 }
213 
SendBlocking(std::string data)214 absl::Status EventLoopConnectingClientSocket::SendBlocking(std::string data) {
215   QUICHE_DCHECK(!data.empty());
216   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
217 
218   send_data_ = std::move(data);
219   return SendBlockingInternal();
220 }
221 
SendBlocking(quiche::QuicheMemSlice data)222 absl::Status EventLoopConnectingClientSocket::SendBlocking(
223     quiche::QuicheMemSlice data) {
224   QUICHE_DCHECK(!data.empty());
225   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
226 
227   send_data_ = std::move(data);
228   return SendBlockingInternal();
229 }
230 
SendAsync(std::string data)231 void EventLoopConnectingClientSocket::SendAsync(std::string data) {
232   QUICHE_DCHECK(!data.empty());
233   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
234 
235   send_data_ = std::move(data);
236   send_remaining_ = absl::get<std::string>(send_data_);
237 
238   FinishOrRearmAsyncSend(SendInternal());
239 }
240 
SendAsync(quiche::QuicheMemSlice data)241 void EventLoopConnectingClientSocket::SendAsync(quiche::QuicheMemSlice data) {
242   QUICHE_DCHECK(!data.empty());
243   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
244 
245   send_data_ = std::move(data);
246   send_remaining_ =
247       absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
248 
249   FinishOrRearmAsyncSend(SendInternal());
250 }
251 
OnSocketEvent(QuicEventLoop * event_loop,SocketFd fd,QuicSocketEventMask events)252 void EventLoopConnectingClientSocket::OnSocketEvent(
253     QuicEventLoop* event_loop, SocketFd fd, QuicSocketEventMask events) {
254   QUICHE_DCHECK_EQ(event_loop, event_loop_);
255   QUICHE_DCHECK_EQ(fd, descriptor_);
256 
257   if (connect_status_ == ConnectStatus::kConnecting &&
258       (events & (kSocketEventWritable | kSocketEventError))) {
259     FinishOrRearmAsyncConnect(GetConnectResult());
260     return;
261   }
262 
263   if (receive_max_size_.has_value() &&
264       (events & (kSocketEventReadable | kSocketEventError))) {
265     FinishOrRearmAsyncReceive(ReceiveInternal());
266   }
267   if (!send_remaining_.empty() &&
268       (events & (kSocketEventWritable | kSocketEventError))) {
269     FinishOrRearmAsyncSend(SendInternal());
270   }
271 }
272 
Open()273 absl::Status EventLoopConnectingClientSocket::Open() {
274   QUICHE_DCHECK_EQ(descriptor_, kInvalidSocketFd);
275   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
276   QUICHE_DCHECK(!receive_max_size_.has_value());
277   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
278   QUICHE_DCHECK(send_remaining_.empty());
279 
280   absl::StatusOr<SocketFd> descriptor =
281       socket_api::CreateSocket(peer_address_.host().address_family(), protocol_,
282                                /*blocking=*/false);
283   if (!descriptor.ok()) {
284     QUICHE_DVLOG(1) << "Failed to open socket for connection to address: "
285                     << peer_address_.ToString()
286                     << " with error: " << descriptor.status();
287     return descriptor.status();
288   }
289   QUICHE_DCHECK_NE(descriptor.value(), kInvalidSocketFd);
290 
291   descriptor_ = descriptor.value();
292 
293   if (async_visitor_) {
294     bool registered;
295     if (event_loop_->SupportsEdgeTriggered()) {
296       registered = event_loop_->RegisterSocket(
297           descriptor_,
298           kSocketEventReadable | kSocketEventWritable | kSocketEventError,
299           this);
300     } else {
301       // Just register the socket without any armed events for now.  Will rearm
302       // with specific events as needed.  Registering now before events are
303       // needed makes it easier to ensure the socket is registered only once
304       // and can always be unregistered on socket close.
305       registered = event_loop_->RegisterSocket(descriptor_, /*events=*/0, this);
306     }
307     QUICHE_DCHECK(registered);
308   }
309 
310   if (receive_buffer_size_ != 0) {
311     absl::Status status =
312         socket_api::SetReceiveBufferSize(descriptor_, receive_buffer_size_);
313     if (!status.ok()) {
314       QUICHE_LOG_FIRST_N(WARNING, 100)
315           << "Failed to set receive buffer size to: " << receive_buffer_size_
316           << " for socket to address: " << peer_address_.ToString()
317           << " with error: " << status;
318       Close();
319       return status;
320     }
321   }
322 
323   if (send_buffer_size_ != 0) {
324     absl::Status status =
325         socket_api::SetSendBufferSize(descriptor_, send_buffer_size_);
326     if (!status.ok()) {
327       QUICHE_LOG_FIRST_N(WARNING, 100)
328           << "Failed to set send buffer size to: " << send_buffer_size_
329           << " for socket to address: " << peer_address_.ToString()
330           << " with error: " << status;
331       Close();
332       return status;
333     }
334   }
335 
336   return absl::OkStatus();
337 }
338 
Close()339 void EventLoopConnectingClientSocket::Close() {
340   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
341 
342   bool unregistered = event_loop_->UnregisterSocket(descriptor_);
343   QUICHE_DCHECK_EQ(unregistered, !!async_visitor_);
344 
345   absl::Status status = socket_api::Close(descriptor_);
346   if (!status.ok()) {
347     QUICHE_LOG_FIRST_N(WARNING, 100)
348         << "Could not close socket to address: " << peer_address_.ToString()
349         << " with error: " << status;
350   }
351 
352   descriptor_ = kInvalidSocketFd;
353 }
354 
DoInitialConnect()355 absl::Status EventLoopConnectingClientSocket::DoInitialConnect() {
356   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
357   QUICHE_DCHECK(connect_status_ == ConnectStatus::kNotConnected);
358   QUICHE_DCHECK(!receive_max_size_.has_value());
359   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
360 
361   absl::Status connect_result = socket_api::Connect(descriptor_, peer_address_);
362 
363   if (connect_result.ok()) {
364     connect_status_ = ConnectStatus::kConnected;
365   } else if (absl::IsUnavailable(connect_result)) {
366     connect_status_ = ConnectStatus::kConnecting;
367   } else {
368     QUICHE_DVLOG(1) << "Synchronously failed to connect socket to address: "
369                     << peer_address_.ToString()
370                     << " with error: " << connect_result;
371     Close();
372     connect_status_ = ConnectStatus::kNotConnected;
373   }
374 
375   return connect_result;
376 }
377 
GetConnectResult()378 absl::Status EventLoopConnectingClientSocket::GetConnectResult() {
379   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
380   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
381   QUICHE_DCHECK(!receive_max_size_.has_value());
382   QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
383 
384   absl::Status error = socket_api::GetSocketError(descriptor_);
385 
386   if (!error.ok()) {
387     QUICHE_DVLOG(1) << "Asynchronously failed to connect socket to address: "
388                     << peer_address_.ToString() << " with error: " << error;
389     Close();
390     connect_status_ = ConnectStatus::kNotConnected;
391     return error;
392   }
393 
394   // Peek at one byte to confirm the connection is actually alive. Motivation:
395   // 1) Plausibly could have a lot of cases where the connection operation
396   //    itself technically succeeds but the socket then quickly fails.  Don't
397   //    want to claim connection success here if, by the time this code is
398   //    running after event triggers and such, the socket has already failed.
399   //    Lot of undefined room around whether or not such errors would be saved
400   //    into SO_ERROR and returned by socket_api::GetSocketError().
401   // 2) With the various platforms and event systems involved, less than 100%
402   //    trust that it's impossible to end up in this method before the async
403   //    connect has completed/errored. Given that Connect() and GetSocketError()
404   //    does not difinitevely differentiate between success and
405   //    still-in-progress, and given that there's a very simple and performant
406   //    way to positively confirm the socket is connected (peek), do that here.
407   //    (Could consider making the not-connected case a QUIC_BUG if a way is
408   //    found to differentiate it from (1).)
409   absl::StatusOr<bool> peek_data = OneBytePeek();
410   if (peek_data.ok() || absl::IsUnavailable(peek_data.status())) {
411     connect_status_ = ConnectStatus::kConnected;
412   } else {
413     error = peek_data.status();
414     QUICHE_LOG_FIRST_N(WARNING, 100)
415         << "Socket to address: " << peer_address_.ToString()
416         << " signalled writable after connect and no connect error found, "
417            "but socket does not appear connected with error: "
418         << error;
419     Close();
420     connect_status_ = ConnectStatus::kNotConnected;
421   }
422 
423   return error;
424 }
425 
FinishOrRearmAsyncConnect(absl::Status status)426 void EventLoopConnectingClientSocket::FinishOrRearmAsyncConnect(
427     absl::Status status) {
428   if (absl::IsUnavailable(status)) {
429     if (!event_loop_->SupportsEdgeTriggered()) {
430       bool result = event_loop_->RearmSocket(
431           descriptor_, kSocketEventWritable | kSocketEventError);
432       QUICHE_DCHECK(result);
433     }
434     QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnecting);
435   } else {
436     QUICHE_DCHECK(connect_status_ != ConnectStatus::kConnecting);
437     async_visitor_->ConnectComplete(status);
438   }
439 }
440 
441 absl::StatusOr<quiche::QuicheMemSlice>
ReceiveInternal()442 EventLoopConnectingClientSocket::ReceiveInternal() {
443   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
444   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
445   QUICHE_CHECK(receive_max_size_.has_value());
446   QUICHE_DCHECK_GE(receive_max_size_.value(), 1u);
447   QUICHE_DCHECK_LE(receive_max_size_.value(),
448                    std::numeric_limits<size_t>::max());
449 
450   // Before allocating a buffer, do a 1-byte peek to determine if needed.
451   if (receive_max_size_.value() > 1) {
452     absl::StatusOr<bool> peek_data = OneBytePeek();
453     if (!peek_data.ok()) {
454       if (!absl::IsUnavailable(peek_data.status())) {
455         receive_max_size_.reset();
456       }
457       return peek_data.status();
458     } else if (!peek_data.value()) {
459       receive_max_size_.reset();
460       return quiche::QuicheMemSlice();
461     }
462   }
463 
464   quiche::QuicheBuffer buffer(buffer_allocator_, receive_max_size_.value());
465   absl::StatusOr<absl::Span<char>> received = socket_api::Receive(
466       descriptor_, absl::MakeSpan(buffer.data(), buffer.size()));
467 
468   if (received.ok()) {
469     QUICHE_DCHECK_LE(received.value().size(), buffer.size());
470     QUICHE_DCHECK_EQ(received.value().data(), buffer.data());
471 
472     receive_max_size_.reset();
473     return quiche::QuicheMemSlice(
474         quiche::QuicheBuffer(buffer.Release(), received.value().size()));
475   } else {
476     if (!absl::IsUnavailable(received.status())) {
477       QUICHE_DVLOG(1) << "Failed to receive from socket to address: "
478                       << peer_address_.ToString()
479                       << " with error: " << received.status();
480       receive_max_size_.reset();
481     }
482     return received.status();
483   }
484 }
485 
FinishOrRearmAsyncReceive(absl::StatusOr<quiche::QuicheMemSlice> buffer)486 void EventLoopConnectingClientSocket::FinishOrRearmAsyncReceive(
487     absl::StatusOr<quiche::QuicheMemSlice> buffer) {
488   QUICHE_DCHECK(async_visitor_);
489   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
490 
491   if (!buffer.ok() && absl::IsUnavailable(buffer.status())) {
492     if (!event_loop_->SupportsEdgeTriggered()) {
493       bool result = event_loop_->RearmSocket(
494           descriptor_, kSocketEventReadable | kSocketEventError);
495       QUICHE_DCHECK(result);
496     }
497     QUICHE_DCHECK(receive_max_size_.has_value());
498   } else {
499     QUICHE_DCHECK(!receive_max_size_.has_value());
500     async_visitor_->ReceiveComplete(std::move(buffer));
501   }
502 }
503 
OneBytePeek()504 absl::StatusOr<bool> EventLoopConnectingClientSocket::OneBytePeek() {
505   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
506 
507   char peek_buffer;
508   absl::StatusOr<absl::Span<char>> peek_received = socket_api::Receive(
509       descriptor_, absl::MakeSpan(&peek_buffer, /*size=*/1), /*peek=*/true);
510   if (!peek_received.ok()) {
511     return peek_received.status();
512   } else {
513     return !peek_received.value().empty();
514   }
515 }
516 
SendBlockingInternal()517 absl::Status EventLoopConnectingClientSocket::SendBlockingInternal() {
518   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
519   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
520   QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
521   QUICHE_DCHECK(send_remaining_.empty());
522 
523   absl::Status status =
524       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/true);
525   if (!status.ok()) {
526     QUICHE_LOG_FIRST_N(WARNING, 100)
527         << "Failed to set socket to address: " << peer_address_.ToString()
528         << " as blocking for send with error: " << status;
529     send_data_ = absl::monostate();
530     return status;
531   }
532 
533   if (absl::holds_alternative<std::string>(send_data_)) {
534     send_remaining_ = absl::get<std::string>(send_data_);
535   } else {
536     send_remaining_ =
537         absl::get<quiche::QuicheMemSlice>(send_data_).AsStringView();
538   }
539 
540   status = SendInternal();
541   if (absl::IsUnavailable(status)) {
542     QUICHE_LOG_FIRST_N(ERROR, 100)
543         << "Non-blocking send for should-be blocking socket to address:"
544         << peer_address_.ToString();
545     send_data_ = absl::monostate();
546     send_remaining_ = "";
547   } else {
548     QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
549     QUICHE_DCHECK(send_remaining_.empty());
550   }
551 
552   absl::Status set_non_blocking_status =
553       socket_api::SetSocketBlocking(descriptor_, /*blocking=*/false);
554   if (!set_non_blocking_status.ok()) {
555     QUICHE_LOG_FIRST_N(WARNING, 100)
556         << "Failed to return socket to address: " << peer_address_.ToString()
557         << " to non-blocking after send with error: "
558         << set_non_blocking_status;
559     return set_non_blocking_status;
560   }
561 
562   return status;
563 }
564 
SendInternal()565 absl::Status EventLoopConnectingClientSocket::SendInternal() {
566   QUICHE_DCHECK_NE(descriptor_, kInvalidSocketFd);
567   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
568   QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
569   QUICHE_DCHECK(!send_remaining_.empty());
570 
571   // Repeat send until all data sent, unavailable, or error.
572   while (!send_remaining_.empty()) {
573     absl::StatusOr<absl::string_view> remainder =
574         socket_api::Send(descriptor_, send_remaining_);
575 
576     if (remainder.ok()) {
577       QUICHE_DCHECK(remainder.value().empty() ||
578                     (remainder.value().data() >= send_remaining_.data() &&
579                      remainder.value().data() <
580                          send_remaining_.data() + send_remaining_.size()));
581       QUICHE_DCHECK(remainder.value().empty() ||
582                     (remainder.value().data() + remainder.value().size() ==
583                      send_remaining_.data() + send_remaining_.size()));
584       send_remaining_ = remainder.value();
585     } else {
586       if (!absl::IsUnavailable(remainder.status())) {
587         QUICHE_DVLOG(1) << "Failed to send to socket to address: "
588                         << peer_address_.ToString()
589                         << " with error: " << remainder.status();
590         send_data_ = absl::monostate();
591         send_remaining_ = "";
592       }
593       return remainder.status();
594     }
595   }
596 
597   send_data_ = absl::monostate();
598   return absl::OkStatus();
599 }
600 
FinishOrRearmAsyncSend(absl::Status status)601 void EventLoopConnectingClientSocket::FinishOrRearmAsyncSend(
602     absl::Status status) {
603   QUICHE_DCHECK(async_visitor_);
604   QUICHE_DCHECK(connect_status_ == ConnectStatus::kConnected);
605 
606   if (absl::IsUnavailable(status)) {
607     if (!event_loop_->SupportsEdgeTriggered()) {
608       bool result = event_loop_->RearmSocket(
609           descriptor_, kSocketEventWritable | kSocketEventError);
610       QUICHE_DCHECK(result);
611     }
612     QUICHE_DCHECK(!absl::holds_alternative<absl::monostate>(send_data_));
613     QUICHE_DCHECK(!send_remaining_.empty());
614   } else {
615     QUICHE_DCHECK(absl::holds_alternative<absl::monostate>(send_data_));
616     QUICHE_DCHECK(send_remaining_.empty());
617     async_visitor_->SendComplete(status);
618   }
619 }
620 
621 }  // namespace quic
622