• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "nacl_io/socket/udp_node.h"
6 
7 #include <errno.h>
8 #include <string.h>
9 
10 #include <algorithm>
11 
12 #include "nacl_io/pepper_interface.h"
13 #include "nacl_io/socket/packet.h"
14 #include "nacl_io/socket/udp_event_emitter.h"
15 #include "nacl_io/stream/stream_fs.h"
16 
17 namespace {
18 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 }
21 
22 namespace nacl_io {
23 
24 class UdpWork : public StreamFs::Work {
25  public:
UdpWork(const ScopedUdpEventEmitter & emitter)26   explicit UdpWork(const ScopedUdpEventEmitter& emitter)
27       : StreamFs::Work(emitter->stream()->stream()),
28         emitter_(emitter),
29         packet_(NULL) {}
30 
~UdpWork()31   ~UdpWork() { delete packet_; }
32 
UDPInterface()33   UDPSocketInterface* UDPInterface() {
34     return filesystem()->ppapi()->GetUDPSocketInterface();
35   }
36 
37  protected:
38   ScopedUdpEventEmitter emitter_;
39   Packet* packet_;
40 };
41 
42 class UdpSendWork : public UdpWork {
43  public:
UdpSendWork(const ScopedUdpEventEmitter & emitter,const ScopedSocketNode & node)44   explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
45                        const ScopedSocketNode& node)
46       : UdpWork(emitter), node_(node) {}
47 
Start(int32_t val)48   virtual bool Start(int32_t val) {
49     AUTO_LOCK(emitter_->GetLock());
50 
51     // Does the stream exist, and can it send?
52     if (!node_->TestStreamFlags(SSF_CAN_SEND))
53       return false;
54 
55     packet_ = emitter_->ReadTXPacket_Locked();
56     if (NULL == packet_)
57       return false;
58 
59     int err = UDPInterface()->SendTo(node_->socket_resource(),
60                                      packet_->buffer(),
61                                      packet_->len(),
62                                      packet_->addr(),
63                                      filesystem()->GetRunCompletion(this));
64     if (err != PP_OK_COMPLETIONPENDING) {
65       // Anything else, we should assume the socket has gone bad.
66       node_->SetError_Locked(err);
67       return false;
68     }
69 
70     node_->SetStreamFlags(SSF_SENDING);
71     return true;
72   }
73 
Run(int32_t length_error)74   virtual void Run(int32_t length_error) {
75     AUTO_LOCK(emitter_->GetLock());
76 
77     if (length_error < 0) {
78       node_->SetError_Locked(length_error);
79       return;
80     }
81 
82     // If we did send, then Q more work.
83     node_->ClearStreamFlags(SSF_SENDING);
84     node_->QueueOutput();
85   }
86 
87  private:
88   // We assume that transmits will always complete.  If the upstream
89   // actually back pressures, enough to prevent the Send callback
90   // from triggering, this resource may never go away.
91   ScopedSocketNode node_;
92 };
93 
94 class UdpRecvWork : public UdpWork {
95  public:
UdpRecvWork(const ScopedUdpEventEmitter & emitter)96   explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
97       : UdpWork(emitter) {
98     data_ = new char[kMaxPacketSize];
99   }
100 
~UdpRecvWork()101   ~UdpRecvWork() { delete[] data_; }
102 
Start(int32_t val)103   virtual bool Start(int32_t val) {
104     AUTO_LOCK(emitter_->GetLock());
105     UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
106 
107     // Does the stream exist, and can it recv?
108     if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
109       return false;
110 
111     // Check if we are already receiving.
112     if (stream->TestStreamFlags(SSF_RECVING))
113       return false;
114 
115     stream->SetStreamFlags(SSF_RECVING);
116     int err = UDPInterface()->RecvFrom(stream->socket_resource(),
117                                        data_,
118                                        kMaxPacketSize,
119                                        &addr_,
120                                        filesystem()->GetRunCompletion(this));
121     if (err != PP_OK_COMPLETIONPENDING) {
122       stream->SetError_Locked(err);
123       return false;
124     }
125 
126     return true;
127   }
128 
Run(int32_t length_error)129   virtual void Run(int32_t length_error) {
130     AUTO_LOCK(emitter_->GetLock());
131     UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
132     if (NULL == stream)
133       return;
134 
135     // On successful receive we queue more input
136     if (length_error > 0) {
137       Packet* packet = new Packet(filesystem()->ppapi());
138       packet->Copy(data_, length_error, addr_);
139       emitter_->WriteRXPacket_Locked(packet);
140       stream->ClearStreamFlags(SSF_RECVING);
141       stream->QueueInput();
142     } else {
143       stream->SetError_Locked(length_error);
144     }
145   }
146 
147  private:
148   char* data_;
149   PP_Resource addr_;
150 };
151 
UdpNode(Filesystem * filesystem)152 UdpNode::UdpNode(Filesystem* filesystem)
153     : SocketNode(filesystem),
154       emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
155   emitter_->AttachStream(this);
156 }
157 
Destroy()158 void UdpNode::Destroy() {
159   emitter_->DetachStream();
160   SocketNode::Destroy();
161 }
162 
GetEventEmitter()163 UdpEventEmitter* UdpNode::GetEventEmitter() {
164   return emitter_.get();
165 }
166 
Init(int open_flags)167 Error UdpNode::Init(int open_flags) {
168   Error err = SocketNode::Init(open_flags);
169   if (err != 0)
170     return err;
171 
172   if (UDPInterface() == NULL)
173     return EACCES;
174 
175   socket_resource_ =
176       UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
177   if (0 == socket_resource_)
178     return EACCES;
179 
180   return 0;
181 }
182 
QueueInput()183 void UdpNode::QueueInput() {
184   UdpRecvWork* work = new UdpRecvWork(emitter_);
185   stream()->EnqueueWork(work);
186 }
187 
QueueOutput()188 void UdpNode::QueueOutput() {
189   if (!TestStreamFlags(SSF_CAN_SEND))
190     return;
191 
192   if (TestStreamFlags(SSF_SENDING))
193     return;
194 
195   UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
196   stream()->EnqueueWork(work);
197 }
198 
Bind(const struct sockaddr * addr,socklen_t len)199 Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
200   if (0 == socket_resource_)
201     return EBADF;
202 
203   /* Only bind once. */
204   if (IsBound())
205     return EINVAL;
206 
207   PP_Resource out_addr = SockAddrToResource(addr, len);
208   if (0 == out_addr)
209     return EINVAL;
210 
211   int err =
212       UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
213   filesystem_->ppapi()->ReleaseResource(out_addr);
214   if (err != 0)
215     return PPErrorToErrno(err);
216 
217   // Get the address that was actually bound (in case addr was 0.0.0.0:0).
218   out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
219   if (out_addr == 0)
220     return EINVAL;
221 
222   // Now that we are bound, we can start sending and receiving.
223   SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
224   QueueInput();
225 
226   local_addr_ = out_addr;
227   return 0;
228 }
229 
Connect(const HandleAttr & attr,const struct sockaddr * addr,socklen_t len)230 Error UdpNode::Connect(const HandleAttr& attr,
231                        const struct sockaddr* addr,
232                        socklen_t len) {
233   if (0 == socket_resource_)
234     return EBADF;
235 
236   /* Connect for UDP is the default dest, it's legal to change it. */
237   if (remote_addr_ != 0) {
238     filesystem_->ppapi()->ReleaseResource(remote_addr_);
239     remote_addr_ = 0;
240   }
241 
242   remote_addr_ = SockAddrToResource(addr, len);
243   if (0 == remote_addr_)
244     return EINVAL;
245 
246   return 0;
247 }
248 
Recv_Locked(void * buf,size_t len,PP_Resource * out_addr,int * out_len)249 Error UdpNode::Recv_Locked(void* buf,
250                            size_t len,
251                            PP_Resource* out_addr,
252                            int* out_len) {
253   Packet* packet = emitter_->ReadRXPacket_Locked();
254   *out_len = 0;
255   *out_addr = 0;
256 
257   if (packet) {
258     int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
259     memcpy(buf, packet->buffer(), capped_len);
260 
261     if (packet->addr() != 0) {
262       filesystem_->ppapi()->AddRefResource(packet->addr());
263       *out_addr = packet->addr();
264     }
265 
266     *out_len = capped_len;
267     delete packet;
268     return 0;
269   }
270 
271   // Should never happen, Recv_Locked should not be called
272   // unless already in a POLLIN state.
273   return EBADF;
274 }
275 
Send_Locked(const void * buf,size_t len,PP_Resource addr,int * out_len)276 Error UdpNode::Send_Locked(const void* buf,
277                            size_t len,
278                            PP_Resource addr,
279                            int* out_len) {
280   if (!IsBound()) {
281     // Pepper requires a socket to be bound before it can send.
282     sockaddr_in addr;
283     addr.sin_family = AF_INET;
284     addr.sin_port = 0;
285     memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
286     Error err =
287         Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
288     if (err != 0)
289       return err;
290   }
291 
292   *out_len = 0;
293   int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
294   Packet* packet = new Packet(filesystem_->ppapi());
295   packet->Copy(buf, capped_len, addr);
296 
297   emitter_->WriteTXPacket_Locked(packet);
298   *out_len = capped_len;
299   return 0;
300 }
301 
302 }  // namespace nacl_io
303