1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <errno.h>
8 #include <sys/socket.h>
9
10 #include <algorithm>
11 #include <deque>
12 #include <limits>
13 #include <memory>
14
15 #include "base/bind.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/synchronization/lock.h"
21 #include "base/task_runner.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle_vector.h"
24
25 #if !defined(OS_NACL)
26 #include <sys/uio.h>
27 #endif
28
29 namespace mojo {
30 namespace edk {
31
32 namespace {
33
// Cap on the total number of bytes ChannelPosix::OnFileCanReadWithoutBlocking()
// will consume in one invocation before its read loop stops.
constexpr size_t kMaxBatchReadCapacity = 256 * 1024;
35
36 // A view over a Channel::Message object. The write queue uses these since
37 // large messages may need to be sent in chunks.
38 class MessageView {
39 public:
40 // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)41 MessageView(Channel::MessagePtr message, size_t offset)
42 : message_(std::move(message)),
43 offset_(offset),
44 handles_(message_->TakeHandlesForTransport()) {
45 DCHECK_GT(message_->data_num_bytes(), offset_);
46 }
47
MessageView(MessageView && other)48 MessageView(MessageView&& other) { *this = std::move(other); }
49
operator =(MessageView && other)50 MessageView& operator=(MessageView&& other) {
51 message_ = std::move(other.message_);
52 offset_ = other.offset_;
53 handles_ = std::move(other.handles_);
54 return *this;
55 }
56
~MessageView()57 ~MessageView() {}
58
data() const59 const void* data() const {
60 return static_cast<const char*>(message_->data()) + offset_;
61 }
62
data_num_bytes() const63 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
64
data_offset() const65 size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)66 void advance_data_offset(size_t num_bytes) {
67 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
68 offset_ += num_bytes;
69 }
70
TakeHandles()71 ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); }
TakeMessage()72 Channel::MessagePtr TakeMessage() { return std::move(message_); }
73
SetHandles(ScopedPlatformHandleVectorPtr handles)74 void SetHandles(ScopedPlatformHandleVectorPtr handles) {
75 handles_ = std::move(handles);
76 }
77
78 private:
79 Channel::MessagePtr message_;
80 size_t offset_;
81 ScopedPlatformHandleVectorPtr handles_;
82
83 DISALLOW_COPY_AND_ASSIGN(MessageView);
84 };
85
86 class ChannelPosix : public Channel,
87 public base::MessageLoop::DestructionObserver,
88 public base::MessageLoopForIO::Watcher {
89 public:
// Builds a channel around |handle|. |delegate| is forwarded to the Channel
// base class and |io_task_runner| is the runner used for all posted work in
// this class. The object keeps a reference to itself in |self_|, which is
// released in ShutDownOnIOThread().
ChannelPosix(Delegate* delegate,
             ScopedPlatformHandle handle,
             scoped_refptr<base::TaskRunner> io_task_runner)
    : Channel(delegate),
      self_(this),
      handle_(std::move(handle)),
      // |io_task_runner| is a by-value sink parameter: move it instead of
      // copying to avoid an extra atomic ref-count increment/decrement.
      io_task_runner_(std::move(io_task_runner))
#if defined(OS_MACOSX)
      ,
      handles_to_close_(new PlatformHandleVector)
#endif
{
}
103
Start()104 void Start() override {
105 if (io_task_runner_->RunsTasksOnCurrentThread()) {
106 StartOnIOThread();
107 } else {
108 io_task_runner_->PostTask(
109 FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this));
110 }
111 }
112
ShutDownImpl()113 void ShutDownImpl() override {
114 // Always shut down asynchronously when called through the public interface.
115 io_task_runner_->PostTask(
116 FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this));
117 }
118
Write(MessagePtr message)119 void Write(MessagePtr message) override {
120 bool write_error = false;
121 {
122 base::AutoLock lock(write_lock_);
123 if (reject_writes_)
124 return;
125 if (outgoing_messages_.empty()) {
126 if (!WriteNoLock(MessageView(std::move(message), 0)))
127 reject_writes_ = write_error = true;
128 } else {
129 outgoing_messages_.emplace_back(std::move(message), 0);
130 }
131 }
132 if (write_error) {
133 // Do not synchronously invoke OnError(). Write() may have been called by
134 // the delegate and we don't want to re-enter it.
135 io_task_runner_->PostTask(FROM_HERE,
136 base::Bind(&ChannelPosix::OnError, this));
137 }
138 }
139
LeakHandle()140 void LeakHandle() override {
141 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
142 leak_handle_ = true;
143 }
144
GetReadPlatformHandles(size_t num_handles,const void * extra_header,size_t extra_header_size,ScopedPlatformHandleVectorPtr * handles)145 bool GetReadPlatformHandles(
146 size_t num_handles,
147 const void* extra_header,
148 size_t extra_header_size,
149 ScopedPlatformHandleVectorPtr* handles) override {
150 if (num_handles > std::numeric_limits<uint16_t>::max())
151 return false;
152 #if defined(OS_MACOSX) && !defined(OS_IOS)
153 // On OSX, we can have mach ports which are located in the extra header
154 // section.
155 using MachPortsEntry = Channel::Message::MachPortsEntry;
156 using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
157 CHECK(extra_header_size >=
158 sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry));
159 const MachPortsExtraHeader* mach_ports_header =
160 reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
161 size_t num_mach_ports = mach_ports_header->num_ports;
162 CHECK(num_mach_ports <= num_handles);
163 if (incoming_platform_handles_.size() + num_mach_ports < num_handles) {
164 handles->reset();
165 return true;
166 }
167
168 handles->reset(new PlatformHandleVector(num_handles));
169 const MachPortsEntry* mach_ports = mach_ports_header->entries;
170 for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
171 if (mach_port_index < num_mach_ports &&
172 mach_ports[mach_port_index].index == i) {
173 (*handles)->at(i) = PlatformHandle(
174 static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port));
175 CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH);
176 // These are actually just Mach port names until they're resolved from
177 // the remote process.
178 (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME;
179 mach_port_index++;
180 } else {
181 CHECK(!incoming_platform_handles_.empty());
182 (*handles)->at(i) = incoming_platform_handles_.front();
183 incoming_platform_handles_.pop_front();
184 }
185 }
186 #else
187 if (incoming_platform_handles_.size() < num_handles) {
188 handles->reset();
189 return true;
190 }
191
192 handles->reset(new PlatformHandleVector(num_handles));
193 for (size_t i = 0; i < num_handles; ++i) {
194 (*handles)->at(i) = incoming_platform_handles_.front();
195 incoming_platform_handles_.pop_front();
196 }
197 #endif
198
199 return true;
200 }
201
202 private:
// Expects both watchers to be gone already (DCHECKed) and closes every
// received handle that was never handed out.
~ChannelPosix() override {
  DCHECK(!read_watcher_);
  DCHECK(!write_watcher_);
  for (PlatformHandle unclaimed : incoming_platform_handles_) {
    unclaimed.CloseIfNecessary();
  }
}
209
StartOnIOThread()210 void StartOnIOThread() {
211 DCHECK(!read_watcher_);
212 DCHECK(!write_watcher_);
213 read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
214 write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher);
215 base::MessageLoopForIO::current()->WatchFileDescriptor(
216 handle_.get().handle, true /* persistent */,
217 base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this);
218 base::MessageLoop::current()->AddDestructionObserver(this);
219 }
220
WaitForWriteOnIOThread()221 void WaitForWriteOnIOThread() {
222 base::AutoLock lock(write_lock_);
223 WaitForWriteOnIOThreadNoLock();
224 }
225
WaitForWriteOnIOThreadNoLock()226 void WaitForWriteOnIOThreadNoLock() {
227 if (pending_write_)
228 return;
229 if (!write_watcher_)
230 return;
231 if (io_task_runner_->RunsTasksOnCurrentThread()) {
232 pending_write_ = true;
233 base::MessageLoopForIO::current()->WatchFileDescriptor(
234 handle_.get().handle, false /* persistent */,
235 base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this);
236 } else {
237 io_task_runner_->PostTask(
238 FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this));
239 }
240 }
241
ShutDownOnIOThread()242 void ShutDownOnIOThread() {
243 base::MessageLoop::current()->RemoveDestructionObserver(this);
244
245 read_watcher_.reset();
246 write_watcher_.reset();
247 if (leak_handle_)
248 ignore_result(handle_.release());
249 handle_.reset();
250 #if defined(OS_MACOSX)
251 handles_to_close_.reset();
252 #endif
253
254 // May destroy the |this| if it was the last reference.
255 self_ = nullptr;
256 }
257
258 // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()259 void WillDestroyCurrentMessageLoop() override {
260 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
261 if (self_)
262 ShutDownOnIOThread();
263 }
264
265 // base::MessageLoopForIO::Watcher:
OnFileCanReadWithoutBlocking(int fd)266 void OnFileCanReadWithoutBlocking(int fd) override {
267 CHECK_EQ(fd, handle_.get().handle);
268
269 bool read_error = false;
270 size_t next_read_size = 0;
271 size_t buffer_capacity = 0;
272 size_t total_bytes_read = 0;
273 size_t bytes_read = 0;
274 do {
275 buffer_capacity = next_read_size;
276 char* buffer = GetReadBuffer(&buffer_capacity);
277 DCHECK_GT(buffer_capacity, 0u);
278
279 ssize_t read_result = PlatformChannelRecvmsg(
280 handle_.get(),
281 buffer,
282 buffer_capacity,
283 &incoming_platform_handles_);
284
285 if (read_result > 0) {
286 bytes_read = static_cast<size_t>(read_result);
287 total_bytes_read += bytes_read;
288 if (!OnReadComplete(bytes_read, &next_read_size)) {
289 read_error = true;
290 break;
291 }
292 } else if (read_result == 0 ||
293 (errno != EAGAIN && errno != EWOULDBLOCK)) {
294 read_error = true;
295 break;
296 }
297 } while (bytes_read == buffer_capacity &&
298 total_bytes_read < kMaxBatchReadCapacity &&
299 next_read_size > 0);
300 if (read_error) {
301 // Stop receiving read notifications.
302 read_watcher_.reset();
303
304 OnError();
305 }
306 }
307
OnFileCanWriteWithoutBlocking(int fd)308 void OnFileCanWriteWithoutBlocking(int fd) override {
309 bool write_error = false;
310 {
311 base::AutoLock lock(write_lock_);
312 pending_write_ = false;
313 if (!FlushOutgoingMessagesNoLock())
314 reject_writes_ = write_error = true;
315 }
316 if (write_error)
317 OnError();
318 }
319
320 // Attempts to write a message directly to the channel. If the full message
321 // cannot be written, it's queued and a wait is initiated to write the message
322 // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)323 bool WriteNoLock(MessageView message_view) {
324 size_t bytes_written = 0;
325 do {
326 message_view.advance_data_offset(bytes_written);
327
328 ssize_t result;
329 ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles();
330 if (handles && handles->size()) {
331 iovec iov = {
332 const_cast<void*>(message_view.data()),
333 message_view.data_num_bytes()
334 };
335 // TODO: Handle lots of handles.
336 result = PlatformChannelSendmsgWithHandles(
337 handle_.get(), &iov, 1, handles->data(), handles->size());
338 if (result >= 0) {
339 #if defined(OS_MACOSX)
340 // There is a bug on OSX which makes it dangerous to close
341 // a file descriptor while it is in transit. So instead we
342 // store the file descriptor in a set and send a message to
343 // the recipient, which is queued AFTER the message that
344 // sent the FD. The recipient will reply to the message,
345 // letting us know that it is now safe to close the file
346 // descriptor. For more information, see:
347 // http://crbug.com/298276
348 std::vector<int> fds;
349 for (auto& handle : *handles)
350 fds.push_back(handle.handle);
351 {
352 base::AutoLock l(handles_to_close_lock_);
353 for (auto& handle : *handles)
354 handles_to_close_->push_back(handle);
355 }
356 MessagePtr fds_message(
357 new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
358 Message::Header::MessageType::HANDLES_SENT));
359 memcpy(fds_message->mutable_payload(), fds.data(),
360 sizeof(fds[0]) * fds.size());
361 outgoing_messages_.emplace_back(std::move(fds_message), 0);
362 handles->clear();
363 #else
364 handles.reset();
365 #endif // defined(OS_MACOSX)
366 }
367 } else {
368 result = PlatformChannelWrite(handle_.get(), message_view.data(),
369 message_view.data_num_bytes());
370 }
371
372 if (result < 0) {
373 if (errno != EAGAIN && errno != EWOULDBLOCK)
374 return false;
375 message_view.SetHandles(std::move(handles));
376 outgoing_messages_.emplace_front(std::move(message_view));
377 WaitForWriteOnIOThreadNoLock();
378 return true;
379 }
380
381 bytes_written = static_cast<size_t>(result);
382 } while (bytes_written < message_view.data_num_bytes());
383
384 return FlushOutgoingMessagesNoLock();
385 }
386
FlushOutgoingMessagesNoLock()387 bool FlushOutgoingMessagesNoLock() {
388 std::deque<MessageView> messages;
389 std::swap(outgoing_messages_, messages);
390
391 while (!messages.empty()) {
392 if (!WriteNoLock(std::move(messages.front())))
393 return false;
394
395 messages.pop_front();
396 if (!outgoing_messages_.empty()) {
397 // The message was requeued by WriteNoLock(), so we have to wait for
398 // pipe to become writable again. Repopulate the message queue and exit.
399 // If sending the message triggered any control messages, they may be
400 // in |outgoing_messages_| in addition to or instead of the message
401 // being sent.
402 std::swap(messages, outgoing_messages_);
403 while (!messages.empty()) {
404 outgoing_messages_.push_front(std::move(messages.back()));
405 messages.pop_back();
406 }
407 return true;
408 }
409 }
410
411 return true;
412 }
413
414 #if defined(OS_MACOSX)
OnControlMessage(Message::Header::MessageType message_type,const void * payload,size_t payload_size,ScopedPlatformHandleVectorPtr handles)415 bool OnControlMessage(Message::Header::MessageType message_type,
416 const void* payload,
417 size_t payload_size,
418 ScopedPlatformHandleVectorPtr handles) override {
419 switch (message_type) {
420 case Message::Header::MessageType::HANDLES_SENT: {
421 if (payload_size == 0)
422 break;
423 MessagePtr message(new Channel::Message(
424 payload_size, 0, Message::Header::MessageType::HANDLES_SENT_ACK));
425 memcpy(message->mutable_payload(), payload, payload_size);
426 Write(std::move(message));
427 return true;
428 }
429
430 case Message::Header::MessageType::HANDLES_SENT_ACK: {
431 size_t num_fds = payload_size / sizeof(int);
432 if (num_fds == 0 || payload_size % sizeof(int) != 0)
433 break;
434
435 const int* fds = reinterpret_cast<const int*>(payload);
436 if (!CloseHandles(fds, num_fds))
437 break;
438 return true;
439 }
440
441 default:
442 break;
443 }
444
445 return false;
446 }
447
448 // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
449 // |fds| does not match a sequence of handles in |handles_to_close_|.
CloseHandles(const int * fds,size_t num_fds)450 bool CloseHandles(const int* fds, size_t num_fds) {
451 base::AutoLock l(handles_to_close_lock_);
452 if (!num_fds)
453 return false;
454
455 auto start =
456 std::find_if(handles_to_close_->begin(), handles_to_close_->end(),
457 [&fds](const PlatformHandle& handle) {
458 return handle.handle == fds[0];
459 });
460 if (start == handles_to_close_->end())
461 return false;
462
463 auto it = start;
464 size_t i = 0;
465 // The FDs in the message should match a sequence of handles in
466 // |handles_to_close_|.
467 for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) {
468 if (it->handle != fds[i])
469 return false;
470
471 it->CloseIfNecessary();
472 }
473 if (i != num_fds)
474 return false;
475
476 handles_to_close_->erase(start, it);
477 return true;
478 }
479 #endif // defined(OS_MACOSX)
480
481 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
482 scoped_refptr<Channel> self_;
483
484 ScopedPlatformHandle handle_;
485 scoped_refptr<base::TaskRunner> io_task_runner_;
486
487 // These watchers must only be accessed on the IO thread.
488 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
489 std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
490
491 std::deque<PlatformHandle> incoming_platform_handles_;
492
493 // Protects |pending_write_|, |reject_writes_| and |outgoing_messages_|.
494 base::Lock write_lock_;
495 bool pending_write_ = false;
496 bool reject_writes_ = false;
497 std::deque<MessageView> outgoing_messages_;
498
499 bool leak_handle_ = false;
500
501 #if defined(OS_MACOSX)
502 base::Lock handles_to_close_lock_;
503 ScopedPlatformHandleVectorPtr handles_to_close_;
504 #endif
505
506 DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
507 };
508
509 } // namespace
510
511 // static
Create(Delegate * delegate,ScopedPlatformHandle platform_handle,scoped_refptr<base::TaskRunner> io_task_runner)512 scoped_refptr<Channel> Channel::Create(
513 Delegate* delegate,
514 ScopedPlatformHandle platform_handle,
515 scoped_refptr<base::TaskRunner> io_task_runner) {
516 return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner);
517 }
518
519 } // namespace edk
520 } // namespace mojo
521