• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/channel.h"
6 
7 #include <errno.h>
8 #include <sys/socket.h>
9 
10 #include <algorithm>
11 #include <deque>
12 #include <limits>
13 #include <memory>
14 
15 #include "base/bind.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task_runner.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle_vector.h"
24 
25 #if !defined(OS_NACL)
26 #include <sys/uio.h>
27 #endif
28 
29 namespace mojo {
30 namespace edk {
31 
32 namespace {
33 
34 const size_t kMaxBatchReadCapacity = 256 * 1024;
35 
36 // A view over a Channel::Message object. The write queue uses these since
37 // large messages may need to be sent in chunks.
38 class MessageView {
39  public:
40   // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)41   MessageView(Channel::MessagePtr message, size_t offset)
42       : message_(std::move(message)),
43         offset_(offset),
44         handles_(message_->TakeHandlesForTransport()) {
45     DCHECK_GT(message_->data_num_bytes(), offset_);
46   }
47 
MessageView(MessageView && other)48   MessageView(MessageView&& other) { *this = std::move(other); }
49 
operator =(MessageView && other)50   MessageView& operator=(MessageView&& other) {
51     message_ = std::move(other.message_);
52     offset_ = other.offset_;
53     handles_ = std::move(other.handles_);
54     return *this;
55   }
56 
~MessageView()57   ~MessageView() {}
58 
data() const59   const void* data() const {
60     return static_cast<const char*>(message_->data()) + offset_;
61   }
62 
data_num_bytes() const63   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
64 
data_offset() const65   size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)66   void advance_data_offset(size_t num_bytes) {
67     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
68     offset_ += num_bytes;
69   }
70 
TakeHandles()71   ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
TakeMessage()72   Channel::MessagePtr TakeMessage() { return std::move(message_); }
73 
SetHandles(ScopedPlatformHandleVectorPtr handles)74   void SetHandles(ScopedPlatformHandleVectorPtr handles) {
75     handles_ = std::move(handles);
76   }
77 
78  private:
79   Channel::MessagePtr message_;
80   size_t offset_;
81   ScopedPlatformHandleVectorPtr handles_;
82 
83   DISALLOW_COPY_AND_ASSIGN(MessageView);
84 };
85 
86 class ChannelPosix : public Channel,
87                      public base::MessageLoop::DestructionObserver,
88                      public base::MessageLoopForIO::Watcher {
89  public:
ChannelPosix(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)90   ChannelPosix(Delegate* delegate,
91                ConnectionParams connection_params,
92                scoped_refptr<base::TaskRunner> io_task_runner)
93       : Channel(delegate),
94         self_(this),
95         handle_(connection_params.TakeChannelHandle()),
96         io_task_runner_(io_task_runner)
97 #if defined(OS_MACOSX)
98         ,
99         handles_to_close_(new PlatformHandleVector)
100 #endif
101   {
102     CHECK(handle_.is_valid());
103   }
104 
Start()105   void Start() override {
106     if (io_task_runner_->RunsTasksOnCurrentThread()) {
107       StartOnIOThread();
108     } else {
109       io_task_runner_->PostTask(
110           FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
111     }
112   }
113 
ShutDownImpl()114   void ShutDownImpl() override {
115     // Always shut down asynchronously when called through the public interface.
116     io_task_runner_->PostTask(
117         FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
118   }
119 
Write(MessagePtr message)120   void Write(MessagePtr message) override {
121     bool write_error = false;
122     {
123       base::AutoLock lock(write_lock_);
124       if (reject_writes_)
125         return;
126       if (outgoing_messages_.empty()) {
127         if (!WriteNoLock(MessageView(std::move(message), 0)))
128           reject_writes_ = write_error = true;
129       } else {
130         outgoing_messages_.emplace_back(std::move(message), 0);
131       }
132     }
133     if (write_error) {
134       // Do not synchronously invoke OnError(). Write() may have been called by
135       // the delegate and we don't want to re-enter it.
136       io_task_runner_->PostTask(FROM_HERE,
137                                 base::Bind(&ChannelPosix::OnError, this));
138     }
139   }
140 
LeakHandle()141   void LeakHandle() override {
142     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
143     leak_handle_ = true;
144   }
145 
GetReadPlatformHandles(size_t num_handles,const void * extra_header,size_t extra_header_size,ScopedPlatformHandleVectorPtr * handles)146   bool GetReadPlatformHandles(
147       size_t num_handles,
148       const void* extra_header,
149       size_t extra_header_size,
150       ScopedPlatformHandleVectorPtr* handles) override {
151     if (num_handles > std::numeric_limits<uint16_t>::max())
152       return false;
153 #if defined(OS_MACOSX) && !defined(OS_IOS)
154     // On OSX, we can have mach ports which are located in the extra header
155     // section.
156     using MachPortsEntry = Channel::Message::MachPortsEntry;
157     using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
158     CHECK(extra_header_size >=
159           sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
160     const MachPortsExtraHeader* mach_ports_header =
161         reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
162     size_t num_mach_ports = mach_ports_header->num_ports;
163     CHECK(num_mach_ports <= num_handles);
164     if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
165       handles->reset();
166       return true;
167     }
168 
169     handles->reset(new PlatformHandleVector(num_handles));
170     const MachPortsEntry* mach_ports = mach_ports_header->entries;
171     for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
172       if (mach_port_index < num_mach_ports &&
173           mach_ports[mach_port_index].index == i) {
174         (*handles)->at(i) = PlatformHandle(
175             static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
176         CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
177         // These are actually just Mach port names until they're resolved from
178         // the remote process.
179         (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
180         mach_port_index++;
181       } else {
182         CHECK(!incoming_platform_handles_.empty());
183         (*handles)->at(i) = incoming_platform_handles_.front();
184         incoming_platform_handles_.pop_front();
185       }
186     }
187 #else
188     if (incoming_platform_handles_.size() < num_handles) {
189       handles->reset();
190       return true;
191     }
192 
193     handles->reset(new PlatformHandleVector(num_handles));
194     for (size_t i = 0; i < num_handles; ++i) {
195       (*handles)->at(i) = incoming_platform_handles_.front();
196       incoming_platform_handles_.pop_front();
197     }
198 #endif
199 
200     return true;
201   }
202 
203  private:
~ChannelPosix()204   ~ChannelPosix() override {
205     DCHECK(!read_watcher_);
206     DCHECK(!write_watcher_);
207     for (auto handle : incoming_platform_handles_)
208       handle.CloseIfNecessary();
209   }
210 
StartOnIOThread()211   void StartOnIOThread() {
212     DCHECK(!read_watcher_);
213     DCHECK(!write_watcher_);
214     read_watcher_.reset(
215         new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
216     base::MessageLoop::current()->AddDestructionObserver(this);
217     if (handle_.get().needs_connection) {
218       base::MessageLoopForIO::current()->WatchFileDescriptor(
219           handle_.get().handle, false /* persistent */,
220           base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
221     } else {
222       write_watcher_.reset(
223           new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE));
224       base::MessageLoopForIO::current()->WatchFileDescriptor(
225           handle_.get().handle, true /* persistent */,
226           base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
227       base::AutoLock lock(write_lock_);
228       FlushOutgoingMessagesNoLock();
229     }
230   }
231 
WaitForWriteOnIOThread()232   void WaitForWriteOnIOThread() {
233     base::AutoLock lock(write_lock_);
234     WaitForWriteOnIOThreadNoLock();
235   }
236 
WaitForWriteOnIOThreadNoLock()237   void WaitForWriteOnIOThreadNoLock() {
238     if (pending_write_)
239       return;
240     if (!write_watcher_)
241       return;
242     if (io_task_runner_->RunsTasksOnCurrentThread()) {
243       pending_write_ = true;
244       base::MessageLoopForIO::current()->WatchFileDescriptor(
245           handle_.get().handle, false /* persistent */,
246           base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
247     } else {
248       io_task_runner_->PostTask(
249           FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
250     }
251   }
252 
ShutDownOnIOThread()253   void ShutDownOnIOThread() {
254     base::MessageLoop::current()->RemoveDestructionObserver(this);
255 
256     read_watcher_.reset();
257     write_watcher_.reset();
258     if (leak_handle_)
259       ignore_result(handle_.release());
260     handle_.reset();
261 #if defined(OS_MACOSX)
262     handles_to_close_.reset();
263 #endif
264 
265     // May destroy the |this| if it was the last reference.
266     self_ = nullptr;
267   }
268 
269   // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()270   void WillDestroyCurrentMessageLoop() override {
271     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
272     if (self_)
273       ShutDownOnIOThread();
274   }
275 
276   // base::MessageLoopForIO::Watcher:
OnFileCanReadWithoutBlocking(int fd)277   void OnFileCanReadWithoutBlocking(int fd) override {
278     CHECK_EQ(fd, handle_.get().handle);
279     if (handle_.get().needs_connection) {
280 #if !defined(OS_NACL)
281       read_watcher_.reset();
282       base::MessageLoop::current()->RemoveDestructionObserver(this);
283 
284       ScopedPlatformHandle accept_fd;
285       ServerAcceptConnection(handle_.get(), &accept_fd);
286       if (!accept_fd.is_valid()) {
287         OnError();
288         return;
289       }
290       handle_ = std::move(accept_fd);
291       StartOnIOThread();
292 #else
293       NOTREACHED();
294 #endif
295       return;
296     }
297 
298     bool read_error = false;
299     size_t next_read_size = 0;
300     size_t buffer_capacity = 0;
301     size_t total_bytes_read = 0;
302     size_t bytes_read = 0;
303     do {
304       buffer_capacity = next_read_size;
305       char* buffer = GetReadBuffer(&buffer_capacity);
306       DCHECK_GT(buffer_capacity, 0u);
307 
308       ssize_t read_result = PlatformChannelRecvmsg(
309           handle_.get(),
310           buffer,
311           buffer_capacity,
312           &incoming_platform_handles_);
313 
314       if (read_result > 0) {
315         bytes_read = static_cast<size_t>(read_result);
316         total_bytes_read += bytes_read;
317         if (!OnReadComplete(bytes_read, &next_read_size)) {
318           read_error = true;
319           break;
320         }
321       } else if (read_result == 0 ||
322                  (errno != EAGAIN && errno != EWOULDBLOCK)) {
323         read_error = true;
324         break;
325       }
326     } while (bytes_read == buffer_capacity &&
327              total_bytes_read < kMaxBatchReadCapacity &&
328              next_read_size > 0);
329     if (read_error) {
330       // Stop receiving read notifications.
331       read_watcher_.reset();
332 
333       OnError();
334     }
335   }
336 
OnFileCanWriteWithoutBlocking(int fd)337   void OnFileCanWriteWithoutBlocking(int fd) override {
338     bool write_error = false;
339     {
340       base::AutoLock lock(write_lock_);
341       pending_write_ = false;
342       if (!FlushOutgoingMessagesNoLock())
343         reject_writes_ = write_error = true;
344     }
345     if (write_error)
346       OnError();
347   }
348 
349   // Attempts to write a message directly to the channel. If the full message
350   // cannot be written, it's queued and a wait is initiated to write the message
351   // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)352   bool WriteNoLock(MessageView message_view) {
353     if (handle_.get().needs_connection) {
354       outgoing_messages_.emplace_front(std::move(message_view));
355       return true;
356     }
357     size_t bytes_written = 0;
358     do {
359       message_view.advance_data_offset(bytes_written);
360 
361       ssize_t result;
362       ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
363       if (handles && handles->size()) {
364         iovec iov = {
365           const_cast<void*>(message_view.data()),
366           message_view.data_num_bytes()
367         };
368         // TODO: Handle lots of handles.
369         result = PlatformChannelSendmsgWithHandles(
370             handle_.get(), &iov, 1, handles->data(), handles->size());
371         if (result >= 0) {
372 #if defined(OS_MACOSX)
373           // There is a bug on OSX which makes it dangerous to close
374           // a file descriptor while it is in transit. So instead we
375           // store the file descriptor in a set and send a message to
376           // the recipient, which is queued AFTER the message that
377           // sent the FD. The recipient will reply to the message,
378           // letting us know that it is now safe to close the file
379           // descriptor. For more information, see:
380           // http://crbug.com/298276
381           std::vector<int> fds;
382           for (auto& handle : *handles)
383             fds.push_back(handle.handle);
384           {
385             base::AutoLock l(handles_to_close_lock_);
386             for (auto& handle : *handles)
387               handles_to_close_->push_back(handle);
388           }
389           MessagePtr fds_message(
390               new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
391                                    Message::MessageType::HANDLES_SENT));
392           memcpy(fds_message->mutable_payload(), fds.data(),
393                  sizeof(fds[0]) * fds.size());
394           outgoing_messages_.emplace_back(std::move(fds_message), 0);
395           handles->clear();
396 #else
397           handles.reset();
398 #endif  // defined(OS_MACOSX)
399         }
400       } else {
401         result = PlatformChannelWrite(handle_.get(), message_view.data(),
402                                       message_view.data_num_bytes());
403       }
404 
405       if (result < 0) {
406         if (errno != EAGAIN && errno != EWOULDBLOCK
407 #if defined(OS_MACOSX)
408             // On OS X if sendmsg() is trying to send fds between processes and
409             // there isn't enough room in the output buffer to send the fd
410             // structure over atomically then EMSGSIZE is returned.
411             //
412             // EMSGSIZE presents a problem since the system APIs can only call
413             // us when there's room in the socket buffer and not when there is
414             // "enough" room.
415             //
416             // The current behavior is to return to the event loop when EMSGSIZE
417             // is received and hopefull service another FD.  This is however
418             // still technically a busy wait since the event loop will call us
419             // right back until the receiver has read enough data to allow
420             // passing the FD over atomically.
421             && errno != EMSGSIZE
422 #endif
423             ) {
424           return false;
425         }
426         message_view.SetHandles(std::move(handles));
427         outgoing_messages_.emplace_front(std::move(message_view));
428         WaitForWriteOnIOThreadNoLock();
429         return true;
430       }
431 
432       bytes_written = static_cast<size_t>(result);
433     } while (bytes_written < message_view.data_num_bytes());
434 
435     return FlushOutgoingMessagesNoLock();
436   }
437 
FlushOutgoingMessagesNoLock()438   bool FlushOutgoingMessagesNoLock() {
439     std::deque<MessageView> messages;
440     std::swap(outgoing_messages_, messages);
441 
442     while (!messages.empty()) {
443       if (!WriteNoLock(std::move(messages.front())))
444         return false;
445 
446       messages.pop_front();
447       if (!outgoing_messages_.empty()) {
448         // The message was requeued by WriteNoLock(), so we have to wait for
449         // pipe to become writable again. Repopulate the message queue and exit.
450         // If sending the message triggered any control messages, they may be
451         // in |outgoing_messages_| in addition to or instead of the message
452         // being sent.
453         std::swap(messages, outgoing_messages_);
454         while (!messages.empty()) {
455           outgoing_messages_.push_front(std::move(messages.back()));
456           messages.pop_back();
457         }
458         return true;
459       }
460     }
461 
462     return true;
463   }
464 
465 #if defined(OS_MACOSX)
OnControlMessage(Message::MessageType message_type,const void * payload,size_t payload_size,ScopedPlatformHandleVectorPtr handles)466   bool OnControlMessage(Message::MessageType message_type,
467                         const void* payload,
468                         size_t payload_size,
469                         ScopedPlatformHandleVectorPtr handles) override {
470     switch (message_type) {
471       case Message::MessageType::HANDLES_SENT: {
472         if (payload_size == 0)
473           break;
474         MessagePtr message(new Channel::Message(
475             payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
476         memcpy(message->mutable_payload(), payload, payload_size);
477         Write(std::move(message));
478         return true;
479       }
480 
481       case Message::MessageType::HANDLES_SENT_ACK: {
482         size_t num_fds = payload_size / sizeof(int);
483         if (num_fds == 0 || payload_size % sizeof(int) != 0)
484           break;
485 
486         const int* fds = reinterpret_cast<const int*>(payload);
487         if (!CloseHandles(fds, num_fds))
488           break;
489         return true;
490       }
491 
492       default:
493         break;
494     }
495 
496     return false;
497   }
498 
499   // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
500   // |fds| does not match a sequence of handles in |handles_to_close_|.
CloseHandles(const int * fds,size_t num_fds)501   bool CloseHandles(const int* fds, size_t num_fds) {
502     base::AutoLock l(handles_to_close_lock_);
503     if (!num_fds)
504       return false;
505 
506     auto start =
507         std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
508                      [&fds](const PlatformHandle& handle) {
509                        return handle.handle == fds[0];
510                      });
511     if (start == handles_to_close_->end())
512       return false;
513 
514     auto it = start;
515     size_t i = 0;
516     // The FDs in the message should match a sequence of handles in
517     // |handles_to_close_|.
518     for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
519       if (it->handle != fds[i])
520         return false;
521 
522       it->CloseIfNecessary();
523     }
524     if (i != num_fds)
525       return false;
526 
527     handles_to_close_->erase(start, it);
528     return true;
529   }
530 #endif  // defined(OS_MACOSX)
531 
532   // Keeps the Channel alive at least until explicit shutdown on the IO thread.
533   scoped_refptr<Channel> self_;
534 
535   ScopedPlatformHandle handle_;
536   scoped_refptr<base::TaskRunner> io_task_runner_;
537 
538   // These watchers must only be accessed on the IO thread.
539   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
540   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
541 
542   std::deque<PlatformHandle> incoming_platform_handles_;
543 
544   // Protects |pending_write_| and |outgoing_messages_|.
545   base::Lock write_lock_;
546   bool pending_write_ = false;
547   bool reject_writes_ = false;
548   std::deque<MessageView> outgoing_messages_;
549 
550   bool leak_handle_ = false;
551 
552 #if defined(OS_MACOSX)
553   base::Lock handles_to_close_lock_;
554   ScopedPlatformHandleVectorPtr handles_to_close_;
555 #endif
556 
557   DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
558 };
559 
560 }  // namespace
561 
562 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)563 scoped_refptr<Channel> Channel::Create(
564     Delegate* delegate,
565     ConnectionParams connection_params,
566     scoped_refptr<base::TaskRunner> io_task_runner) {
567   return new ChannelPosix(delegate, std::move(connection_params),
568                           io_task_runner);
569 }
570 
571 }  // namespace edk
572 }  // namespace mojo
573