• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
7 
8 #include <assert.h>
9 #include <errno.h>
10 #include <string.h>
11 #include <algorithm>
12 
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/pepper_interface.h"
15 #include "nacl_io/socket/tcp_node.h"
16 #include "nacl_io/stream/stream_fs.h"
17 
namespace {
// Upper bound, in bytes, on the data handed to a single PPAPI Read or Write.
const size_t kMaxPacketSize = 65536;
// Size passed for both FIFOs when a TcpEventEmitter is created: eight
// maximum-size packets.
const size_t kDefaultFifoSize = 8 * kMaxPacketSize;
}  // namespace
22 
23 namespace nacl_io {
24 
25 class TcpWork : public StreamFs::Work {
26  public:
TcpWork(const ScopedTcpEventEmitter & emitter)27   explicit TcpWork(const ScopedTcpEventEmitter& emitter)
28       : StreamFs::Work(emitter->stream()->stream()),
29         emitter_(emitter),
30         data_(NULL) {}
31 
~TcpWork()32   ~TcpWork() { delete[] data_; }
33 
TCPInterface()34   TCPSocketInterface* TCPInterface() {
35     return filesystem()->ppapi()->GetTCPSocketInterface();
36   }
37 
38  protected:
39   ScopedTcpEventEmitter emitter_;
40   char* data_;
41 };
42 
43 class TcpSendWork : public TcpWork {
44  public:
TcpSendWork(const ScopedTcpEventEmitter & emitter,const ScopedSocketNode & stream)45   explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
46                        const ScopedSocketNode& stream)
47       : TcpWork(emitter), node_(stream) {}
48 
Start(int32_t val)49   virtual bool Start(int32_t val) {
50     AUTO_LOCK(emitter_->GetLock());
51 
52     // Does the stream exist, and can it send?
53     if (!node_->TestStreamFlags(SSF_CAN_SEND))
54       return false;
55 
56     // Check if we are already sending.
57     if (node_->TestStreamFlags(SSF_SENDING))
58       return false;
59 
60     size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61     int capped_len = std::min(tx_data_avail, kMaxPacketSize);
62     if (capped_len == 0)
63       return false;
64 
65     data_ = new char[capped_len];
66     emitter_->ReadOut_Locked(data_, capped_len);
67 
68     int err = TCPInterface()->Write(node_->socket_resource(),
69                                     data_,
70                                     capped_len,
71                                     filesystem()->GetRunCompletion(this));
72 
73     if (err != PP_OK_COMPLETIONPENDING) {
74       // Anything else, we should assume the socket has gone bad.
75       node_->SetError_Locked(err);
76       return false;
77     }
78 
79     node_->SetStreamFlags(SSF_SENDING);
80     return true;
81   }
82 
Run(int32_t length_error)83   virtual void Run(int32_t length_error) {
84     AUTO_LOCK(emitter_->GetLock());
85 
86     if (length_error < 0) {
87       // Send failed, mark the socket as bad
88       node_->SetError_Locked(length_error);
89       return;
90     }
91 
92     // If we did send, then Q more work.
93     node_->ClearStreamFlags(SSF_SENDING);
94     node_->QueueOutput();
95   }
96 
97  private:
98   // We assume that transmits will always complete.  If the upstream
99   // actually back pressures, enough to prevent the Send callback
100   // from triggering, this resource may never go away.
101   ScopedSocketNode node_;
102 };
103 
104 class TcpRecvWork : public TcpWork {
105  public:
TcpRecvWork(const ScopedTcpEventEmitter & emitter)106   explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
107       : TcpWork(emitter) {}
108 
Start(int32_t val)109   virtual bool Start(int32_t val) {
110     AUTO_LOCK(emitter_->GetLock());
111     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
112 
113     // Does the stream exist, and can it recv?
114     if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
115       return false;
116 
117     // If we are not currently receiving
118     if (stream->TestStreamFlags(SSF_RECVING))
119       return false;
120 
121     size_t rx_space_avail = emitter_->SpaceInInputFIFO();
122     int capped_len =
123         static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
124 
125     if (capped_len == 0)
126       return false;
127 
128     data_ = new char[capped_len];
129     int err = TCPInterface()->Read(stream->socket_resource(),
130                                    data_,
131                                    capped_len,
132                                    filesystem()->GetRunCompletion(this));
133     if (err != PP_OK_COMPLETIONPENDING) {
134       // Anything else, we should assume the socket has gone bad.
135       stream->SetError_Locked(err);
136       return false;
137     }
138 
139     stream->SetStreamFlags(SSF_RECVING);
140     return true;
141   }
142 
Run(int32_t length_error)143   virtual void Run(int32_t length_error) {
144     AUTO_LOCK(emitter_->GetLock());
145     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
146 
147     if (!stream)
148       return;
149 
150     if (length_error <= 0) {
151       stream->SetError_Locked(length_error);
152       return;
153     }
154 
155     // If we successfully received, queue more input
156     emitter_->WriteIn_Locked(data_, length_error);
157     stream->ClearStreamFlags(SSF_RECVING);
158     stream->QueueInput();
159   }
160 };
161 
162 class TCPAcceptWork : public StreamFs::Work {
163  public:
TCPAcceptWork(StreamFs * stream,const ScopedTcpEventEmitter & emitter)164   explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
165       : StreamFs::Work(stream), emitter_(emitter) {}
166 
TCPInterface()167   TCPSocketInterface* TCPInterface() {
168     return filesystem()->ppapi()->GetTCPSocketInterface();
169   }
170 
Start(int32_t val)171   virtual bool Start(int32_t val) {
172     AUTO_LOCK(emitter_->GetLock());
173     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
174 
175     // Does the stream exist, and can it accept?
176     if (NULL == node)
177       return false;
178 
179     // If we are not currently accepting
180     if (!node->TestStreamFlags(SSF_LISTENING))
181       return false;
182 
183     int err = TCPInterface()->Accept(node->socket_resource(),
184                                      &new_socket_,
185                                      filesystem()->GetRunCompletion(this));
186 
187     if (err != PP_OK_COMPLETIONPENDING) {
188       // Anything else, we should assume the socket has gone bad.
189       node->SetError_Locked(err);
190       return false;
191     }
192 
193     return true;
194   }
195 
Run(int32_t error)196   virtual void Run(int32_t error) {
197     AUTO_LOCK(emitter_->GetLock());
198     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
199 
200     if (node == NULL)
201       return;
202 
203     if (error != PP_OK) {
204       node->SetError_Locked(error);
205       return;
206     }
207 
208     emitter_->SetAcceptedSocket_Locked(new_socket_);
209   }
210 
211  protected:
212   PP_Resource new_socket_;
213   ScopedTcpEventEmitter emitter_;
214 };
215 
216 class TCPConnectWork : public StreamFs::Work {
217  public:
TCPConnectWork(StreamFs * stream,const ScopedTcpEventEmitter & emitter)218   explicit TCPConnectWork(StreamFs* stream,
219                           const ScopedTcpEventEmitter& emitter)
220       : StreamFs::Work(stream), emitter_(emitter) {}
221 
TCPInterface()222   TCPSocketInterface* TCPInterface() {
223     return filesystem()->ppapi()->GetTCPSocketInterface();
224   }
225 
Start(int32_t val)226   virtual bool Start(int32_t val) {
227     AUTO_LOCK(emitter_->GetLock());
228     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
229 
230     // Does the stream exist, and can it connect?
231     if (NULL == node)
232       return false;
233 
234     int err = TCPInterface()->Connect(node->socket_resource(),
235                                       node->remote_addr(),
236                                       filesystem()->GetRunCompletion(this));
237     if (err != PP_OK_COMPLETIONPENDING) {
238       // Anything else, we should assume the socket has gone bad.
239       node->SetError_Locked(err);
240       return false;
241     }
242 
243     return true;
244   }
245 
Run(int32_t error)246   virtual void Run(int32_t error) {
247     AUTO_LOCK(emitter_->GetLock());
248     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
249 
250     if (node == NULL)
251       return;
252 
253     if (error != PP_OK) {
254       node->ConnectFailed_Locked();
255       node->SetError_Locked(error);
256       return;
257     }
258 
259     node->ConnectDone_Locked();
260   }
261 
262  protected:
263   ScopedTcpEventEmitter emitter_;
264 };
265 
TcpNode(Filesystem * filesystem)266 TcpNode::TcpNode(Filesystem* filesystem)
267     : SocketNode(filesystem),
268       emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
269       tcp_nodelay_(false) {
270   emitter_->AttachStream(this);
271 }
272 
TcpNode(Filesystem * filesystem,PP_Resource socket)273 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
274     : SocketNode(filesystem, socket),
275       emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
276       tcp_nodelay_(false) {
277   emitter_->AttachStream(this);
278 }
279 
Destroy()280 void TcpNode::Destroy() {
281   emitter_->DetachStream();
282   SocketNode::Destroy();
283 }
284 
Init(int open_flags)285 Error TcpNode::Init(int open_flags) {
286   Error err = SocketNode::Init(open_flags);
287   if (err != 0)
288     return err;
289 
290   if (TCPInterface() == NULL)
291     return EACCES;
292 
293   if (socket_resource_ != 0) {
294     // TCP sockets that are contructed with an existing socket_resource_
295     // are those that generated from calls to Accept() and therefore are
296     // already connected.
297     remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
298     ConnectDone_Locked();
299   } else {
300     socket_resource_ =
301         TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
302     if (0 == socket_resource_)
303       return EACCES;
304     SetStreamFlags(SSF_CAN_CONNECT);
305   }
306 
307   return 0;
308 }
309 
GetEventEmitter()310 EventEmitter* TcpNode::GetEventEmitter() {
311   return emitter_.get();
312 }
313 
SetError_Locked(int pp_error_num)314 void TcpNode::SetError_Locked(int pp_error_num) {
315   SocketNode::SetError_Locked(pp_error_num);
316   emitter_->SetError_Locked();
317 }
318 
GetSockOpt(int lvl,int optname,void * optval,socklen_t * len)319 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
320   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
321     AUTO_LOCK(node_lock_);
322     int value = tcp_nodelay_;
323     socklen_t value_len = static_cast<socklen_t>(sizeof(value));
324     int copy_bytes = std::min(value_len, *len);
325     memcpy(optval, &value, copy_bytes);
326     *len = value_len;
327     return 0;
328   }
329 
330   return SocketNode::GetSockOpt(lvl, optname, optval, len);
331 }
332 
SetNoDelay_Locked()333 Error TcpNode::SetNoDelay_Locked() {
334   if (!IsConnected())
335     return 0;
336 
337   int32_t error =
338       TCPInterface()->SetOption(socket_resource_,
339                                 PP_TCPSOCKET_OPTION_NO_DELAY,
340                                 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
341                                 PP_BlockUntilComplete());
342   return PPErrorToErrno(error);
343 }
344 
SetSockOpt(int lvl,int optname,const void * optval,socklen_t len)345 Error TcpNode::SetSockOpt(int lvl,
346                           int optname,
347                           const void* optval,
348                           socklen_t len) {
349   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
350     if (static_cast<size_t>(len) < sizeof(int))
351       return EINVAL;
352     AUTO_LOCK(node_lock_);
353     tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
354     return SetNoDelay_Locked();
355   }
356 
357   return SocketNode::SetSockOpt(lvl, optname, optval, len);
358 }
359 
QueueAccept()360 void TcpNode::QueueAccept() {
361   StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
362   stream()->EnqueueWork(work);
363 }
364 
QueueConnect()365 void TcpNode::QueueConnect() {
366   StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
367   stream()->EnqueueWork(work);
368 }
369 
QueueInput()370 void TcpNode::QueueInput() {
371   StreamFs::Work* work = new TcpRecvWork(emitter_);
372   stream()->EnqueueWork(work);
373 }
374 
QueueOutput()375 void TcpNode::QueueOutput() {
376   if (TestStreamFlags(SSF_SENDING))
377     return;
378 
379   if (!TestStreamFlags(SSF_CAN_SEND))
380     return;
381 
382   if (0 == emitter_->BytesInOutputFIFO())
383     return;
384 
385   StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
386   stream()->EnqueueWork(work);
387 }
388 
Accept(const HandleAttr & attr,PP_Resource * out_sock,struct sockaddr * addr,socklen_t * len)389 Error TcpNode::Accept(const HandleAttr& attr,
390                       PP_Resource* out_sock,
391                       struct sockaddr* addr,
392                       socklen_t* len) {
393   EventListenerLock wait(GetEventEmitter());
394 
395   if (!TestStreamFlags(SSF_LISTENING))
396     return EINVAL;
397 
398   // Either block forever or not at all
399   int ms = attr.IsBlocking() ? -1 : 0;
400 
401   Error err = wait.WaitOnEvent(POLLIN, ms);
402   if (ETIMEDOUT == err)
403     return EWOULDBLOCK;
404 
405   int s = emitter_->GetAcceptedSocket_Locked();
406   // Non-blocking case.
407   if (s == 0)
408     return EAGAIN;
409 
410   // Consume the new socket and start listening for the next one
411   *out_sock = s;
412   emitter_->ClearEvents_Locked(POLLIN);
413 
414   // Set the out paramaters
415   PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
416   *len = ResourceToSockAddr(remote_addr, *len, addr);
417   filesystem_->ppapi()->ReleaseResource(remote_addr);
418 
419   QueueAccept();
420   return 0;
421 }
422 
423 // We can not bind a client socket with PPAPI.  For now we ignore the
424 // bind but report the correct address later, just in case someone is
425 // binding without really caring what the address is (for example to
426 // select a more optimized interface/route.)
Bind(const struct sockaddr * addr,socklen_t len)427 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
428   AUTO_LOCK(node_lock_);
429 
430   /* Only bind once. */
431   if (IsBound())
432     return EINVAL;
433 
434   local_addr_ = SockAddrToResource(addr, len);
435   int err = TCPInterface()->Bind(
436       socket_resource_, local_addr_, PP_BlockUntilComplete());
437 
438   // If we fail, release the local addr resource
439   if (err != PP_OK) {
440     filesystem_->ppapi()->ReleaseResource(local_addr_);
441     local_addr_ = 0;
442     return PPErrorToErrno(err);
443   }
444 
445   return 0;
446 }
447 
Connect(const HandleAttr & attr,const struct sockaddr * addr,socklen_t len)448 Error TcpNode::Connect(const HandleAttr& attr,
449                        const struct sockaddr* addr,
450                        socklen_t len) {
451   EventListenerLock wait(GetEventEmitter());
452 
453   if (TestStreamFlags(SSF_CONNECTING))
454     return EALREADY;
455 
456   if (IsConnected())
457     return EISCONN;
458 
459   remote_addr_ = SockAddrToResource(addr, len);
460   if (0 == remote_addr_)
461     return EINVAL;
462 
463   int ms = attr.IsBlocking() ? -1 : 0;
464 
465   SetStreamFlags(SSF_CONNECTING);
466   QueueConnect();
467 
468   Error err = wait.WaitOnEvent(POLLOUT, ms);
469   if (ETIMEDOUT == err)
470     return EINPROGRESS;
471 
472   // If we fail, release the dest addr resource
473   if (err != 0) {
474     ConnectFailed_Locked();
475     return err;
476   }
477 
478   ConnectDone_Locked();
479   return 0;
480 }
481 
Shutdown(int how)482 Error TcpNode::Shutdown(int how) {
483   AUTO_LOCK(node_lock_);
484   if (!IsConnected())
485     return ENOTCONN;
486   {
487     AUTO_LOCK(emitter_->GetLock());
488     emitter_->SetError_Locked();
489   }
490   return 0;
491 }
492 
ConnectDone_Locked()493 void TcpNode::ConnectDone_Locked() {
494   local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
495 
496   // Now that we are connected, we can start sending and receiving.
497   ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
498   SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
499 
500   emitter_->ConnectDone_Locked();
501 
502   // The NODELAY option cannot be set in PPAPI before the socket
503   // is connected, but setsockopt() might have already set it.
504   SetNoDelay_Locked();
505 
506   // Begin the input pump
507   QueueInput();
508 }
509 
ConnectFailed_Locked()510 void TcpNode::ConnectFailed_Locked() {
511   filesystem_->ppapi()->ReleaseResource(remote_addr_);
512   remote_addr_ = 0;
513 }
514 
Listen(int backlog)515 Error TcpNode::Listen(int backlog) {
516   AUTO_LOCK(node_lock_);
517   if (!IsBound())
518     return EINVAL;
519 
520   int err = TCPInterface()->Listen(
521       socket_resource_, backlog, PP_BlockUntilComplete());
522   if (err != PP_OK)
523     return PPErrorToErrno(err);
524 
525   ClearStreamFlags(SSF_CAN_CONNECT);
526   SetStreamFlags(SSF_LISTENING);
527   emitter_->SetListening_Locked();
528   QueueAccept();
529   return 0;
530 }
531 
Recv_Locked(void * buf,size_t len,PP_Resource * out_addr,int * out_len)532 Error TcpNode::Recv_Locked(void* buf,
533                            size_t len,
534                            PP_Resource* out_addr,
535                            int* out_len) {
536   assert(emitter_.get());
537   *out_len = emitter_->ReadIn_Locked((char*)buf, len);
538   *out_addr = remote_addr_;
539 
540   // Ref the address copy we pass back.
541   filesystem_->ppapi()->AddRefResource(remote_addr_);
542   return 0;
543 }
544 
545 // TCP ignores dst addr passed to send_to, and always uses bound address
Send_Locked(const void * buf,size_t len,PP_Resource,int * out_len)546 Error TcpNode::Send_Locked(const void* buf,
547                            size_t len,
548                            PP_Resource,
549                            int* out_len) {
550   assert(emitter_.get());
551   if (emitter_->GetError_Locked())
552     return EPIPE;
553   *out_len = emitter_->WriteOut_Locked((char*)buf, len);
554   return 0;
555 }
556 
557 }  // namespace nacl_io
558 
559 #endif  // PROVIDES_SOCKET_API
560