1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <errno.h>
8 #include <sys/socket.h>
9
10 #include <algorithm>
11 #include <deque>
12 #include <limits>
13 #include <memory>
14
15 #include "base/bind.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task_runner.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle_vector.h"
24
25 #if !defined(OS_NACL)
26 #include <sys/uio.h>
27 #endif
28
29 namespace mojo {
30 namespace edk {
31
32 namespace {
33
34 const size_t kMaxBatchReadCapacity = 256 * 1024;
35
36 // A view over a Channel::Message object. The write queue uses these since
37 // large messages may need to be sent in chunks.
38 class MessageView {
39 public:
40 // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)41 MessageView(Channel::MessagePtr message, size_t offset)
42 : message_(std::move(message)),
43 offset_(offset),
44 handles_(message_->TakeHandlesForTransport()) {
45 DCHECK_GT(message_->data_num_bytes(), offset_);
46 }
47
MessageView(MessageView && other)48 MessageView(MessageView&& other) { *this = std::move(other); }
49
operator =(MessageView && other)50 MessageView& operator=(MessageView&& other) {
51 message_ = std::move(other.message_);
52 offset_ = other.offset_;
53 handles_ = std::move(other.handles_);
54 return *this;
55 }
56
~MessageView()57 ~MessageView() {}
58
data() const59 const void* data() const {
60 return static_cast<const char*>(message_->data()) + offset_;
61 }
62
data_num_bytes() const63 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
64
data_offset() const65 size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)66 void advance_data_offset(size_t num_bytes) {
67 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
68 offset_ += num_bytes;
69 }
70
TakeHandles()71 ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
TakeMessage()72 Channel::MessagePtr TakeMessage() { return std::move(message_); }
73
SetHandles(ScopedPlatformHandleVectorPtr handles)74 void SetHandles(ScopedPlatformHandleVectorPtr handles) {
75 handles_ = std::move(handles);
76 }
77
78 private:
79 Channel::MessagePtr message_;
80 size_t offset_;
81 ScopedPlatformHandleVectorPtr handles_;
82
83 DISALLOW_COPY_AND_ASSIGN(MessageView);
84 };
85
86 class ChannelPosix : public Channel,
87 public base::MessageLoop::DestructionObserver,
88 public base::MessageLoopForIO::Watcher {
89 public:
ChannelPosix(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)90 ChannelPosix(Delegate* delegate,
91 ConnectionParams connection_params,
92 scoped_refptr<base::TaskRunner> io_task_runner)
93 : Channel(delegate),
94 self_(this),
95 handle_(connection_params.TakeChannelHandle()),
96 io_task_runner_(io_task_runner)
97 #if defined(OS_MACOSX)
98 ,
99 handles_to_close_(new PlatformHandleVector)
100 #endif
101 {
102 CHECK(handle_.is_valid());
103 }
104
Start()105 void Start() override {
106 if (io_task_runner_->RunsTasksOnCurrentThread()) {
107 StartOnIOThread();
108 } else {
109 io_task_runner_->PostTask(
110 FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
111 }
112 }
113
ShutDownImpl()114 void ShutDownImpl() override {
115 // Always shut down asynchronously when called through the public interface.
116 io_task_runner_->PostTask(
117 FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
118 }
119
Write(MessagePtr message)120 void Write(MessagePtr message) override {
121 bool write_error = false;
122 {
123 base::AutoLock lock(write_lock_);
124 if (reject_writes_)
125 return;
126 if (outgoing_messages_.empty()) {
127 if (!WriteNoLock(MessageView(std::move(message), 0)))
128 reject_writes_ = write_error = true;
129 } else {
130 outgoing_messages_.emplace_back(std::move(message), 0);
131 }
132 }
133 if (write_error) {
134 // Do not synchronously invoke OnError(). Write() may have been called by
135 // the delegate and we don't want to re-enter it.
136 io_task_runner_->PostTask(FROM_HERE,
137 base::Bind(&ChannelPosix::OnError, this));
138 }
139 }
140
LeakHandle()141 void LeakHandle() override {
142 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
143 leak_handle_ = true;
144 }
145
GetReadPlatformHandles(size_t num_handles,const void * extra_header,size_t extra_header_size,ScopedPlatformHandleVectorPtr * handles)146 bool GetReadPlatformHandles(
147 size_t num_handles,
148 const void* extra_header,
149 size_t extra_header_size,
150 ScopedPlatformHandleVectorPtr* handles) override {
151 if (num_handles > std::numeric_limits<uint16_t>::max())
152 return false;
153 #if defined(OS_MACOSX) && !defined(OS_IOS)
154 // On OSX, we can have mach ports which are located in the extra header
155 // section.
156 using MachPortsEntry = Channel::Message::MachPortsEntry;
157 using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
158 CHECK(extra_header_size >=
159 sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
160 const MachPortsExtraHeader* mach_ports_header =
161 reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
162 size_t num_mach_ports = mach_ports_header->num_ports;
163 CHECK(num_mach_ports <= num_handles);
164 if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
165 handles->reset();
166 return true;
167 }
168
169 handles->reset(new PlatformHandleVector(num_handles));
170 const MachPortsEntry* mach_ports = mach_ports_header->entries;
171 for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
172 if (mach_port_index < num_mach_ports &&
173 mach_ports[mach_port_index].index == i) {
174 (*handles)->at(i) = PlatformHandle(
175 static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
176 CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
177 // These are actually just Mach port names until they're resolved from
178 // the remote process.
179 (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
180 mach_port_index++;
181 } else {
182 CHECK(!incoming_platform_handles_.empty());
183 (*handles)->at(i) = incoming_platform_handles_.front();
184 incoming_platform_handles_.pop_front();
185 }
186 }
187 #else
188 if (incoming_platform_handles_.size() < num_handles) {
189 handles->reset();
190 return true;
191 }
192
193 handles->reset(new PlatformHandleVector(num_handles));
194 for (size_t i = 0; i < num_handles; ++i) {
195 (*handles)->at(i) = incoming_platform_handles_.front();
196 incoming_platform_handles_.pop_front();
197 }
198 #endif
199
200 return true;
201 }
202
203 private:
~ChannelPosix()204 ~ChannelPosix() override {
205 DCHECK(!read_watcher_);
206 DCHECK(!write_watcher_);
207 for (auto handle : incoming_platform_handles_)
208 handle.CloseIfNecessary();
209 }
210
StartOnIOThread()211 void StartOnIOThread() {
212 DCHECK(!read_watcher_);
213 DCHECK(!write_watcher_);
214 read_watcher_.reset(
215 new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
216 base::MessageLoop::current()->AddDestructionObserver(this);
217 if (handle_.get().needs_connection) {
218 base::MessageLoopForIO::current()->WatchFileDescriptor(
219 handle_.get().handle, false /* persistent */,
220 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
221 } else {
222 write_watcher_.reset(
223 new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
224 base::MessageLoopForIO::current()->WatchFileDescriptor(
225 handle_.get().handle, true /* persistent */,
226 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
227 base::AutoLock lock(write_lock_);
228 FlushOutgoingMessagesNoLock();
229 }
230 }
231
WaitForWriteOnIOThread()232 void WaitForWriteOnIOThread() {
233 base::AutoLock lock(write_lock_);
234 WaitForWriteOnIOThreadNoLock();
235 }
236
WaitForWriteOnIOThreadNoLock()237 void WaitForWriteOnIOThreadNoLock() {
238 if (pending_write_)
239 return;
240 if (!write_watcher_)
241 return;
242 if (io_task_runner_->RunsTasksOnCurrentThread()) {
243 pending_write_ = true;
244 base::MessageLoopForIO::current()->WatchFileDescriptor(
245 handle_.get().handle, false /* persistent */,
246 base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
247 } else {
248 io_task_runner_->PostTask(
249 FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
250 }
251 }
252
ShutDownOnIOThread()253 void ShutDownOnIOThread() {
254 base::MessageLoop::current()->RemoveDestructionObserver(this);
255
256 read_watcher_.reset();
257 write_watcher_.reset();
258 if (leak_handle_)
259 ignore_result(handle_.release());
260 handle_.reset();
261 #if defined(OS_MACOSX)
262 handles_to_close_.reset();
263 #endif
264
265 // May destroy the |this| if it was the last reference.
266 self_ = nullptr;
267 }
268
269 // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()270 void WillDestroyCurrentMessageLoop() override {
271 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
272 if (self_)
273 ShutDownOnIOThread();
274 }
275
276 // base::MessageLoopForIO::Watcher:
OnFileCanReadWithoutBlocking(int fd)277 void OnFileCanReadWithoutBlocking(int fd) override {
278 CHECK_EQ(fd, handle_.get().handle);
279 if (handle_.get().needs_connection) {
280 #if !defined(OS_NACL)
281 read_watcher_.reset();
282 base::MessageLoop::current()->RemoveDestructionObserver(this);
283
284 ScopedPlatformHandle accept_fd;
285 ServerAcceptConnection(handle_.get(), &accept_fd);
286 if (!accept_fd.is_valid()) {
287 OnError();
288 return;
289 }
290 handle_ = std::move(accept_fd);
291 StartOnIOThread();
292 #else
293 NOTREACHED();
294 #endif
295 return;
296 }
297
298 bool read_error = false;
299 size_t next_read_size = 0;
300 size_t buffer_capacity = 0;
301 size_t total_bytes_read = 0;
302 size_t bytes_read = 0;
303 do {
304 buffer_capacity = next_read_size;
305 char* buffer = GetReadBuffer(&buffer_capacity);
306 DCHECK_GT(buffer_capacity, 0u);
307
308 ssize_t read_result = PlatformChannelRecvmsg(
309 handle_.get(),
310 buffer,
311 buffer_capacity,
312 &incoming_platform_handles_);
313
314 if (read_result > 0) {
315 bytes_read = static_cast<size_t>(read_result);
316 total_bytes_read += bytes_read;
317 if (!OnReadComplete(bytes_read, &next_read_size)) {
318 read_error = true;
319 break;
320 }
321 } else if (read_result == 0 ||
322 (errno != EAGAIN && errno != EWOULDBLOCK)) {
323 read_error = true;
324 break;
325 }
326 } while (bytes_read == buffer_capacity &&
327 total_bytes_read < kMaxBatchReadCapacity &&
328 next_read_size > 0);
329 if (read_error) {
330 // Stop receiving read notifications.
331 read_watcher_.reset();
332
333 OnError();
334 }
335 }
336
OnFileCanWriteWithoutBlocking(int fd)337 void OnFileCanWriteWithoutBlocking(int fd) override {
338 bool write_error = false;
339 {
340 base::AutoLock lock(write_lock_);
341 pending_write_ = false;
342 if (!FlushOutgoingMessagesNoLock())
343 reject_writes_ = write_error = true;
344 }
345 if (write_error)
346 OnError();
347 }
348
349 // Attempts to write a message directly to the channel. If the full message
350 // cannot be written, it's queued and a wait is initiated to write the message
351 // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)352 bool WriteNoLock(MessageView message_view) {
353 if (handle_.get().needs_connection) {
354 outgoing_messages_.emplace_front(std::move(message_view));
355 return true;
356 }
357 size_t bytes_written = 0;
358 do {
359 message_view.advance_data_offset(bytes_written);
360
361 ssize_t result;
362 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
363 if (handles && handles->size()) {
364 iovec iov = {
365 const_cast<void*>(message_view.data()),
366 message_view.data_num_bytes()
367 };
368 // TODO: Handle lots of handles.
369 result = PlatformChannelSendmsgWithHandles(
370 handle_.get(), &iov, 1, handles->data(), handles->size());
371 if (result >= 0) {
372 #if defined(OS_MACOSX)
373 // There is a bug on OSX which makes it dangerous to close
374 // a file descriptor while it is in transit. So instead we
375 // store the file descriptor in a set and send a message to
376 // the recipient, which is queued AFTER the message that
377 // sent the FD. The recipient will reply to the message,
378 // letting us know that it is now safe to close the file
379 // descriptor. For more information, see:
380 // http://crbug.com/298276
381 std::vector<int> fds;
382 for (auto& handle : *handles)
383 fds.push_back(handle.handle);
384 {
385 base::AutoLock l(handles_to_close_lock_);
386 for (auto& handle : *handles)
387 handles_to_close_->push_back(handle);
388 }
389 MessagePtr fds_message(
390 new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
391 Message::MessageType::HANDLES_SENT));
392 memcpy(fds_message->mutable_payload(), fds.data(),
393 sizeof(fds[0]) * fds.size());
394 outgoing_messages_.emplace_back(std::move(fds_message), 0);
395 handles->clear();
396 #else
397 handles.reset();
398 #endif // defined(OS_MACOSX)
399 }
400 } else {
401 result = PlatformChannelWrite(handle_.get(), message_view.data(),
402 message_view.data_num_bytes());
403 }
404
405 if (result < 0) {
406 if (errno != EAGAIN && errno != EWOULDBLOCK
407 #if defined(OS_MACOSX)
408 // On OS X if sendmsg() is trying to send fds between processes and
409 // there isn't enough room in the output buffer to send the fd
410 // structure over atomically then EMSGSIZE is returned.
411 //
412 // EMSGSIZE presents a problem since the system APIs can only call
413 // us when there's room in the socket buffer and not when there is
414 // "enough" room.
415 //
416 // The current behavior is to return to the event loop when EMSGSIZE
417 // is received and hopefull service another FD. This is however
418 // still technically a busy wait since the event loop will call us
419 // right back until the receiver has read enough data to allow
420 // passing the FD over atomically.
421 && errno != EMSGSIZE
422 #endif
423 ) {
424 return false;
425 }
426 message_view.SetHandles(std::move(handles));
427 outgoing_messages_.emplace_front(std::move(message_view));
428 WaitForWriteOnIOThreadNoLock();
429 return true;
430 }
431
432 bytes_written = static_cast<size_t>(result);
433 } while (bytes_written < message_view.data_num_bytes());
434
435 return FlushOutgoingMessagesNoLock();
436 }
437
FlushOutgoingMessagesNoLock()438 bool FlushOutgoingMessagesNoLock() {
439 std::deque<MessageView> messages;
440 std::swap(outgoing_messages_, messages);
441
442 while (!messages.empty()) {
443 if (!WriteNoLock(std::move(messages.front())))
444 return false;
445
446 messages.pop_front();
447 if (!outgoing_messages_.empty()) {
448 // The message was requeued by WriteNoLock(), so we have to wait for
449 // pipe to become writable again. Repopulate the message queue and exit.
450 // If sending the message triggered any control messages, they may be
451 // in |outgoing_messages_| in addition to or instead of the message
452 // being sent.
453 std::swap(messages, outgoing_messages_);
454 while (!messages.empty()) {
455 outgoing_messages_.push_front(std::move(messages.back()));
456 messages.pop_back();
457 }
458 return true;
459 }
460 }
461
462 return true;
463 }
464
465 #if defined(OS_MACOSX)
OnControlMessage(Message::MessageType message_type,const void * payload,size_t payload_size,ScopedPlatformHandleVectorPtr handles)466 bool OnControlMessage(Message::MessageType message_type,
467 const void* payload,
468 size_t payload_size,
469 ScopedPlatformHandleVectorPtr handles) override {
470 switch (message_type) {
471 case Message::MessageType::HANDLES_SENT: {
472 if (payload_size == 0)
473 break;
474 MessagePtr message(new Channel::Message(
475 payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
476 memcpy(message->mutable_payload(), payload, payload_size);
477 Write(std::move(message));
478 return true;
479 }
480
481 case Message::MessageType::HANDLES_SENT_ACK: {
482 size_t num_fds = payload_size / sizeof(int);
483 if (num_fds == 0 || payload_size % sizeof(int) != 0)
484 break;
485
486 const int* fds = reinterpret_cast<const int*>(payload);
487 if (!CloseHandles(fds, num_fds))
488 break;
489 return true;
490 }
491
492 default:
493 break;
494 }
495
496 return false;
497 }
498
499 // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
500 // |fds| does not match a sequence of handles in |handles_to_close_|.
CloseHandles(const int * fds,size_t num_fds)501 bool CloseHandles(const int* fds, size_t num_fds) {
502 base::AutoLock l(handles_to_close_lock_);
503 if (!num_fds)
504 return false;
505
506 auto start =
507 std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
508 [&fds](const PlatformHandle& handle) {
509 return handle.handle == fds[0];
510 });
511 if (start == handles_to_close_->end())
512 return false;
513
514 auto it = start;
515 size_t i = 0;
516 // The FDs in the message should match a sequence of handles in
517 // |handles_to_close_|.
518 for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
519 if (it->handle != fds[i])
520 return false;
521
522 it->CloseIfNecessary();
523 }
524 if (i != num_fds)
525 return false;
526
527 handles_to_close_->erase(start, it);
528 return true;
529 }
530 #endif // defined(OS_MACOSX)
531
532 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
533 scoped_refptr<Channel> self_;
534
535 ScopedPlatformHandle handle_;
536 scoped_refptr<base::TaskRunner> io_task_runner_;
537
538 // These watchers must only be accessed on the IO thread.
539 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
540 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
541
542 std::deque<PlatformHandle> incoming_platform_handles_;
543
544 // Protects |pending_write_| and |outgoing_messages_|.
545 base::Lock write_lock_;
546 bool pending_write_ = false;
547 bool reject_writes_ = false;
548 std::deque<MessageView> outgoing_messages_;
549
550 bool leak_handle_ = false;
551
552 #if defined(OS_MACOSX)
553 base::Lock handles_to_close_lock_;
554 ScopedPlatformHandleVectorPtr handles_to_close_;
555 #endif
556
557 DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
558 };
559
560 } // namespace
561
562 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)563 scoped_refptr<Channel> Channel::Create(
564 Delegate* delegate,
565 ConnectionParams connection_params,
566 scoped_refptr<base::TaskRunner> io_task_runner) {
567 return new ChannelPosix(delegate, std::move(connection_params),
568 io_task_runner);
569 }
570
571 } // namespace edk
572 } // namespace mojo
573