• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "jingle/glue/pseudotcp_adapter.h"
6 
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "net/base/address_list.h"
11 #include "net/base/completion_callback.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_util.h"
15 
16 using cricket::PseudoTcp;
17 
18 namespace {
19 const int kReadBufferSize = 65536;  // Maximum size of a packet.
20 const uint16 kDefaultMtu = 1280;
21 }  // namespace
22 
23 namespace jingle_glue {
24 
25 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
26                                public base::RefCounted<Core> {
27  public:
28   Core(net::Socket* socket);
29 
30   // Functions used to implement net::StreamSocket.
31   int Read(net::IOBuffer* buffer, int buffer_size,
32            const net::CompletionCallback& callback);
33   int Write(net::IOBuffer* buffer, int buffer_size,
34             const net::CompletionCallback& callback);
35   int Connect(const net::CompletionCallback& callback);
36   void Disconnect();
37   bool IsConnected() const;
38 
39   // cricket::IPseudoTcpNotify interface.
40   // These notifications are triggered from NotifyPacket.
41   virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
42   virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
43   virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
44   // This is triggered by NotifyClock or NotifyPacket.
45   virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
46   // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
47   virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
48                                      const char* buffer, size_t len) OVERRIDE;
49 
50   void SetAckDelay(int delay_ms);
51   void SetNoDelay(bool no_delay);
52   void SetReceiveBufferSize(int32 size);
53   void SetSendBufferSize(int32 size);
54   void SetWriteWaitsForSend(bool write_waits_for_send);
55 
56   void DeleteSocket();
57 
58  private:
59   friend class base::RefCounted<Core>;
60   virtual ~Core();
61 
62   // These are invoked by the underlying Socket, and may trigger callbacks.
63   // They hold a reference to |this| while running, to protect from deletion.
64   void OnRead(int result);
65   void OnWritten(int result);
66 
67   // These may trigger callbacks, so the holder must hold a reference on
68   // the stack while calling them.
69   void DoReadFromSocket();
70   void HandleReadResults(int result);
71   void HandleTcpClock();
72 
73   // Checks if current write has completed in the write-waits-for-send
74   // mode.
75   void CheckWriteComplete();
76 
77   // This re-sets |timer| without triggering callbacks.
78   void AdjustClock();
79 
80   net::CompletionCallback connect_callback_;
81   net::CompletionCallback read_callback_;
82   net::CompletionCallback write_callback_;
83 
84   cricket::PseudoTcp pseudo_tcp_;
85   scoped_ptr<net::Socket> socket_;
86 
87   scoped_refptr<net::IOBuffer> read_buffer_;
88   int read_buffer_size_;
89   scoped_refptr<net::IOBuffer> write_buffer_;
90   int write_buffer_size_;
91 
92   // Whether we need to wait for data to be sent before completing write.
93   bool write_waits_for_send_;
94 
95   // Set to true in the write-waits-for-send mode when we've
96   // successfully writtend data to the send buffer and waiting for the
97   // data to be sent to the remote end.
98   bool waiting_write_position_;
99 
100   // Number of the bytes written by the last write stored while we wait
101   // for the data to be sent (i.e. when waiting_write_position_ = true).
102   int last_write_result_;
103 
104   bool socket_write_pending_;
105   scoped_refptr<net::IOBuffer> socket_read_buffer_;
106 
107   base::OneShotTimer<Core> timer_;
108 
109   DISALLOW_COPY_AND_ASSIGN(Core);
110 };
111 
112 
Core(net::Socket * socket)113 PseudoTcpAdapter::Core::Core(net::Socket* socket)
114     : pseudo_tcp_(this, 0),
115       socket_(socket),
116       write_waits_for_send_(false),
117       waiting_write_position_(false),
118       socket_write_pending_(false) {
119   // Doesn't trigger callbacks.
120   pseudo_tcp_.NotifyMTU(kDefaultMtu);
121 }
122 
~Core()123 PseudoTcpAdapter::Core::~Core() {
124 }
125 
Read(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)126 int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
127                                  const net::CompletionCallback& callback) {
128   DCHECK(read_callback_.is_null());
129 
130   // Reference the Core in case a callback deletes the adapter.
131   scoped_refptr<Core> core(this);
132 
133   int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
134   if (result < 0) {
135     result = net::MapSystemError(pseudo_tcp_.GetError());
136     DCHECK(result < 0);
137   }
138 
139   if (result == net::ERR_IO_PENDING) {
140     read_buffer_ = buffer;
141     read_buffer_size_ = buffer_size;
142     read_callback_ = callback;
143   }
144 
145   AdjustClock();
146 
147   return result;
148 }
149 
Write(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)150 int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
151                                   const net::CompletionCallback& callback) {
152   DCHECK(write_callback_.is_null());
153 
154   // Reference the Core in case a callback deletes the adapter.
155   scoped_refptr<Core> core(this);
156 
157   int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
158   if (result < 0) {
159     result = net::MapSystemError(pseudo_tcp_.GetError());
160     DCHECK(result < 0);
161   }
162 
163   AdjustClock();
164 
165   if (result == net::ERR_IO_PENDING) {
166     write_buffer_ = buffer;
167     write_buffer_size_ = buffer_size;
168     write_callback_ = callback;
169     return result;
170   }
171 
172   if (result < 0)
173     return result;
174 
175   // Need to wait until the data is sent to the peer when
176   // send-confirmation mode is enabled.
177   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
178     DCHECK(!waiting_write_position_);
179     waiting_write_position_ = true;
180     last_write_result_ = result;
181     write_buffer_ = buffer;
182     write_buffer_size_ = buffer_size;
183     write_callback_ = callback;
184     return net::ERR_IO_PENDING;
185   }
186 
187   return result;
188 }
189 
Connect(const net::CompletionCallback & callback)190 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
191   DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
192 
193   // Reference the Core in case a callback deletes the adapter.
194   scoped_refptr<Core> core(this);
195 
196   // Start the connection attempt.
197   int result = pseudo_tcp_.Connect();
198   if (result < 0)
199     return net::ERR_FAILED;
200 
201   AdjustClock();
202 
203   connect_callback_ = callback;
204   DoReadFromSocket();
205 
206   return net::ERR_IO_PENDING;
207 }
208 
Disconnect()209 void PseudoTcpAdapter::Core::Disconnect() {
210   // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
211   read_callback_.Reset();
212   read_buffer_ = NULL;
213   write_callback_.Reset();
214   write_buffer_ = NULL;
215   connect_callback_.Reset();
216 
217   // TODO(wez): Connect should succeed if called after Disconnect, which
218   // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
219   // and create a new one in Connect.
220   // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
221   // effect.  This should be addressed in PseudoTcp, really.
222   // In the meantime we can fake OnTcpClosed notification and tear down the
223   // PseudoTcp.
224   pseudo_tcp_.Close(true);
225 }
226 
IsConnected() const227 bool PseudoTcpAdapter::Core::IsConnected() const {
228   return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
229 }
230 
OnTcpOpen(PseudoTcp * tcp)231 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
232   DCHECK(tcp == &pseudo_tcp_);
233 
234   if (!connect_callback_.is_null()) {
235     net::CompletionCallback callback = connect_callback_;
236     connect_callback_.Reset();
237     callback.Run(net::OK);
238   }
239 
240   OnTcpReadable(tcp);
241   OnTcpWriteable(tcp);
242 }
243 
OnTcpReadable(PseudoTcp * tcp)244 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
245   DCHECK_EQ(tcp, &pseudo_tcp_);
246   if (read_callback_.is_null())
247     return;
248 
249   int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
250   if (result < 0) {
251     result = net::MapSystemError(pseudo_tcp_.GetError());
252     DCHECK(result < 0);
253     if (result == net::ERR_IO_PENDING)
254       return;
255   }
256 
257   AdjustClock();
258 
259   net::CompletionCallback callback = read_callback_;
260   read_callback_.Reset();
261   read_buffer_ = NULL;
262   callback.Run(result);
263 }
264 
OnTcpWriteable(PseudoTcp * tcp)265 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
266   DCHECK_EQ(tcp, &pseudo_tcp_);
267   if (write_callback_.is_null())
268     return;
269 
270   if (waiting_write_position_) {
271     CheckWriteComplete();
272     return;
273   }
274 
275   int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
276   if (result < 0) {
277     result = net::MapSystemError(pseudo_tcp_.GetError());
278     DCHECK(result < 0);
279     if (result == net::ERR_IO_PENDING)
280       return;
281   }
282 
283   AdjustClock();
284 
285   if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
286     DCHECK(!waiting_write_position_);
287     waiting_write_position_ = true;
288     last_write_result_ = result;
289     return;
290   }
291 
292   net::CompletionCallback callback = write_callback_;
293   write_callback_.Reset();
294   write_buffer_ = NULL;
295   callback.Run(result);
296 }
297 
OnTcpClosed(PseudoTcp * tcp,uint32 error)298 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
299   DCHECK_EQ(tcp, &pseudo_tcp_);
300 
301   if (!connect_callback_.is_null()) {
302     net::CompletionCallback callback = connect_callback_;
303     connect_callback_.Reset();
304     callback.Run(net::MapSystemError(error));
305   }
306 
307   if (!read_callback_.is_null()) {
308     net::CompletionCallback callback = read_callback_;
309     read_callback_.Reset();
310     callback.Run(net::MapSystemError(error));
311   }
312 
313   if (!write_callback_.is_null()) {
314     net::CompletionCallback callback = write_callback_;
315     write_callback_.Reset();
316     callback.Run(net::MapSystemError(error));
317   }
318 }
319 
SetAckDelay(int delay_ms)320 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
321   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
322 }
323 
SetNoDelay(bool no_delay)324 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
325   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
326 }
327 
SetReceiveBufferSize(int32 size)328 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
329   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
330 }
331 
SetSendBufferSize(int32 size)332 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
333   pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
334 }
335 
SetWriteWaitsForSend(bool write_waits_for_send)336 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
337   write_waits_for_send_ = write_waits_for_send;
338 }
339 
DeleteSocket()340 void PseudoTcpAdapter::Core::DeleteSocket() {
341   socket_.reset();
342 }
343 
TcpWritePacket(PseudoTcp * tcp,const char * buffer,size_t len)344 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
345     PseudoTcp* tcp,
346     const char* buffer,
347     size_t len) {
348   DCHECK_EQ(tcp, &pseudo_tcp_);
349 
350   // If we already have a write pending, we behave like a congested network,
351   // returning success for the write, but dropping the packet.  PseudoTcp will
352   // back-off and retransmit, adjusting for the perceived congestion.
353   if (socket_write_pending_)
354     return IPseudoTcpNotify::WR_SUCCESS;
355 
356   scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
357   memcpy(write_buffer->data(), buffer, len);
358 
359   // Our underlying socket is datagram-oriented, which means it should either
360   // send exactly as many bytes as we requested, or fail.
361   int result;
362   if (socket_.get()) {
363     result = socket_->Write(
364         write_buffer.get(),
365         len,
366         base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
367   } else {
368     result = net::ERR_CONNECTION_CLOSED;
369   }
370   if (result == net::ERR_IO_PENDING) {
371     socket_write_pending_ = true;
372     return IPseudoTcpNotify::WR_SUCCESS;
373   } else if (result == net::ERR_MSG_TOO_BIG) {
374     return IPseudoTcpNotify::WR_TOO_LARGE;
375   } else if (result < 0) {
376     return IPseudoTcpNotify::WR_FAIL;
377   } else {
378     return IPseudoTcpNotify::WR_SUCCESS;
379   }
380 }
381 
DoReadFromSocket()382 void PseudoTcpAdapter::Core::DoReadFromSocket() {
383   if (!socket_read_buffer_.get())
384     socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
385 
386   int result = 1;
387   while (socket_.get() && result > 0) {
388     result = socket_->Read(
389         socket_read_buffer_.get(),
390         kReadBufferSize,
391         base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
392     if (result != net::ERR_IO_PENDING)
393       HandleReadResults(result);
394   }
395 }
396 
HandleReadResults(int result)397 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
398   if (result <= 0) {
399     LOG(ERROR) << "Read returned " << result;
400     return;
401   }
402 
403   // TODO(wez): Disconnect on failure of NotifyPacket?
404   pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
405   AdjustClock();
406 
407   CheckWriteComplete();
408 }
409 
OnRead(int result)410 void PseudoTcpAdapter::Core::OnRead(int result) {
411   // Reference the Core in case a callback deletes the adapter.
412   scoped_refptr<Core> core(this);
413 
414   HandleReadResults(result);
415   if (result >= 0)
416     DoReadFromSocket();
417 }
418 
OnWritten(int result)419 void PseudoTcpAdapter::Core::OnWritten(int result) {
420   // Reference the Core in case a callback deletes the adapter.
421   scoped_refptr<Core> core(this);
422 
423   socket_write_pending_ = false;
424   if (result < 0) {
425     LOG(WARNING) << "Write failed. Error code: " << result;
426   }
427 }
428 
AdjustClock()429 void PseudoTcpAdapter::Core::AdjustClock() {
430   long timeout = 0;
431   if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
432     timer_.Stop();
433     timer_.Start(FROM_HERE,
434                  base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
435                  &PseudoTcpAdapter::Core::HandleTcpClock);
436   }
437 }
438 
HandleTcpClock()439 void PseudoTcpAdapter::Core::HandleTcpClock() {
440   // Reference the Core in case a callback deletes the adapter.
441   scoped_refptr<Core> core(this);
442 
443   pseudo_tcp_.NotifyClock(PseudoTcp::Now());
444   AdjustClock();
445 
446   CheckWriteComplete();
447 }
448 
CheckWriteComplete()449 void PseudoTcpAdapter::Core::CheckWriteComplete() {
450   if (!write_callback_.is_null() && waiting_write_position_) {
451     if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
452       waiting_write_position_ = false;
453 
454       net::CompletionCallback callback = write_callback_;
455       write_callback_.Reset();
456       write_buffer_ = NULL;
457       callback.Run(last_write_result_);
458     }
459   }
460 }
461 
462 // Public interface implemention.
463 
PseudoTcpAdapter(net::Socket * socket)464 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
465     : core_(new Core(socket)) {
466 }
467 
~PseudoTcpAdapter()468 PseudoTcpAdapter::~PseudoTcpAdapter() {
469   Disconnect();
470 
471   // Make sure that the underlying socket is destroyed before PseudoTcp.
472   core_->DeleteSocket();
473 }
474 
Read(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)475 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
476                            const net::CompletionCallback& callback) {
477   DCHECK(CalledOnValidThread());
478   return core_->Read(buffer, buffer_size, callback);
479 }
480 
Write(net::IOBuffer * buffer,int buffer_size,const net::CompletionCallback & callback)481 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
482                             const net::CompletionCallback& callback) {
483   DCHECK(CalledOnValidThread());
484   return core_->Write(buffer, buffer_size, callback);
485 }
486 
SetReceiveBufferSize(int32 size)487 bool PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
488   DCHECK(CalledOnValidThread());
489 
490   core_->SetReceiveBufferSize(size);
491   return false;
492 }
493 
SetSendBufferSize(int32 size)494 bool PseudoTcpAdapter::SetSendBufferSize(int32 size) {
495   DCHECK(CalledOnValidThread());
496 
497   core_->SetSendBufferSize(size);
498   return false;
499 }
500 
Connect(const net::CompletionCallback & callback)501 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
502   DCHECK(CalledOnValidThread());
503 
504   // net::StreamSocket requires that Connect return OK if already connected.
505   if (IsConnected())
506     return net::OK;
507 
508   return core_->Connect(callback);
509 }
510 
Disconnect()511 void PseudoTcpAdapter::Disconnect() {
512   DCHECK(CalledOnValidThread());
513   core_->Disconnect();
514 }
515 
IsConnected() const516 bool PseudoTcpAdapter::IsConnected() const {
517   return core_->IsConnected();
518 }
519 
IsConnectedAndIdle() const520 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
521   DCHECK(CalledOnValidThread());
522   NOTIMPLEMENTED();
523   return false;
524 }
525 
GetPeerAddress(net::IPEndPoint * address) const526 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
527   DCHECK(CalledOnValidThread());
528 
529   // We don't have a meaningful peer address, but we can't return an
530   // error, so we return a INADDR_ANY instead.
531   net::IPAddressNumber ip_address(net::kIPv4AddressSize);
532   *address = net::IPEndPoint(ip_address, 0);
533   return net::OK;
534 }
535 
GetLocalAddress(net::IPEndPoint * address) const536 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
537   DCHECK(CalledOnValidThread());
538   NOTIMPLEMENTED();
539   return net::ERR_FAILED;
540 }
541 
NetLog() const542 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
543   DCHECK(CalledOnValidThread());
544   return net_log_;
545 }
546 
SetSubresourceSpeculation()547 void PseudoTcpAdapter::SetSubresourceSpeculation() {
548   DCHECK(CalledOnValidThread());
549   NOTIMPLEMENTED();
550 }
551 
SetOmniboxSpeculation()552 void PseudoTcpAdapter::SetOmniboxSpeculation() {
553   DCHECK(CalledOnValidThread());
554   NOTIMPLEMENTED();
555 }
556 
WasEverUsed() const557 bool PseudoTcpAdapter::WasEverUsed() const {
558   DCHECK(CalledOnValidThread());
559   NOTIMPLEMENTED();
560   return true;
561 }
562 
UsingTCPFastOpen() const563 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
564   DCHECK(CalledOnValidThread());
565   return false;
566 }
567 
WasNpnNegotiated() const568 bool PseudoTcpAdapter::WasNpnNegotiated() const {
569   DCHECK(CalledOnValidThread());
570   return false;
571 }
572 
GetNegotiatedProtocol() const573 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
574   DCHECK(CalledOnValidThread());
575   return net::kProtoUnknown;
576 }
577 
GetSSLInfo(net::SSLInfo * ssl_info)578 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
579   DCHECK(CalledOnValidThread());
580   return false;
581 }
582 
SetAckDelay(int delay_ms)583 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
584   DCHECK(CalledOnValidThread());
585   core_->SetAckDelay(delay_ms);
586 }
587 
SetNoDelay(bool no_delay)588 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
589   DCHECK(CalledOnValidThread());
590   core_->SetNoDelay(no_delay);
591 }
592 
SetWriteWaitsForSend(bool write_waits_for_send)593 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
594   DCHECK(CalledOnValidThread());
595   core_->SetWriteWaitsForSend(write_waits_for_send);
596 }
597 
598 }  // namespace jingle_glue
599