• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/channel.h"
6 
7 #include <errno.h>
8 #include <sys/socket.h>
9 
10 #include <algorithm>
11 #include <deque>
12 #include <limits>
13 #include <memory>
14 
15 #include "base/bind.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task_runner.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle_vector.h"
24 
25 #if !defined(OS_NACL)
26 #include <sys/uio.h>
27 #endif
28 
29 namespace mojo {
30 namespace edk {
31 
32 namespace {
33 
34 const size_t kMaxBatchReadCapacity = 256 * 1024;
35 
36 // A view over a Channel::Message object. The write queue uses these since
37 // large messages may need to be sent in chunks.
38 class MessageView {
39  public:
40   // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)41   MessageView(Channel::MessagePtr message, size_t offset)
42       : message_(std::move(message)),
43         offset_(offset),
44         handles_(message_->TakeHandlesForTransport()) {
45     DCHECK_GT(message_->data_num_bytes(), offset_);
46   }
47 
MessageView(MessageView && other)48   MessageView(MessageView&& other) { *this = std::move(other); }
49 
operator =(MessageView && other)50   MessageView& operator=(MessageView&& other) {
51     message_ = std::move(other.message_);
52     offset_ = other.offset_;
53     handles_ = std::move(other.handles_);
54     return *this;
55   }
56 
~MessageView()57   ~MessageView() {}
58 
data() const59   const void* data() const {
60     return static_cast<const char*>(message_->data()) + offset_;
61   }
62 
data_num_bytes() const63   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
64 
data_offset() const65   size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)66   void advance_data_offset(size_t num_bytes) {
67     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
68     offset_ += num_bytes;
69   }
70 
TakeHandles()71   ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
TakeMessage()72   Channel::MessagePtr TakeMessage() { return std::move(message_); }
73 
SetHandles(ScopedPlatformHandleVectorPtr handles)74   void SetHandles(ScopedPlatformHandleVectorPtr handles) {
75     handles_ = std::move(handles);
76   }
77 
78  private:
79   Channel::MessagePtr message_;
80   size_t offset_;
81   ScopedPlatformHandleVectorPtr handles_;
82 
83   DISALLOW_COPY_AND_ASSIGN(MessageView);
84 };
85 
86 class ChannelPosix : public Channel,
87                      public base::MessageLoop::DestructionObserver,
88                      public base::MessageLoopForIO::Watcher {
89  public:
ChannelPosix(Delegate * delegate,ScopedPlatformHandle handle,scoped_refptr<base::TaskRunner> io_task_runner)90   ChannelPosix(Delegate* delegate,
91                ScopedPlatformHandle handle,
92                scoped_refptr<base::TaskRunner> io_task_runner)
93       : Channel(delegate),
94         self_(this),
95         handle_(std::move(handle)),
96         io_task_runner_(io_task_runner)
97 #if defined(OS_MACOSX)
98         ,
99         handles_to_close_(new PlatformHandleVector)
100 #endif
101   {
102   }
103 
Start()104   void Start() override {
105     if (io_task_runner_->RunsTasksOnCurrentThread()) {
106       StartOnIOThread();
107     } else {
108       io_task_runner_->PostTask(
109           FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
110     }
111   }
112 
ShutDownImpl()113   void ShutDownImpl() override {
114     // Always shut down asynchronously when called through the public interface.
115     io_task_runner_->PostTask(
116         FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
117   }
118 
Write(MessagePtr message)119   void Write(MessagePtr message) override {
120     bool write_error = false;
121     {
122       base::AutoLock lock(write_lock_);
123       if (reject_writes_)
124         return;
125       if (outgoing_messages_.empty()) {
126         if (!WriteNoLock(MessageView(std::move(message), 0)))
127           reject_writes_ = write_error = true;
128       } else {
129         outgoing_messages_.emplace_back(std::move(message), 0);
130       }
131     }
132     if (write_error) {
133       // Do not synchronously invoke OnError(). Write() may have been called by
134       // the delegate and we don't want to re-enter it.
135       io_task_runner_->PostTask(FROM_HERE,
136                                 base::Bind(&ChannelPosix::OnError, this));
137     }
138   }
139 
LeakHandle()140   void LeakHandle() override {
141     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
142     leak_handle_ = true;
143   }
144 
GetReadPlatformHandles(size_t num_handles,const void * extra_header,size_t extra_header_size,ScopedPlatformHandleVectorPtr * handles)145   bool GetReadPlatformHandles(
146       size_t num_handles,
147       const void* extra_header,
148       size_t extra_header_size,
149       ScopedPlatformHandleVectorPtr* handles) override {
150     if (num_handles > std::numeric_limits<uint16_t>::max())
151       return false;
152 #if defined(OS_MACOSX) && !defined(OS_IOS)
153     // On OSX, we can have mach ports which are located in the extra header
154     // section.
155     using MachPortsEntry = Channel::Message::MachPortsEntry;
156     using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
157     CHECK(extra_header_size >=
158           sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
159     const MachPortsExtraHeader* mach_ports_header =
160         reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
161     size_t num_mach_ports = mach_ports_header->num_ports;
162     CHECK(num_mach_ports <= num_handles);
163     if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
164       handles->reset();
165       return true;
166     }
167 
168     handles->reset(new PlatformHandleVector(num_handles));
169     const MachPortsEntry* mach_ports = mach_ports_header->entries;
170     for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
171       if (mach_port_index < num_mach_ports &&
172           mach_ports[mach_port_index].index == i) {
173         (*handles)->at(i) = PlatformHandle(
174             static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
175         CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
176         // These are actually just Mach port names until they're resolved from
177         // the remote process.
178         (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
179         mach_port_index++;
180       } else {
181         CHECK(!incoming_platform_handles_.empty());
182         (*handles)->at(i) = incoming_platform_handles_.front();
183         incoming_platform_handles_.pop_front();
184       }
185     }
186 #else
187     if (incoming_platform_handles_.size() < num_handles) {
188       handles->reset();
189       return true;
190     }
191 
192     handles->reset(new PlatformHandleVector(num_handles));
193     for (size_t i = 0; i < num_handles; ++i) {
194       (*handles)->at(i) = incoming_platform_handles_.front();
195       incoming_platform_handles_.pop_front();
196     }
197 #endif
198 
199     return true;
200   }
201 
202  private:
~ChannelPosix()203   ~ChannelPosix() override {
204     DCHECK(!read_watcher_);
205     DCHECK(!write_watcher_);
206     for (auto handle : incoming_platform_handles_)
207       handle.CloseIfNecessary();
208   }
209 
StartOnIOThread()210   void StartOnIOThread() {
211     DCHECK(!read_watcher_);
212     DCHECK(!write_watcher_);
213     read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
214     write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
215     base::MessageLoopForIO::current()->WatchFileDescriptor(
216         handle_.get().handle, true /* persistent */,
217         base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
218     base::MessageLoop::current()->AddDestructionObserver(this);
219   }
220 
WaitForWriteOnIOThread()221   void WaitForWriteOnIOThread() {
222     base::AutoLock lock(write_lock_);
223     WaitForWriteOnIOThreadNoLock();
224   }
225 
WaitForWriteOnIOThreadNoLock()226   void WaitForWriteOnIOThreadNoLock() {
227     if (pending_write_)
228       return;
229     if (!write_watcher_)
230       return;
231     if (io_task_runner_->RunsTasksOnCurrentThread()) {
232       pending_write_ = true;
233       base::MessageLoopForIO::current()->WatchFileDescriptor(
234           handle_.get().handle, false /* persistent */,
235           base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
236     } else {
237       io_task_runner_->PostTask(
238           FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
239     }
240   }
241 
ShutDownOnIOThread()242   void ShutDownOnIOThread() {
243     base::MessageLoop::current()->RemoveDestructionObserver(this);
244 
245     read_watcher_.reset();
246     write_watcher_.reset();
247     if (leak_handle_)
248       ignore_result(handle_.release());
249     handle_.reset();
250 #if defined(OS_MACOSX)
251     handles_to_close_.reset();
252 #endif
253 
254     // May destroy the |this| if it was the last reference.
255     self_ = nullptr;
256   }
257 
258   // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()259   void WillDestroyCurrentMessageLoop() override {
260     DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
261     if (self_)
262       ShutDownOnIOThread();
263   }
264 
265   // base::MessageLoopForIO::Watcher:
OnFileCanReadWithoutBlocking(int fd)266   void OnFileCanReadWithoutBlocking(int fd) override {
267     CHECK_EQ(fd, handle_.get().handle);
268 
269     bool read_error = false;
270     size_t next_read_size = 0;
271     size_t buffer_capacity = 0;
272     size_t total_bytes_read = 0;
273     size_t bytes_read = 0;
274     do {
275       buffer_capacity = next_read_size;
276       char* buffer = GetReadBuffer(&buffer_capacity);
277       DCHECK_GT(buffer_capacity, 0u);
278 
279       ssize_t read_result = PlatformChannelRecvmsg(
280           handle_.get(),
281           buffer,
282           buffer_capacity,
283           &incoming_platform_handles_);
284 
285       if (read_result > 0) {
286         bytes_read = static_cast<size_t>(read_result);
287         total_bytes_read += bytes_read;
288         if (!OnReadComplete(bytes_read, &next_read_size)) {
289           read_error = true;
290           break;
291         }
292       } else if (read_result == 0 ||
293                  (errno != EAGAIN && errno != EWOULDBLOCK)) {
294         read_error = true;
295         break;
296       }
297     } while (bytes_read == buffer_capacity &&
298              total_bytes_read < kMaxBatchReadCapacity &&
299              next_read_size > 0);
300     if (read_error) {
301       // Stop receiving read notifications.
302       read_watcher_.reset();
303 
304       OnError();
305     }
306   }
307 
OnFileCanWriteWithoutBlocking(int fd)308   void OnFileCanWriteWithoutBlocking(int fd) override {
309     bool write_error = false;
310     {
311       base::AutoLock lock(write_lock_);
312       pending_write_ = false;
313       if (!FlushOutgoingMessagesNoLock())
314         reject_writes_ = write_error = true;
315     }
316     if (write_error)
317       OnError();
318   }
319 
320   // Attempts to write a message directly to the channel. If the full message
321   // cannot be written, it's queued and a wait is initiated to write the message
322   // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)323   bool WriteNoLock(MessageView message_view) {
324     size_t bytes_written = 0;
325     do {
326       message_view.advance_data_offset(bytes_written);
327 
328       ssize_t result;
329       ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
330       if (handles && handles->size()) {
331         iovec iov = {
332           const_cast<void*>(message_view.data()),
333           message_view.data_num_bytes()
334         };
335         // TODO: Handle lots of handles.
336         result = PlatformChannelSendmsgWithHandles(
337             handle_.get(), &iov, 1, handles->data(), handles->size());
338         if (result >= 0) {
339 #if defined(OS_MACOSX)
340           // There is a bug on OSX which makes it dangerous to close
341           // a file descriptor while it is in transit. So instead we
342           // store the file descriptor in a set and send a message to
343           // the recipient, which is queued AFTER the message that
344           // sent the FD. The recipient will reply to the message,
345           // letting us know that it is now safe to close the file
346           // descriptor. For more information, see:
347           // http://crbug.com/298276
348           std::vector<int> fds;
349           for (auto& handle : *handles)
350             fds.push_back(handle.handle);
351           {
352             base::AutoLock l(handles_to_close_lock_);
353             for (auto& handle : *handles)
354               handles_to_close_->push_back(handle);
355           }
356           MessagePtr fds_message(
357               new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
358                                    Message::Header::MessageType::HANDLES_SENT));
359           memcpy(fds_message->mutable_payload(), fds.data(),
360                  sizeof(fds[0]) * fds.size());
361           outgoing_messages_.emplace_back(std::move(fds_message), 0);
362           handles->clear();
363 #else
364           handles.reset();
365 #endif  // defined(OS_MACOSX)
366         }
367       } else {
368         result = PlatformChannelWrite(handle_.get(), message_view.data(),
369                                       message_view.data_num_bytes());
370       }
371 
372       if (result < 0) {
373         if (errno != EAGAIN && errno != EWOULDBLOCK)
374           return false;
375         message_view.SetHandles(std::move(handles));
376         outgoing_messages_.emplace_front(std::move(message_view));
377         WaitForWriteOnIOThreadNoLock();
378         return true;
379       }
380 
381       bytes_written = static_cast<size_t>(result);
382     } while (bytes_written < message_view.data_num_bytes());
383 
384     return FlushOutgoingMessagesNoLock();
385   }
386 
FlushOutgoingMessagesNoLock()387   bool FlushOutgoingMessagesNoLock() {
388     std::deque<MessageView> messages;
389     std::swap(outgoing_messages_, messages);
390 
391     while (!messages.empty()) {
392       if (!WriteNoLock(std::move(messages.front())))
393         return false;
394 
395       messages.pop_front();
396       if (!outgoing_messages_.empty()) {
397         // The message was requeued by WriteNoLock(), so we have to wait for
398         // pipe to become writable again. Repopulate the message queue and exit.
399         // If sending the message triggered any control messages, they may be
400         // in |outgoing_messages_| in addition to or instead of the message
401         // being sent.
402         std::swap(messages, outgoing_messages_);
403         while (!messages.empty()) {
404           outgoing_messages_.push_front(std::move(messages.back()));
405           messages.pop_back();
406         }
407         return true;
408       }
409     }
410 
411     return true;
412   }
413 
414 #if defined(OS_MACOSX)
OnControlMessage(Message::Header::MessageType message_type,const void * payload,size_t payload_size,ScopedPlatformHandleVectorPtr handles)415   bool OnControlMessage(Message::Header::MessageType message_type,
416                         const void* payload,
417                         size_t payload_size,
418                         ScopedPlatformHandleVectorPtr handles) override {
419     switch (message_type) {
420       case Message::Header::MessageType::HANDLES_SENT: {
421         if (payload_size == 0)
422           break;
423         MessagePtr message(new Channel::Message(
424             payload_size, 0, Message::Header::MessageType::HANDLES_SENT_ACK));
425         memcpy(message->mutable_payload(), payload, payload_size);
426         Write(std::move(message));
427         return true;
428       }
429 
430       case Message::Header::MessageType::HANDLES_SENT_ACK: {
431         size_t num_fds = payload_size / sizeof(int);
432         if (num_fds == 0 || payload_size % sizeof(int) != 0)
433           break;
434 
435         const int* fds = reinterpret_cast<const int*>(payload);
436         if (!CloseHandles(fds, num_fds))
437           break;
438         return true;
439       }
440 
441       default:
442         break;
443     }
444 
445     return false;
446   }
447 
448   // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
449   // |fds| does not match a sequence of handles in |handles_to_close_|.
CloseHandles(const int * fds,size_t num_fds)450   bool CloseHandles(const int* fds, size_t num_fds) {
451     base::AutoLock l(handles_to_close_lock_);
452     if (!num_fds)
453       return false;
454 
455     auto start =
456         std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
457                      [&fds](const PlatformHandle& handle) {
458                        return handle.handle == fds[0];
459                      });
460     if (start == handles_to_close_->end())
461       return false;
462 
463     auto it = start;
464     size_t i = 0;
465     // The FDs in the message should match a sequence of handles in
466     // |handles_to_close_|.
467     for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
468       if (it->handle != fds[i])
469         return false;
470 
471       it->CloseIfNecessary();
472     }
473     if (i != num_fds)
474       return false;
475 
476     handles_to_close_->erase(start, it);
477     return true;
478   }
479 #endif  // defined(OS_MACOSX)
480 
481   // Keeps the Channel alive at least until explicit shutdown on the IO thread.
482   scoped_refptr<Channel> self_;
483 
484   ScopedPlatformHandle handle_;
485   scoped_refptr<base::TaskRunner> io_task_runner_;
486 
487   // These watchers must only be accessed on the IO thread.
488   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
489   std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
490 
491   std::deque<PlatformHandle> incoming_platform_handles_;
492 
493   // Protects |pending_write_| and |outgoing_messages_|.
494   base::Lock write_lock_;
495   bool pending_write_ = false;
496   bool reject_writes_ = false;
497   std::deque<MessageView> outgoing_messages_;
498 
499   bool leak_handle_ = false;
500 
501 #if defined(OS_MACOSX)
502   base::Lock handles_to_close_lock_;
503   ScopedPlatformHandleVectorPtr handles_to_close_;
504 #endif
505 
506   DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
507 };
508 
509 }  // namespace
510 
511 // static
Create(Delegate * delegate,ScopedPlatformHandle platform_handle,scoped_refptr<base::TaskRunner> io_task_runner)512 scoped_refptr<Channel> Channel::Create(
513     Delegate* delegate,
514     ScopedPlatformHandle platform_handle,
515     scoped_refptr<base::TaskRunner> io_task_runner) {
516   return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
517 }
518 
519 }  // namespace edk
520 }  // namespace mojo
521