1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
7
8 #include <assert.h>
9 #include <errno.h>
10 #include <string.h>
11 #include <algorithm>
12
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/pepper_interface.h"
15 #include "nacl_io/socket/tcp_node.h"
16 #include "nacl_io/stream/stream_fs.h"
17
namespace {

// Upper bound, in bytes, on the size of a single PPAPI Read() or Write()
// request issued by the work items in this file.
const size_t kMaxPacketSize = 65536;

// Size passed for both FIFO arguments when a TcpNode creates its
// TcpEventEmitter: room for eight maximum-size packets.
const size_t kDefaultFifoSize = 8 * kMaxPacketSize;

}  // namespace
22
23 namespace nacl_io {
24
25 class TcpWork : public StreamFs::Work {
26 public:
TcpWork(const ScopedTcpEventEmitter & emitter)27 explicit TcpWork(const ScopedTcpEventEmitter& emitter)
28 : StreamFs::Work(emitter->stream()->stream()),
29 emitter_(emitter),
30 data_(NULL) {}
31
~TcpWork()32 ~TcpWork() { delete[] data_; }
33
TCPInterface()34 TCPSocketInterface* TCPInterface() {
35 return filesystem()->ppapi()->GetTCPSocketInterface();
36 }
37
38 protected:
39 ScopedTcpEventEmitter emitter_;
40 char* data_;
41 };
42
43 class TcpSendWork : public TcpWork {
44 public:
TcpSendWork(const ScopedTcpEventEmitter & emitter,const ScopedSocketNode & stream)45 explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
46 const ScopedSocketNode& stream)
47 : TcpWork(emitter), node_(stream) {}
48
Start(int32_t val)49 virtual bool Start(int32_t val) {
50 AUTO_LOCK(emitter_->GetLock());
51
52 // Does the stream exist, and can it send?
53 if (!node_->TestStreamFlags(SSF_CAN_SEND))
54 return false;
55
56 // Check if we are already sending.
57 if (node_->TestStreamFlags(SSF_SENDING))
58 return false;
59
60 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
62 if (capped_len == 0)
63 return false;
64
65 data_ = new char[capped_len];
66 emitter_->ReadOut_Locked(data_, capped_len);
67
68 int err = TCPInterface()->Write(node_->socket_resource(),
69 data_,
70 capped_len,
71 filesystem()->GetRunCompletion(this));
72
73 if (err != PP_OK_COMPLETIONPENDING) {
74 // Anything else, we should assume the socket has gone bad.
75 node_->SetError_Locked(err);
76 return false;
77 }
78
79 node_->SetStreamFlags(SSF_SENDING);
80 return true;
81 }
82
Run(int32_t length_error)83 virtual void Run(int32_t length_error) {
84 AUTO_LOCK(emitter_->GetLock());
85
86 if (length_error < 0) {
87 // Send failed, mark the socket as bad
88 node_->SetError_Locked(length_error);
89 return;
90 }
91
92 // If we did send, then Q more work.
93 node_->ClearStreamFlags(SSF_SENDING);
94 node_->QueueOutput();
95 }
96
97 private:
98 // We assume that transmits will always complete. If the upstream
99 // actually back pressures, enough to prevent the Send callback
100 // from triggering, this resource may never go away.
101 ScopedSocketNode node_;
102 };
103
104 class TcpRecvWork : public TcpWork {
105 public:
TcpRecvWork(const ScopedTcpEventEmitter & emitter)106 explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
107 : TcpWork(emitter) {}
108
Start(int32_t val)109 virtual bool Start(int32_t val) {
110 AUTO_LOCK(emitter_->GetLock());
111 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
112
113 // Does the stream exist, and can it recv?
114 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
115 return false;
116
117 // If we are not currently receiving
118 if (stream->TestStreamFlags(SSF_RECVING))
119 return false;
120
121 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
122 int capped_len =
123 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
124
125 if (capped_len == 0)
126 return false;
127
128 data_ = new char[capped_len];
129 int err = TCPInterface()->Read(stream->socket_resource(),
130 data_,
131 capped_len,
132 filesystem()->GetRunCompletion(this));
133 if (err != PP_OK_COMPLETIONPENDING) {
134 // Anything else, we should assume the socket has gone bad.
135 stream->SetError_Locked(err);
136 return false;
137 }
138
139 stream->SetStreamFlags(SSF_RECVING);
140 return true;
141 }
142
Run(int32_t length_error)143 virtual void Run(int32_t length_error) {
144 AUTO_LOCK(emitter_->GetLock());
145 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
146
147 if (!stream)
148 return;
149
150 if (length_error <= 0) {
151 stream->SetError_Locked(length_error);
152 return;
153 }
154
155 // If we successfully received, queue more input
156 emitter_->WriteIn_Locked(data_, length_error);
157 stream->ClearStreamFlags(SSF_RECVING);
158 stream->QueueInput();
159 }
160 };
161
162 class TCPAcceptWork : public StreamFs::Work {
163 public:
TCPAcceptWork(StreamFs * stream,const ScopedTcpEventEmitter & emitter)164 explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
165 : StreamFs::Work(stream), emitter_(emitter) {}
166
TCPInterface()167 TCPSocketInterface* TCPInterface() {
168 return filesystem()->ppapi()->GetTCPSocketInterface();
169 }
170
Start(int32_t val)171 virtual bool Start(int32_t val) {
172 AUTO_LOCK(emitter_->GetLock());
173 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
174
175 // Does the stream exist, and can it accept?
176 if (NULL == node)
177 return false;
178
179 // If we are not currently accepting
180 if (!node->TestStreamFlags(SSF_LISTENING))
181 return false;
182
183 int err = TCPInterface()->Accept(node->socket_resource(),
184 &new_socket_,
185 filesystem()->GetRunCompletion(this));
186
187 if (err != PP_OK_COMPLETIONPENDING) {
188 // Anything else, we should assume the socket has gone bad.
189 node->SetError_Locked(err);
190 return false;
191 }
192
193 return true;
194 }
195
Run(int32_t error)196 virtual void Run(int32_t error) {
197 AUTO_LOCK(emitter_->GetLock());
198 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
199
200 if (node == NULL)
201 return;
202
203 if (error != PP_OK) {
204 node->SetError_Locked(error);
205 return;
206 }
207
208 emitter_->SetAcceptedSocket_Locked(new_socket_);
209 }
210
211 protected:
212 PP_Resource new_socket_;
213 ScopedTcpEventEmitter emitter_;
214 };
215
216 class TCPConnectWork : public StreamFs::Work {
217 public:
TCPConnectWork(StreamFs * stream,const ScopedTcpEventEmitter & emitter)218 explicit TCPConnectWork(StreamFs* stream,
219 const ScopedTcpEventEmitter& emitter)
220 : StreamFs::Work(stream), emitter_(emitter) {}
221
TCPInterface()222 TCPSocketInterface* TCPInterface() {
223 return filesystem()->ppapi()->GetTCPSocketInterface();
224 }
225
Start(int32_t val)226 virtual bool Start(int32_t val) {
227 AUTO_LOCK(emitter_->GetLock());
228 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
229
230 // Does the stream exist, and can it connect?
231 if (NULL == node)
232 return false;
233
234 int err = TCPInterface()->Connect(node->socket_resource(),
235 node->remote_addr(),
236 filesystem()->GetRunCompletion(this));
237 if (err != PP_OK_COMPLETIONPENDING) {
238 // Anything else, we should assume the socket has gone bad.
239 node->SetError_Locked(err);
240 return false;
241 }
242
243 return true;
244 }
245
Run(int32_t error)246 virtual void Run(int32_t error) {
247 AUTO_LOCK(emitter_->GetLock());
248 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
249
250 if (node == NULL)
251 return;
252
253 if (error != PP_OK) {
254 node->ConnectFailed_Locked();
255 node->SetError_Locked(error);
256 return;
257 }
258
259 node->ConnectDone_Locked();
260 }
261
262 protected:
263 ScopedTcpEventEmitter emitter_;
264 };
265
TcpNode(Filesystem * filesystem)266 TcpNode::TcpNode(Filesystem* filesystem)
267 : SocketNode(filesystem),
268 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
269 tcp_nodelay_(false) {
270 emitter_->AttachStream(this);
271 }
272
TcpNode(Filesystem * filesystem,PP_Resource socket)273 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
274 : SocketNode(filesystem, socket),
275 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
276 tcp_nodelay_(false) {
277 emitter_->AttachStream(this);
278 }
279
Destroy()280 void TcpNode::Destroy() {
281 emitter_->DetachStream();
282 SocketNode::Destroy();
283 }
284
Init(int open_flags)285 Error TcpNode::Init(int open_flags) {
286 Error err = SocketNode::Init(open_flags);
287 if (err != 0)
288 return err;
289
290 if (TCPInterface() == NULL)
291 return EACCES;
292
293 if (socket_resource_ != 0) {
294 // TCP sockets that are contructed with an existing socket_resource_
295 // are those that generated from calls to Accept() and therefore are
296 // already connected.
297 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
298 ConnectDone_Locked();
299 } else {
300 socket_resource_ =
301 TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
302 if (0 == socket_resource_)
303 return EACCES;
304 SetStreamFlags(SSF_CAN_CONNECT);
305 }
306
307 return 0;
308 }
309
GetEventEmitter()310 EventEmitter* TcpNode::GetEventEmitter() {
311 return emitter_.get();
312 }
313
SetError_Locked(int pp_error_num)314 void TcpNode::SetError_Locked(int pp_error_num) {
315 SocketNode::SetError_Locked(pp_error_num);
316 emitter_->SetError_Locked();
317 }
318
GetSockOpt(int lvl,int optname,void * optval,socklen_t * len)319 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
320 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
321 AUTO_LOCK(node_lock_);
322 int value = tcp_nodelay_;
323 socklen_t value_len = static_cast<socklen_t>(sizeof(value));
324 int copy_bytes = std::min(value_len, *len);
325 memcpy(optval, &value, copy_bytes);
326 *len = value_len;
327 return 0;
328 }
329
330 return SocketNode::GetSockOpt(lvl, optname, optval, len);
331 }
332
SetNoDelay_Locked()333 Error TcpNode::SetNoDelay_Locked() {
334 if (!IsConnected())
335 return 0;
336
337 int32_t error =
338 TCPInterface()->SetOption(socket_resource_,
339 PP_TCPSOCKET_OPTION_NO_DELAY,
340 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
341 PP_BlockUntilComplete());
342 return PPErrorToErrno(error);
343 }
344
SetSockOpt(int lvl,int optname,const void * optval,socklen_t len)345 Error TcpNode::SetSockOpt(int lvl,
346 int optname,
347 const void* optval,
348 socklen_t len) {
349 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
350 if (static_cast<size_t>(len) < sizeof(int))
351 return EINVAL;
352 AUTO_LOCK(node_lock_);
353 tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
354 return SetNoDelay_Locked();
355 }
356
357 return SocketNode::SetSockOpt(lvl, optname, optval, len);
358 }
359
QueueAccept()360 void TcpNode::QueueAccept() {
361 StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
362 stream()->EnqueueWork(work);
363 }
364
QueueConnect()365 void TcpNode::QueueConnect() {
366 StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
367 stream()->EnqueueWork(work);
368 }
369
QueueInput()370 void TcpNode::QueueInput() {
371 StreamFs::Work* work = new TcpRecvWork(emitter_);
372 stream()->EnqueueWork(work);
373 }
374
QueueOutput()375 void TcpNode::QueueOutput() {
376 if (TestStreamFlags(SSF_SENDING))
377 return;
378
379 if (!TestStreamFlags(SSF_CAN_SEND))
380 return;
381
382 if (0 == emitter_->BytesInOutputFIFO())
383 return;
384
385 StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
386 stream()->EnqueueWork(work);
387 }
388
Accept(const HandleAttr & attr,PP_Resource * out_sock,struct sockaddr * addr,socklen_t * len)389 Error TcpNode::Accept(const HandleAttr& attr,
390 PP_Resource* out_sock,
391 struct sockaddr* addr,
392 socklen_t* len) {
393 EventListenerLock wait(GetEventEmitter());
394
395 if (!TestStreamFlags(SSF_LISTENING))
396 return EINVAL;
397
398 // Either block forever or not at all
399 int ms = attr.IsBlocking() ? -1 : 0;
400
401 Error err = wait.WaitOnEvent(POLLIN, ms);
402 if (ETIMEDOUT == err)
403 return EWOULDBLOCK;
404
405 int s = emitter_->GetAcceptedSocket_Locked();
406 // Non-blocking case.
407 if (s == 0)
408 return EAGAIN;
409
410 // Consume the new socket and start listening for the next one
411 *out_sock = s;
412 emitter_->ClearEvents_Locked(POLLIN);
413
414 // Set the out paramaters
415 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
416 *len = ResourceToSockAddr(remote_addr, *len, addr);
417 filesystem_->ppapi()->ReleaseResource(remote_addr);
418
419 QueueAccept();
420 return 0;
421 }
422
423 // We can not bind a client socket with PPAPI. For now we ignore the
424 // bind but report the correct address later, just in case someone is
425 // binding without really caring what the address is (for example to
426 // select a more optimized interface/route.)
Bind(const struct sockaddr * addr,socklen_t len)427 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
428 AUTO_LOCK(node_lock_);
429
430 /* Only bind once. */
431 if (IsBound())
432 return EINVAL;
433
434 local_addr_ = SockAddrToResource(addr, len);
435 int err = TCPInterface()->Bind(
436 socket_resource_, local_addr_, PP_BlockUntilComplete());
437
438 // If we fail, release the local addr resource
439 if (err != PP_OK) {
440 filesystem_->ppapi()->ReleaseResource(local_addr_);
441 local_addr_ = 0;
442 return PPErrorToErrno(err);
443 }
444
445 return 0;
446 }
447
Connect(const HandleAttr & attr,const struct sockaddr * addr,socklen_t len)448 Error TcpNode::Connect(const HandleAttr& attr,
449 const struct sockaddr* addr,
450 socklen_t len) {
451 EventListenerLock wait(GetEventEmitter());
452
453 if (TestStreamFlags(SSF_CONNECTING))
454 return EALREADY;
455
456 if (IsConnected())
457 return EISCONN;
458
459 remote_addr_ = SockAddrToResource(addr, len);
460 if (0 == remote_addr_)
461 return EINVAL;
462
463 int ms = attr.IsBlocking() ? -1 : 0;
464
465 SetStreamFlags(SSF_CONNECTING);
466 QueueConnect();
467
468 Error err = wait.WaitOnEvent(POLLOUT, ms);
469 if (ETIMEDOUT == err)
470 return EINPROGRESS;
471
472 // If we fail, release the dest addr resource
473 if (err != 0) {
474 ConnectFailed_Locked();
475 return err;
476 }
477
478 ConnectDone_Locked();
479 return 0;
480 }
481
Shutdown(int how)482 Error TcpNode::Shutdown(int how) {
483 AUTO_LOCK(node_lock_);
484 if (!IsConnected())
485 return ENOTCONN;
486 {
487 AUTO_LOCK(emitter_->GetLock());
488 emitter_->SetError_Locked();
489 }
490 return 0;
491 }
492
ConnectDone_Locked()493 void TcpNode::ConnectDone_Locked() {
494 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
495
496 // Now that we are connected, we can start sending and receiving.
497 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
498 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
499
500 emitter_->ConnectDone_Locked();
501
502 // The NODELAY option cannot be set in PPAPI before the socket
503 // is connected, but setsockopt() might have already set it.
504 SetNoDelay_Locked();
505
506 // Begin the input pump
507 QueueInput();
508 }
509
ConnectFailed_Locked()510 void TcpNode::ConnectFailed_Locked() {
511 filesystem_->ppapi()->ReleaseResource(remote_addr_);
512 remote_addr_ = 0;
513 }
514
Listen(int backlog)515 Error TcpNode::Listen(int backlog) {
516 AUTO_LOCK(node_lock_);
517 if (!IsBound())
518 return EINVAL;
519
520 int err = TCPInterface()->Listen(
521 socket_resource_, backlog, PP_BlockUntilComplete());
522 if (err != PP_OK)
523 return PPErrorToErrno(err);
524
525 ClearStreamFlags(SSF_CAN_CONNECT);
526 SetStreamFlags(SSF_LISTENING);
527 emitter_->SetListening_Locked();
528 QueueAccept();
529 return 0;
530 }
531
Recv_Locked(void * buf,size_t len,PP_Resource * out_addr,int * out_len)532 Error TcpNode::Recv_Locked(void* buf,
533 size_t len,
534 PP_Resource* out_addr,
535 int* out_len) {
536 assert(emitter_.get());
537 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
538 *out_addr = remote_addr_;
539
540 // Ref the address copy we pass back.
541 filesystem_->ppapi()->AddRefResource(remote_addr_);
542 return 0;
543 }
544
545 // TCP ignores dst addr passed to send_to, and always uses bound address
Send_Locked(const void * buf,size_t len,PP_Resource,int * out_len)546 Error TcpNode::Send_Locked(const void* buf,
547 size_t len,
548 PP_Resource,
549 int* out_len) {
550 assert(emitter_.get());
551 if (emitter_->GetError_Locked())
552 return EPIPE;
553 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
554 return 0;
555 }
556
557 } // namespace nacl_io
558
559 #endif // PROVIDES_SOCKET_API
560