• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/system/raw_channel.h"
6 
7 #include <errno.h>
8 #include <sys/uio.h>
9 #include <unistd.h>
10 
11 #include <algorithm>
12 #include <deque>
13 
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/memory/scoped_ptr.h"
19 #include "base/memory/weak_ptr.h"
20 #include "base/message_loop/message_loop.h"
21 #include "base/synchronization/lock.h"
22 #include "mojo/embedder/platform_channel_utils_posix.h"
23 #include "mojo/embedder/platform_handle.h"
24 #include "mojo/embedder/platform_handle_vector.h"
25 #include "mojo/system/transport_data.h"
26 
27 namespace mojo {
28 namespace system {
29 
30 namespace {
31 
32 class RawChannelPosix : public RawChannel,
33                         public base::MessageLoopForIO::Watcher {
34  public:
35   explicit RawChannelPosix(embedder::ScopedPlatformHandle handle);
36   virtual ~RawChannelPosix();
37 
38   // |RawChannel| public methods:
39   virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE;
40 
41  private:
42   // |RawChannel| protected methods:
43   // Actually override this so that we can send multiple messages with (only)
44   // FDs if necessary.
45   virtual void EnqueueMessageNoLock(
46       scoped_ptr<MessageInTransit> message) OVERRIDE;
47   // Override this to handle those extra FD-only messages.
48   virtual bool OnReadMessageForRawChannel(
49       const MessageInTransit::View& message_view) OVERRIDE;
50   virtual IOResult Read(size_t* bytes_read) OVERRIDE;
51   virtual IOResult ScheduleRead() OVERRIDE;
52   virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
53       size_t num_platform_handles,
54       const void* platform_handle_table) OVERRIDE;
55   virtual IOResult WriteNoLock(size_t* platform_handles_written,
56                                size_t* bytes_written) OVERRIDE;
57   virtual IOResult ScheduleWriteNoLock() OVERRIDE;
58   virtual bool OnInit() OVERRIDE;
59   virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
60                                 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
61 
62   // |base::MessageLoopForIO::Watcher| implementation:
63   virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
64   virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
65 
66   // Implements most of |Read()| (except for a bit of clean-up):
67   IOResult ReadImpl(size_t* bytes_read);
68 
69   // Watches for |fd_| to become writable. Must be called on the I/O thread.
70   void WaitToWrite();
71 
72   embedder::ScopedPlatformHandle fd_;
73 
74   // The following members are only used on the I/O thread:
75   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
76   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
77 
78   bool pending_read_;
79 
80   std::deque<embedder::PlatformHandle> read_platform_handles_;
81 
82   // The following members are used on multiple threads and protected by
83   // |write_lock()|:
84   bool pending_write_;
85 
86   // This is used for posting tasks from write threads to the I/O thread. It
87   // must only be accessed under |write_lock_|. The weak pointers it produces
88   // are only used/invalidated on the I/O thread.
89   base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
90 
91   DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
92 };
93 
RawChannelPosix(embedder::ScopedPlatformHandle handle)94 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle)
95     : fd_(handle.Pass()),
96       pending_read_(false),
97       pending_write_(false),
98       weak_ptr_factory_(this) {
99   DCHECK(fd_.is_valid());
100 }
101 
~RawChannelPosix()102 RawChannelPosix::~RawChannelPosix() {
103   DCHECK(!pending_read_);
104   DCHECK(!pending_write_);
105 
106   // No need to take the |write_lock()| here -- if there are still weak pointers
107   // outstanding, then we're hosed anyway (since we wouldn't be able to
108   // invalidate them cleanly, since we might not be on the I/O thread).
109   DCHECK(!weak_ptr_factory_.HasWeakPtrs());
110 
111   // These must have been shut down/destroyed on the I/O thread.
112   DCHECK(!read_watcher_);
113   DCHECK(!write_watcher_);
114 
115   embedder::CloseAllPlatformHandles(&read_platform_handles_);
116 }
117 
GetSerializedPlatformHandleSize() const118 size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
119   // We don't actually need any space on POSIX (since we just send FDs).
120   return 0;
121 }
122 
EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message)123 void RawChannelPosix::EnqueueMessageNoLock(
124     scoped_ptr<MessageInTransit> message) {
125   if (message->transport_data()) {
126     embedder::PlatformHandleVector* const platform_handles =
127         message->transport_data()->platform_handles();
128     if (platform_handles &&
129         platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) {
130       // We can't attach all the FDs to a single message, so we have to "split"
131       // the message. Send as many control messages as needed first with FDs
132       // attached (and no data).
133       size_t i = 0;
134       for (; platform_handles->size() - i >
135                  embedder::kPlatformChannelMaxNumHandles;
136            i += embedder::kPlatformChannelMaxNumHandles) {
137         scoped_ptr<MessageInTransit> fd_message(new MessageInTransit(
138             MessageInTransit::kTypeRawChannel,
139             MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles,
140             0,
141             nullptr));
142         embedder::ScopedPlatformHandleVectorPtr fds(
143             new embedder::PlatformHandleVector(
144                 platform_handles->begin() + i,
145                 platform_handles->begin() + i +
146                     embedder::kPlatformChannelMaxNumHandles));
147         fd_message->SetTransportData(
148             make_scoped_ptr(new TransportData(fds.Pass())));
149         RawChannel::EnqueueMessageNoLock(fd_message.Pass());
150       }
151 
152       // Remove the handles that we "moved" into the other messages.
153       platform_handles->erase(platform_handles->begin(),
154                               platform_handles->begin() + i);
155     }
156   }
157 
158   RawChannel::EnqueueMessageNoLock(message.Pass());
159 }
160 
OnReadMessageForRawChannel(const MessageInTransit::View & message_view)161 bool RawChannelPosix::OnReadMessageForRawChannel(
162     const MessageInTransit::View& message_view) {
163   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel);
164 
165   if (message_view.subtype() ==
166       MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) {
167     // We don't need to do anything. |RawChannel| won't extract the platform
168     // handles, and they'll be accumulated in |Read()|.
169     return true;
170   }
171 
172   return RawChannel::OnReadMessageForRawChannel(message_view);
173 }
174 
Read(size_t * bytes_read)175 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
176   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
177   DCHECK(!pending_read_);
178 
179   IOResult rv = ReadImpl(bytes_read);
180   if (rv != IO_SUCCEEDED && rv != IO_PENDING) {
181     // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
182     read_watcher_.reset();
183   }
184   return rv;
185 }
186 
ScheduleRead()187 RawChannel::IOResult RawChannelPosix::ScheduleRead() {
188   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
189   DCHECK(!pending_read_);
190 
191   pending_read_ = true;
192 
193   return IO_PENDING;
194 }
195 
GetReadPlatformHandles(size_t num_platform_handles,const void *)196 embedder::ScopedPlatformHandleVectorPtr RawChannelPosix::GetReadPlatformHandles(
197     size_t num_platform_handles,
198     const void* /*platform_handle_table*/) {
199   DCHECK_GT(num_platform_handles, 0u);
200 
201   if (read_platform_handles_.size() < num_platform_handles) {
202     embedder::CloseAllPlatformHandles(&read_platform_handles_);
203     read_platform_handles_.clear();
204     return embedder::ScopedPlatformHandleVectorPtr();
205   }
206 
207   embedder::ScopedPlatformHandleVectorPtr rv(
208       new embedder::PlatformHandleVector(num_platform_handles));
209   rv->assign(read_platform_handles_.begin(),
210              read_platform_handles_.begin() + num_platform_handles);
211   read_platform_handles_.erase(
212       read_platform_handles_.begin(),
213       read_platform_handles_.begin() + num_platform_handles);
214   return rv.Pass();
215 }
216 
WriteNoLock(size_t * platform_handles_written,size_t * bytes_written)217 RawChannel::IOResult RawChannelPosix::WriteNoLock(
218     size_t* platform_handles_written,
219     size_t* bytes_written) {
220   write_lock().AssertAcquired();
221 
222   DCHECK(!pending_write_);
223 
224   size_t num_platform_handles = 0;
225   ssize_t write_result;
226   if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
227     embedder::PlatformHandle* platform_handles;
228     void* serialization_data;  // Actually unused.
229     write_buffer_no_lock()->GetPlatformHandlesToSend(
230         &num_platform_handles, &platform_handles, &serialization_data);
231     DCHECK_GT(num_platform_handles, 0u);
232     DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles);
233     DCHECK(platform_handles);
234 
235     // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
236     std::vector<WriteBuffer::Buffer> buffers;
237     write_buffer_no_lock()->GetBuffers(&buffers);
238     DCHECK(!buffers.empty());
239     const size_t kMaxBufferCount = 10;
240     iovec iov[kMaxBufferCount];
241     size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
242     for (size_t i = 0; i < buffer_count; ++i) {
243       iov[i].iov_base = const_cast<char*>(buffers[i].addr);
244       iov[i].iov_len = buffers[i].size;
245     }
246 
247     write_result = embedder::PlatformChannelSendmsgWithHandles(
248         fd_.get(), iov, buffer_count, platform_handles, num_platform_handles);
249     for (size_t i = 0; i < num_platform_handles; i++)
250       platform_handles[i].CloseIfNecessary();
251   } else {
252     std::vector<WriteBuffer::Buffer> buffers;
253     write_buffer_no_lock()->GetBuffers(&buffers);
254     DCHECK(!buffers.empty());
255 
256     if (buffers.size() == 1) {
257       write_result = embedder::PlatformChannelWrite(
258           fd_.get(), buffers[0].addr, buffers[0].size);
259     } else {
260       const size_t kMaxBufferCount = 10;
261       iovec iov[kMaxBufferCount];
262       size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
263       for (size_t i = 0; i < buffer_count; ++i) {
264         iov[i].iov_base = const_cast<char*>(buffers[i].addr);
265         iov[i].iov_len = buffers[i].size;
266       }
267 
268       write_result =
269           embedder::PlatformChannelWritev(fd_.get(), iov, buffer_count);
270     }
271   }
272 
273   if (write_result >= 0) {
274     *platform_handles_written = num_platform_handles;
275     *bytes_written = static_cast<size_t>(write_result);
276     return IO_SUCCEEDED;
277   }
278 
279   if (errno == EPIPE)
280     return IO_FAILED_SHUTDOWN;
281 
282   if (errno != EAGAIN && errno != EWOULDBLOCK) {
283     PLOG(WARNING) << "sendmsg/write/writev";
284     return IO_FAILED_UNKNOWN;
285   }
286 
287   return ScheduleWriteNoLock();
288 }
289 
ScheduleWriteNoLock()290 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
291   write_lock().AssertAcquired();
292 
293   DCHECK(!pending_write_);
294 
295   // Set up to wait for the FD to become writable.
296   // If we're not on the I/O thread, we have to post a task to do this.
297   if (base::MessageLoop::current() != message_loop_for_io()) {
298     message_loop_for_io()->PostTask(FROM_HERE,
299                                     base::Bind(&RawChannelPosix::WaitToWrite,
300                                                weak_ptr_factory_.GetWeakPtr()));
301     pending_write_ = true;
302     return IO_PENDING;
303   }
304 
305   if (message_loop_for_io()->WatchFileDescriptor(
306           fd_.get().fd,
307           false,
308           base::MessageLoopForIO::WATCH_WRITE,
309           write_watcher_.get(),
310           this)) {
311     pending_write_ = true;
312     return IO_PENDING;
313   }
314 
315   return IO_FAILED_UNKNOWN;
316 }
317 
OnInit()318 bool RawChannelPosix::OnInit() {
319   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
320 
321   DCHECK(!read_watcher_);
322   read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
323   DCHECK(!write_watcher_);
324   write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
325 
326   if (!message_loop_for_io()->WatchFileDescriptor(
327           fd_.get().fd,
328           true,
329           base::MessageLoopForIO::WATCH_READ,
330           read_watcher_.get(),
331           this)) {
332     // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
333     // (in the sense of returning the message loop's state to what it was before
334     // it was called).
335     read_watcher_.reset();
336     write_watcher_.reset();
337     return false;
338   }
339 
340   return true;
341 }
342 
OnShutdownNoLock(scoped_ptr<ReadBuffer>,scoped_ptr<WriteBuffer>)343 void RawChannelPosix::OnShutdownNoLock(
344     scoped_ptr<ReadBuffer> /*read_buffer*/,
345     scoped_ptr<WriteBuffer> /*write_buffer*/) {
346   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
347   write_lock().AssertAcquired();
348 
349   read_watcher_.reset();   // This will stop watching (if necessary).
350   write_watcher_.reset();  // This will stop watching (if necessary).
351 
352   pending_read_ = false;
353   pending_write_ = false;
354 
355   DCHECK(fd_.is_valid());
356   fd_.reset();
357 
358   weak_ptr_factory_.InvalidateWeakPtrs();
359 }
360 
OnFileCanReadWithoutBlocking(int fd)361 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
362   DCHECK_EQ(fd, fd_.get().fd);
363   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
364 
365   if (!pending_read_) {
366     NOTREACHED();
367     return;
368   }
369 
370   pending_read_ = false;
371   size_t bytes_read = 0;
372   IOResult io_result = Read(&bytes_read);
373   if (io_result != IO_PENDING)
374     OnReadCompleted(io_result, bytes_read);
375 
376   // On failure, |read_watcher_| must have been reset; on success,
377   // we assume that |OnReadCompleted()| always schedules another read.
378   // Otherwise, we could end up spinning -- getting
379   // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
380   // read.
381   // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
382   // schedule a new read. But that code won't be reached under the current
383   // RawChannel implementation.
384   DCHECK(!read_watcher_ || pending_read_);
385 }
386 
OnFileCanWriteWithoutBlocking(int fd)387 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
388   DCHECK_EQ(fd, fd_.get().fd);
389   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
390 
391   IOResult io_result;
392   size_t platform_handles_written = 0;
393   size_t bytes_written = 0;
394   {
395     base::AutoLock locker(write_lock());
396 
397     DCHECK(pending_write_);
398 
399     pending_write_ = false;
400     io_result = WriteNoLock(&platform_handles_written, &bytes_written);
401   }
402 
403   if (io_result != IO_PENDING)
404     OnWriteCompleted(io_result, platform_handles_written, bytes_written);
405 }
406 
ReadImpl(size_t * bytes_read)407 RawChannel::IOResult RawChannelPosix::ReadImpl(size_t* bytes_read) {
408   char* buffer = nullptr;
409   size_t bytes_to_read = 0;
410   read_buffer()->GetBuffer(&buffer, &bytes_to_read);
411 
412   size_t old_num_platform_handles = read_platform_handles_.size();
413   ssize_t read_result = embedder::PlatformChannelRecvmsg(
414       fd_.get(), buffer, bytes_to_read, &read_platform_handles_);
415   if (read_platform_handles_.size() > old_num_platform_handles) {
416     DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
417               embedder::kPlatformChannelMaxNumHandles);
418 
419     // We should never accumulate more than |TransportData::kMaxPlatformHandles
420     // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
421     // possible because we could have accumulated all the handles for a message,
422     // then received the message data plus the first set of handles for the next
423     // message in the subsequent |recvmsg()|.)
424     if (read_platform_handles_.size() >
425         (TransportData::kMaxPlatformHandles +
426          embedder::kPlatformChannelMaxNumHandles)) {
427       LOG(ERROR) << "Received too many platform handles";
428       embedder::CloseAllPlatformHandles(&read_platform_handles_);
429       read_platform_handles_.clear();
430       return IO_FAILED_UNKNOWN;
431     }
432   }
433 
434   if (read_result > 0) {
435     *bytes_read = static_cast<size_t>(read_result);
436     return IO_SUCCEEDED;
437   }
438 
439   // |read_result == 0| means "end of file".
440   if (read_result == 0)
441     return IO_FAILED_SHUTDOWN;
442 
443   if (errno == EAGAIN || errno == EWOULDBLOCK)
444     return ScheduleRead();
445 
446   if (errno == ECONNRESET)
447     return IO_FAILED_BROKEN;
448 
449   PLOG(WARNING) << "recvmsg";
450   return IO_FAILED_UNKNOWN;
451 }
452 
WaitToWrite()453 void RawChannelPosix::WaitToWrite() {
454   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
455 
456   DCHECK(write_watcher_);
457 
458   if (!message_loop_for_io()->WatchFileDescriptor(
459           fd_.get().fd,
460           false,
461           base::MessageLoopForIO::WATCH_WRITE,
462           write_watcher_.get(),
463           this)) {
464     {
465       base::AutoLock locker(write_lock());
466 
467       DCHECK(pending_write_);
468       pending_write_ = false;
469     }
470     OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
471   }
472 }
473 
474 }  // namespace
475 
476 // -----------------------------------------------------------------------------
477 
478 // Static factory method declared in raw_channel.h.
479 // static
Create(embedder::ScopedPlatformHandle handle)480 scoped_ptr<RawChannel> RawChannel::Create(
481     embedder::ScopedPlatformHandle handle) {
482   return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass()));
483 }
484 
485 }  // namespace system
486 }  // namespace mojo
487