1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "nacl_io/socket/udp_node.h"
6
7 #include <errno.h>
8 #include <string.h>
9
10 #include <algorithm>
11
12 #include "nacl_io/pepper_interface.h"
13 #include "nacl_io/socket/packet.h"
14 #include "nacl_io/socket/udp_event_emitter.h"
15 #include "nacl_io/stream/stream_fs.h"
16
17 namespace {
18 const size_t kMaxPacketSize = 65536;
19 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
20 }
21
22 namespace nacl_io {
23
24 class UdpWork : public StreamFs::Work {
25 public:
UdpWork(const ScopedUdpEventEmitter & emitter)26 explicit UdpWork(const ScopedUdpEventEmitter& emitter)
27 : StreamFs::Work(emitter->stream()->stream()),
28 emitter_(emitter),
29 packet_(NULL) {}
30
~UdpWork()31 ~UdpWork() { delete packet_; }
32
UDPInterface()33 UDPSocketInterface* UDPInterface() {
34 return filesystem()->ppapi()->GetUDPSocketInterface();
35 }
36
37 protected:
38 ScopedUdpEventEmitter emitter_;
39 Packet* packet_;
40 };
41
42 class UdpSendWork : public UdpWork {
43 public:
UdpSendWork(const ScopedUdpEventEmitter & emitter,const ScopedSocketNode & node)44 explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
45 const ScopedSocketNode& node)
46 : UdpWork(emitter), node_(node) {}
47
Start(int32_t val)48 virtual bool Start(int32_t val) {
49 AUTO_LOCK(emitter_->GetLock());
50
51 // Does the stream exist, and can it send?
52 if (!node_->TestStreamFlags(SSF_CAN_SEND))
53 return false;
54
55 packet_ = emitter_->ReadTXPacket_Locked();
56 if (NULL == packet_)
57 return false;
58
59 int err = UDPInterface()->SendTo(node_->socket_resource(),
60 packet_->buffer(),
61 packet_->len(),
62 packet_->addr(),
63 filesystem()->GetRunCompletion(this));
64 if (err != PP_OK_COMPLETIONPENDING) {
65 // Anything else, we should assume the socket has gone bad.
66 node_->SetError_Locked(err);
67 return false;
68 }
69
70 node_->SetStreamFlags(SSF_SENDING);
71 return true;
72 }
73
Run(int32_t length_error)74 virtual void Run(int32_t length_error) {
75 AUTO_LOCK(emitter_->GetLock());
76
77 if (length_error < 0) {
78 node_->SetError_Locked(length_error);
79 return;
80 }
81
82 // If we did send, then Q more work.
83 node_->ClearStreamFlags(SSF_SENDING);
84 node_->QueueOutput();
85 }
86
87 private:
88 // We assume that transmits will always complete. If the upstream
89 // actually back pressures, enough to prevent the Send callback
90 // from triggering, this resource may never go away.
91 ScopedSocketNode node_;
92 };
93
94 class UdpRecvWork : public UdpWork {
95 public:
UdpRecvWork(const ScopedUdpEventEmitter & emitter)96 explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
97 : UdpWork(emitter) {
98 data_ = new char[kMaxPacketSize];
99 }
100
~UdpRecvWork()101 ~UdpRecvWork() { delete[] data_; }
102
Start(int32_t val)103 virtual bool Start(int32_t val) {
104 AUTO_LOCK(emitter_->GetLock());
105 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
106
107 // Does the stream exist, and can it recv?
108 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
109 return false;
110
111 // Check if we are already receiving.
112 if (stream->TestStreamFlags(SSF_RECVING))
113 return false;
114
115 stream->SetStreamFlags(SSF_RECVING);
116 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
117 data_,
118 kMaxPacketSize,
119 &addr_,
120 filesystem()->GetRunCompletion(this));
121 if (err != PP_OK_COMPLETIONPENDING) {
122 stream->SetError_Locked(err);
123 return false;
124 }
125
126 return true;
127 }
128
Run(int32_t length_error)129 virtual void Run(int32_t length_error) {
130 AUTO_LOCK(emitter_->GetLock());
131 UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
132 if (NULL == stream)
133 return;
134
135 // On successful receive we queue more input
136 if (length_error > 0) {
137 Packet* packet = new Packet(filesystem()->ppapi());
138 packet->Copy(data_, length_error, addr_);
139 emitter_->WriteRXPacket_Locked(packet);
140 stream->ClearStreamFlags(SSF_RECVING);
141 stream->QueueInput();
142 } else {
143 stream->SetError_Locked(length_error);
144 }
145 }
146
147 private:
148 char* data_;
149 PP_Resource addr_;
150 };
151
UdpNode(Filesystem * filesystem)152 UdpNode::UdpNode(Filesystem* filesystem)
153 : SocketNode(filesystem),
154 emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
155 emitter_->AttachStream(this);
156 }
157
Destroy()158 void UdpNode::Destroy() {
159 emitter_->DetachStream();
160 SocketNode::Destroy();
161 }
162
GetEventEmitter()163 UdpEventEmitter* UdpNode::GetEventEmitter() {
164 return emitter_.get();
165 }
166
Init(int open_flags)167 Error UdpNode::Init(int open_flags) {
168 Error err = SocketNode::Init(open_flags);
169 if (err != 0)
170 return err;
171
172 if (UDPInterface() == NULL)
173 return EACCES;
174
175 socket_resource_ =
176 UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
177 if (0 == socket_resource_)
178 return EACCES;
179
180 return 0;
181 }
182
QueueInput()183 void UdpNode::QueueInput() {
184 UdpRecvWork* work = new UdpRecvWork(emitter_);
185 stream()->EnqueueWork(work);
186 }
187
QueueOutput()188 void UdpNode::QueueOutput() {
189 if (!TestStreamFlags(SSF_CAN_SEND))
190 return;
191
192 if (TestStreamFlags(SSF_SENDING))
193 return;
194
195 UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
196 stream()->EnqueueWork(work);
197 }
198
Bind(const struct sockaddr * addr,socklen_t len)199 Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
200 if (0 == socket_resource_)
201 return EBADF;
202
203 /* Only bind once. */
204 if (IsBound())
205 return EINVAL;
206
207 PP_Resource out_addr = SockAddrToResource(addr, len);
208 if (0 == out_addr)
209 return EINVAL;
210
211 int err =
212 UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
213 filesystem_->ppapi()->ReleaseResource(out_addr);
214 if (err != 0)
215 return PPErrorToErrno(err);
216
217 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
218 out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
219 if (out_addr == 0)
220 return EINVAL;
221
222 // Now that we are bound, we can start sending and receiving.
223 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
224 QueueInput();
225
226 local_addr_ = out_addr;
227 return 0;
228 }
229
Connect(const HandleAttr & attr,const struct sockaddr * addr,socklen_t len)230 Error UdpNode::Connect(const HandleAttr& attr,
231 const struct sockaddr* addr,
232 socklen_t len) {
233 if (0 == socket_resource_)
234 return EBADF;
235
236 /* Connect for UDP is the default dest, it's legal to change it. */
237 if (remote_addr_ != 0) {
238 filesystem_->ppapi()->ReleaseResource(remote_addr_);
239 remote_addr_ = 0;
240 }
241
242 remote_addr_ = SockAddrToResource(addr, len);
243 if (0 == remote_addr_)
244 return EINVAL;
245
246 return 0;
247 }
248
Recv_Locked(void * buf,size_t len,PP_Resource * out_addr,int * out_len)249 Error UdpNode::Recv_Locked(void* buf,
250 size_t len,
251 PP_Resource* out_addr,
252 int* out_len) {
253 Packet* packet = emitter_->ReadRXPacket_Locked();
254 *out_len = 0;
255 *out_addr = 0;
256
257 if (packet) {
258 int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
259 memcpy(buf, packet->buffer(), capped_len);
260
261 if (packet->addr() != 0) {
262 filesystem_->ppapi()->AddRefResource(packet->addr());
263 *out_addr = packet->addr();
264 }
265
266 *out_len = capped_len;
267 delete packet;
268 return 0;
269 }
270
271 // Should never happen, Recv_Locked should not be called
272 // unless already in a POLLIN state.
273 return EBADF;
274 }
275
Send_Locked(const void * buf,size_t len,PP_Resource addr,int * out_len)276 Error UdpNode::Send_Locked(const void* buf,
277 size_t len,
278 PP_Resource addr,
279 int* out_len) {
280 if (!IsBound()) {
281 // Pepper requires a socket to be bound before it can send.
282 sockaddr_in addr;
283 addr.sin_family = AF_INET;
284 addr.sin_port = 0;
285 memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
286 Error err =
287 Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
288 if (err != 0)
289 return err;
290 }
291
292 *out_len = 0;
293 int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
294 Packet* packet = new Packet(filesystem_->ppapi());
295 packet->Copy(buf, capped_len, addr);
296
297 emitter_->WriteTXPacket_Locked(packet);
298 *out_len = capped_len;
299 return 0;
300 }
301
302 } // namespace nacl_io
303