1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/channel.h"
6
7 #include <errno.h>
8 #include <sys/socket.h>
9
10 #include <algorithm>
11 #include <limits>
12 #include <memory>
13
14 #include "base/bind.h"
15 #include "base/containers/queue.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop_current.h"
20 #include "base/message_loop/message_pump_for_io.h"
21 #include "base/synchronization/lock.h"
22 #include "base/task_runner.h"
23 #include "build/build_config.h"
24 #include "mojo/core/core.h"
25 #include "mojo/public/cpp/platform/socket_utils_posix.h"
26
27 #if !defined(OS_NACL)
28 #include <sys/uio.h>
29 #endif
30
31 #if defined(OS_MACOSX) && !defined(OS_IOS)
32 #include "mojo/core/mach_port_relay.h"
33 #endif
34
35 namespace mojo {
36 namespace core {
37
38 namespace {
39
// Soft limit on how much data a single read notification may consume: once
// the read loop has pulled at least this many bytes off the socket it stops
// and waits for the next notification instead of continuing to read.
constexpr size_t kMaxBatchReadCapacity = 256 * 1024;
41
42 // A view over a Channel::Message object. The write queue uses these since
43 // large messages may need to be sent in chunks.
44 class MessageView {
45 public:
46 // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)47 MessageView(Channel::MessagePtr message, size_t offset)
48 : message_(std::move(message)),
49 offset_(offset),
50 handles_(message_->TakeHandlesForTransport()) {
51 DCHECK_GT(message_->data_num_bytes(), offset_);
52 }
53
MessageView(MessageView && other)54 MessageView(MessageView&& other) { *this = std::move(other); }
55
operator =(MessageView && other)56 MessageView& operator=(MessageView&& other) {
57 message_ = std::move(other.message_);
58 offset_ = other.offset_;
59 handles_ = std::move(other.handles_);
60 return *this;
61 }
62
~MessageView()63 ~MessageView() {}
64
data() const65 const void* data() const {
66 return static_cast<const char*>(message_->data()) + offset_;
67 }
68
data_num_bytes() const69 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
70
data_offset() const71 size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)72 void advance_data_offset(size_t num_bytes) {
73 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
74 offset_ += num_bytes;
75 }
76
TakeHandles()77 std::vector<PlatformHandleInTransit> TakeHandles() {
78 return std::move(handles_);
79 }
TakeMessage()80 Channel::MessagePtr TakeMessage() { return std::move(message_); }
81
SetHandles(std::vector<PlatformHandleInTransit> handles)82 void SetHandles(std::vector<PlatformHandleInTransit> handles) {
83 handles_ = std::move(handles);
84 }
85
86 private:
87 Channel::MessagePtr message_;
88 size_t offset_;
89 std::vector<PlatformHandleInTransit> handles_;
90
91 DISALLOW_COPY_AND_ASSIGN(MessageView);
92 };
93
94 class ChannelPosix : public Channel,
95 #if defined(OS_MACOSX) && !defined(OS_IOS)
96 public MachPortRelay::Observer,
97 #endif
98 public base::MessageLoopCurrent::DestructionObserver,
99 public base::MessagePumpForIO::FdWatcher {
100 public:
ChannelPosix(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)101 ChannelPosix(Delegate* delegate,
102 ConnectionParams connection_params,
103 scoped_refptr<base::TaskRunner> io_task_runner)
104 : Channel(delegate), self_(this), io_task_runner_(io_task_runner) {
105 if (connection_params.server_endpoint().is_valid())
106 server_ = connection_params.TakeServerEndpoint();
107 else
108 socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD();
109
110 CHECK(server_.is_valid() || socket_.is_valid());
111 }
112
Start()113 void Start() override {
114 #if defined(OS_MACOSX) && !defined(OS_IOS)
115 auto* relay = Core::Get()->GetMachPortRelay();
116 if (relay) {
117 // We should only have a relay if we know the remote process handle,
118 // because that means we're in the broker process.
119 relay->AddObserver(this);
120 }
121 #endif
122
123 if (io_task_runner_->RunsTasksInCurrentSequence()) {
124 StartOnIOThread();
125 } else {
126 io_task_runner_->PostTask(
127 FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this));
128 }
129 }
130
ShutDownImpl()131 void ShutDownImpl() override {
132 // Always shut down asynchronously when called through the public interface.
133 io_task_runner_->PostTask(
134 FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this));
135 }
136
Write(MessagePtr message)137 void Write(MessagePtr message) override {
138 #if defined(OS_MACOSX) && !defined(OS_IOS)
139 // If this message has Mach ports and we have a MachPortRelay, use the relay
140 // to rewrite the ports as receive rights from which the send right can be
141 // read. See |MachPortRelay::SendPortsToProcess()|.
142 //
143 // Note that if we don't have a relay, the receiving process must, and they
144 // must also have the ability to extract a send right from the ports that
145 // are already attached.
146 MachPortRelay* relay = Core::Get()->GetMachPortRelay();
147 if (relay && remote_process().is_valid() && message->has_mach_ports()) {
148 if (relay->port_provider()->TaskForPid(remote_process().get()) ==
149 MACH_PORT_NULL) {
150 // We also need to have a task port for the remote process before we can
151 // send it any other ports. If we don't have one yet, queue the message
152 // until OnProcessReady() is invoked.
153 base::AutoLock lock(task_port_wait_lock_);
154 pending_outgoing_with_mach_ports_.emplace_back(std::move(message));
155 return;
156 }
157
158 relay->SendPortsToProcess(message.get(), remote_process().get());
159 }
160 #endif
161
162 bool write_error = false;
163 {
164 base::AutoLock lock(write_lock_);
165 if (reject_writes_)
166 return;
167 if (outgoing_messages_.empty()) {
168 if (!WriteNoLock(MessageView(std::move(message), 0)))
169 reject_writes_ = write_error = true;
170 } else {
171 outgoing_messages_.emplace_back(std::move(message), 0);
172 }
173 }
174 if (write_error) {
175 // Invoke OnWriteError() asynchronously on the IO thread, in case Write()
176 // was called by the delegate, in which case we should not re-enter it.
177 io_task_runner_->PostTask(
178 FROM_HERE, base::BindOnce(&ChannelPosix::OnWriteError, this,
179 Error::kDisconnected));
180 }
181 }
182
LeakHandle()183 void LeakHandle() override {
184 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
185 leak_handle_ = true;
186 }
187
GetReadPlatformHandles(const void * payload,size_t payload_size,size_t num_handles,const void * extra_header,size_t extra_header_size,std::vector<PlatformHandle> * handles,bool * deferred)188 bool GetReadPlatformHandles(const void* payload,
189 size_t payload_size,
190 size_t num_handles,
191 const void* extra_header,
192 size_t extra_header_size,
193 std::vector<PlatformHandle>* handles,
194 bool* deferred) override {
195 if (num_handles > std::numeric_limits<uint16_t>::max())
196 return false;
197 #if defined(OS_MACOSX) && !defined(OS_IOS)
198 // On OSX, we can have mach ports which are located in the extra header
199 // section.
200 using MachPortsEntry = Channel::Message::MachPortsEntry;
201 using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
202 if (extra_header_size <
203 sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)) {
204 return false;
205 }
206 const MachPortsExtraHeader* mach_ports_header =
207 reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
208 size_t num_mach_ports = mach_ports_header->num_ports;
209 if (num_mach_ports > num_handles)
210 return false;
211 if (incoming_fds_.size() + num_mach_ports < num_handles)
212 return true;
213
214 std::vector<PlatformHandleInTransit> handles_in_transit(num_handles);
215 const MachPortsEntry* mach_ports = mach_ports_header->entries;
216
217 // If we know the remote process handle, we assume all incoming Mach ports
218 // are send right references owned by the remote process. Otherwise they're
219 // receive ports we can use to read a send right.
220 const bool extract_send_rights = remote_process().is_valid();
221 for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
222 if (mach_port_index < num_mach_ports &&
223 mach_ports[mach_port_index].index == i) {
224 mach_port_t port_name =
225 static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port);
226 if (extract_send_rights) {
227 handles_in_transit[i] =
228 PlatformHandleInTransit::CreateForMachPortName(port_name);
229 } else {
230 handles_in_transit[i] = PlatformHandleInTransit(
231 PlatformHandle(MachPortRelay::ReceiveSendRight(
232 base::mac::ScopedMachReceiveRight(port_name))));
233 }
234 mach_port_index++;
235 } else {
236 if (incoming_fds_.empty())
237 return false;
238 handles_in_transit[i] = PlatformHandleInTransit(
239 PlatformHandle(std::move(incoming_fds_.front())));
240 incoming_fds_.pop_front();
241 }
242 }
243 if (extract_send_rights && num_mach_ports) {
244 MachPortRelay* relay = Core::Get()->GetMachPortRelay();
245 DCHECK(relay);
246 // Extracting send rights requires that we have a task port for the
247 // remote process, which we may not yet have.
248 if (relay->port_provider()->TaskForPid(remote_process().get()) !=
249 MACH_PORT_NULL) {
250 // We do have a task port, so extract the send rights immediately.
251 for (auto& handle : handles_in_transit) {
252 if (handle.is_mach_port_name()) {
253 handle = PlatformHandleInTransit(PlatformHandle(relay->ExtractPort(
254 handle.mach_port_name(), remote_process().get())));
255 }
256 }
257 } else {
258 // No task port, we have to defer this message.
259 *deferred = true;
260 base::AutoLock lock(task_port_wait_lock_);
261 std::vector<uint8_t> data(payload_size);
262 memcpy(data.data(), payload, payload_size);
263 pending_incoming_with_mach_ports_.emplace_back(
264 std::move(data), std::move(handles_in_transit));
265 return true;
266 }
267 }
268
269 handles->resize(handles_in_transit.size());
270 for (size_t i = 0; i < handles->size(); ++i)
271 handles->at(i) = handles_in_transit[i].TakeHandle();
272 #else
273 if (incoming_fds_.size() < num_handles)
274 return true;
275
276 handles->resize(num_handles);
277 for (size_t i = 0; i < num_handles; ++i) {
278 handles->at(i) = PlatformHandle(std::move(incoming_fds_.front()));
279 incoming_fds_.pop_front();
280 }
281 #endif
282
283 return true;
284 }
285
286 private:
~ChannelPosix()287 ~ChannelPosix() override {
288 DCHECK(!read_watcher_);
289 DCHECK(!write_watcher_);
290 }
291
StartOnIOThread()292 void StartOnIOThread() {
293 DCHECK(!read_watcher_);
294 DCHECK(!write_watcher_);
295 read_watcher_.reset(
296 new base::MessagePumpForIO::FdWatchController(FROM_HERE));
297 base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
298 if (server_.is_valid()) {
299 base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
300 server_.platform_handle().GetFD().get(), false /* persistent */,
301 base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
302 } else {
303 write_watcher_.reset(
304 new base::MessagePumpForIO::FdWatchController(FROM_HERE));
305 base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
306 socket_.get(), true /* persistent */,
307 base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
308 base::AutoLock lock(write_lock_);
309 FlushOutgoingMessagesNoLock();
310 }
311 }
312
WaitForWriteOnIOThread()313 void WaitForWriteOnIOThread() {
314 base::AutoLock lock(write_lock_);
315 WaitForWriteOnIOThreadNoLock();
316 }
317
WaitForWriteOnIOThreadNoLock()318 void WaitForWriteOnIOThreadNoLock() {
319 if (pending_write_)
320 return;
321 if (!write_watcher_)
322 return;
323 if (io_task_runner_->RunsTasksInCurrentSequence()) {
324 pending_write_ = true;
325 base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
326 socket_.get(), false /* persistent */,
327 base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this);
328 } else {
329 io_task_runner_->PostTask(
330 FROM_HERE,
331 base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this));
332 }
333 }
334
ShutDownOnIOThread()335 void ShutDownOnIOThread() {
336 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
337
338 read_watcher_.reset();
339 write_watcher_.reset();
340 if (leak_handle_) {
341 ignore_result(socket_.release());
342 server_.TakePlatformHandle().release();
343 } else {
344 socket_.reset();
345 ignore_result(server_.TakePlatformHandle());
346 }
347 #if defined(OS_MACOSX)
348 fds_to_close_.clear();
349 #endif
350
351 #if defined(OS_MACOSX) && !defined(OS_IOS)
352 auto* relay = Core::Get()->GetMachPortRelay();
353 if (relay)
354 relay->RemoveObserver(this);
355 #endif
356
357 // May destroy the |this| if it was the last reference.
358 self_ = nullptr;
359 }
360
361 #if defined(OS_MACOSX) && !defined(OS_IOS)
362 // MachPortRelay::Observer:
OnProcessReady(base::ProcessHandle process)363 void OnProcessReady(base::ProcessHandle process) override {
364 if (process != remote_process().get())
365 return;
366
367 io_task_runner_->PostTask(
368 FROM_HERE,
369 base::BindOnce(
370 &ChannelPosix::FlushPendingMessagesWithMachPortsOnIOThread, this));
371 }
372
FlushPendingMessagesWithMachPortsOnIOThread()373 void FlushPendingMessagesWithMachPortsOnIOThread() {
374 // We have a task port for the remote process. Now we can send or accept
375 // any pending messages with Mach ports.
376 std::vector<RawIncomingMessage> incoming;
377 std::vector<MessagePtr> outgoing;
378 {
379 base::AutoLock lock(task_port_wait_lock_);
380 if (reject_incoming_messages_with_mach_ports_)
381 return;
382 std::swap(pending_incoming_with_mach_ports_, incoming);
383 std::swap(pending_outgoing_with_mach_ports_, outgoing);
384 }
385
386 DCHECK(remote_process().is_valid());
387 base::ProcessHandle process = remote_process().get();
388 MachPortRelay* relay = Core::Get()->GetMachPortRelay();
389 DCHECK(relay);
390 for (auto& message : incoming) {
391 Channel::Delegate* d = delegate();
392 if (!d)
393 break;
394 std::vector<PlatformHandle> handles(message.handles.size());
395 for (size_t i = 0; i < message.handles.size(); ++i) {
396 if (message.handles[i].is_mach_port_name()) {
397 handles[i] = PlatformHandle(
398 relay->ExtractPort(message.handles[i].mach_port_name(), process));
399 } else {
400 DCHECK(!message.handles[i].owning_process().is_valid());
401 handles[i] = message.handles[i].TakeHandle();
402 }
403 }
404 d->OnChannelMessage(message.data.data(), message.data.size(),
405 std::move(handles));
406 }
407
408 for (auto& message : outgoing) {
409 relay->SendPortsToProcess(message.get(), process);
410 Write(std::move(message));
411 }
412 }
413 #endif
414
415 // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()416 void WillDestroyCurrentMessageLoop() override {
417 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
418 if (self_)
419 ShutDownOnIOThread();
420 }
421
422 // base::MessagePumpForIO::FdWatcher:
OnFileCanReadWithoutBlocking(int fd)423 void OnFileCanReadWithoutBlocking(int fd) override {
424 if (server_.is_valid()) {
425 CHECK_EQ(fd, server_.platform_handle().GetFD().get());
426 #if !defined(OS_NACL)
427 read_watcher_.reset();
428 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
429
430 AcceptSocketConnection(server_.platform_handle().GetFD().get(), &socket_);
431 ignore_result(server_.TakePlatformHandle());
432 if (!socket_.is_valid()) {
433 OnError(Error::kConnectionFailed);
434 return;
435 }
436 StartOnIOThread();
437 #else
438 NOTREACHED();
439 #endif
440 return;
441 }
442 CHECK_EQ(fd, socket_.get());
443
444 bool validation_error = false;
445 bool read_error = false;
446 size_t next_read_size = 0;
447 size_t buffer_capacity = 0;
448 size_t total_bytes_read = 0;
449 size_t bytes_read = 0;
450 do {
451 buffer_capacity = next_read_size;
452 char* buffer = GetReadBuffer(&buffer_capacity);
453 DCHECK_GT(buffer_capacity, 0u);
454
455 std::vector<base::ScopedFD> incoming_fds;
456 ssize_t read_result =
457 SocketRecvmsg(socket_.get(), buffer, buffer_capacity, &incoming_fds);
458 for (auto& fd : incoming_fds)
459 incoming_fds_.emplace_back(std::move(fd));
460
461 if (read_result > 0) {
462 bytes_read = static_cast<size_t>(read_result);
463 total_bytes_read += bytes_read;
464 if (!OnReadComplete(bytes_read, &next_read_size)) {
465 read_error = true;
466 validation_error = true;
467 break;
468 }
469 } else if (read_result == 0 ||
470 (errno != EAGAIN && errno != EWOULDBLOCK)) {
471 read_error = true;
472 break;
473 }
474 } while (bytes_read == buffer_capacity &&
475 total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
476 if (read_error) {
477 // Stop receiving read notifications.
478 read_watcher_.reset();
479 if (validation_error)
480 OnError(Error::kReceivedMalformedData);
481 else
482 OnError(Error::kDisconnected);
483 }
484 }
485
OnFileCanWriteWithoutBlocking(int fd)486 void OnFileCanWriteWithoutBlocking(int fd) override {
487 bool write_error = false;
488 {
489 base::AutoLock lock(write_lock_);
490 pending_write_ = false;
491 if (!FlushOutgoingMessagesNoLock())
492 reject_writes_ = write_error = true;
493 }
494 if (write_error)
495 OnWriteError(Error::kDisconnected);
496 }
497
498 // Attempts to write a message directly to the channel. If the full message
499 // cannot be written, it's queued and a wait is initiated to write the message
500 // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)501 bool WriteNoLock(MessageView message_view) {
502 if (server_.is_valid()) {
503 outgoing_messages_.emplace_front(std::move(message_view));
504 return true;
505 }
506 size_t bytes_written = 0;
507 do {
508 message_view.advance_data_offset(bytes_written);
509
510 ssize_t result;
511 std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles();
512 if (!handles.empty()) {
513 iovec iov = {const_cast<void*>(message_view.data()),
514 message_view.data_num_bytes()};
515 std::vector<base::ScopedFD> fds(handles.size());
516 for (size_t i = 0; i < handles.size(); ++i)
517 fds[i] = handles[i].TakeHandle().TakeFD();
518 // TODO: Handle lots of handles.
519 result = SendmsgWithHandles(socket_.get(), &iov, 1, fds);
520 if (result >= 0) {
521 #if defined(OS_MACOSX)
522 // There is a bug on OSX which makes it dangerous to close
523 // a file descriptor while it is in transit. So instead we
524 // store the file descriptor in a set and send a message to
525 // the recipient, which is queued AFTER the message that
526 // sent the FD. The recipient will reply to the message,
527 // letting us know that it is now safe to close the file
528 // descriptor. For more information, see:
529 // http://crbug.com/298276
530 MessagePtr fds_message(
531 new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
532 Message::MessageType::HANDLES_SENT));
533 memcpy(fds_message->mutable_payload(), fds.data(),
534 sizeof(fds[0]) * fds.size());
535 outgoing_messages_.emplace_back(std::move(fds_message), 0);
536 {
537 base::AutoLock l(fds_to_close_lock_);
538 for (auto& fd : fds)
539 fds_to_close_.emplace_back(std::move(fd));
540 }
541 #endif // defined(OS_MACOSX)
542 } else {
543 // Message transmission failed, so pull the FDs back into |handles|
544 // so they can be held by the Message again.
545 for (size_t i = 0; i < fds.size(); ++i) {
546 handles[i] =
547 PlatformHandleInTransit(PlatformHandle(std::move(fds[i])));
548 }
549 }
550 } else {
551 result = SocketWrite(socket_.get(), message_view.data(),
552 message_view.data_num_bytes());
553 }
554
555 if (result < 0) {
556 if (errno != EAGAIN &&
557 errno != EWOULDBLOCK
558 #if defined(OS_MACOSX)
559 // On OS X if sendmsg() is trying to send fds between processes and
560 // there isn't enough room in the output buffer to send the fd
561 // structure over atomically then EMSGSIZE is returned.
562 //
563 // EMSGSIZE presents a problem since the system APIs can only call
564 // us when there's room in the socket buffer and not when there is
565 // "enough" room.
566 //
567 // The current behavior is to return to the event loop when EMSGSIZE
568 // is received and hopefull service another FD. This is however
569 // still technically a busy wait since the event loop will call us
570 // right back until the receiver has read enough data to allow
571 // passing the FD over atomically.
572 && errno != EMSGSIZE
573 #endif
574 ) {
575 return false;
576 }
577 message_view.SetHandles(std::move(handles));
578 outgoing_messages_.emplace_front(std::move(message_view));
579 WaitForWriteOnIOThreadNoLock();
580 return true;
581 }
582
583 bytes_written = static_cast<size_t>(result);
584 } while (bytes_written < message_view.data_num_bytes());
585
586 return FlushOutgoingMessagesNoLock();
587 }
588
FlushOutgoingMessagesNoLock()589 bool FlushOutgoingMessagesNoLock() {
590 base::circular_deque<MessageView> messages;
591 std::swap(outgoing_messages_, messages);
592
593 while (!messages.empty()) {
594 if (!WriteNoLock(std::move(messages.front())))
595 return false;
596
597 messages.pop_front();
598 if (!outgoing_messages_.empty()) {
599 // The message was requeued by WriteNoLock(), so we have to wait for
600 // pipe to become writable again. Repopulate the message queue and exit.
601 // If sending the message triggered any control messages, they may be
602 // in |outgoing_messages_| in addition to or instead of the message
603 // being sent.
604 std::swap(messages, outgoing_messages_);
605 while (!messages.empty()) {
606 outgoing_messages_.push_front(std::move(messages.back()));
607 messages.pop_back();
608 }
609 return true;
610 }
611 }
612
613 return true;
614 }
615
616 #if defined(OS_MACOSX)
OnControlMessage(Message::MessageType message_type,const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)617 bool OnControlMessage(Message::MessageType message_type,
618 const void* payload,
619 size_t payload_size,
620 std::vector<PlatformHandle> handles) override {
621 switch (message_type) {
622 case Message::MessageType::HANDLES_SENT: {
623 if (payload_size == 0)
624 break;
625 MessagePtr message(new Channel::Message(
626 payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
627 memcpy(message->mutable_payload(), payload, payload_size);
628 Write(std::move(message));
629 return true;
630 }
631
632 case Message::MessageType::HANDLES_SENT_ACK: {
633 size_t num_fds = payload_size / sizeof(int);
634 if (num_fds == 0 || payload_size % sizeof(int) != 0)
635 break;
636
637 const int* fds = reinterpret_cast<const int*>(payload);
638 if (!CloseHandles(fds, num_fds))
639 break;
640 return true;
641 }
642
643 default:
644 break;
645 }
646
647 return false;
648 }
649
650 // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
651 // |fds| does not match a sequence of handles in |fds_to_close_|.
CloseHandles(const int * fds,size_t num_fds)652 bool CloseHandles(const int* fds, size_t num_fds) {
653 base::AutoLock l(fds_to_close_lock_);
654 if (!num_fds)
655 return false;
656
657 auto start = std::find_if(
658 fds_to_close_.begin(), fds_to_close_.end(),
659 [&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; });
660 if (start == fds_to_close_.end())
661 return false;
662
663 auto it = start;
664 size_t i = 0;
665 // The FDs in the message should match a sequence of handles in
666 // |fds_to_close_|.
667 // TODO(wez): Consider making |fds_to_close_| a circular_deque<>
668 // for greater efficiency? Or assign a unique Id to each FD-containing
669 // message, and map that to a vector of FDs to close, to avoid the
670 // need for this traversal? Id could even be the first FD in the message.
671 for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) {
672 if (it->get() != fds[i])
673 return false;
674 }
675 if (i != num_fds)
676 return false;
677
678 // Close the FDs by erase()ing their ScopedFDs.
679 fds_to_close_.erase(start, it);
680 return true;
681 }
682 #endif // defined(OS_MACOSX)
683
OnWriteError(Error error)684 void OnWriteError(Error error) {
685 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
686 DCHECK(reject_writes_);
687
688 if (error == Error::kDisconnected) {
689 // If we can't write because the pipe is disconnected then continue
690 // reading to fetch any in-flight messages, relying on end-of-stream to
691 // signal the actual disconnection.
692 if (read_watcher_) {
693 write_watcher_.reset();
694 return;
695 }
696 }
697
698 OnError(error);
699 }
700
701 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
702 scoped_refptr<Channel> self_;
703
704 // We may be initialized with a server socket, in which case this will be
705 // valid until it accepts an incoming connection.
706 PlatformChannelServerEndpoint server_;
707
708 // The socket over which to communicate. May be passed in at construction time
709 // or accepted over |server_|.
710 base::ScopedFD socket_;
711
712 scoped_refptr<base::TaskRunner> io_task_runner_;
713
714 // These watchers must only be accessed on the IO thread.
715 std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_;
716 std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_;
717
718 base::circular_deque<base::ScopedFD> incoming_fds_;
719
720 // Protects |pending_write_| and |outgoing_messages_|.
721 base::Lock write_lock_;
722 bool pending_write_ = false;
723 bool reject_writes_ = false;
724 base::circular_deque<MessageView> outgoing_messages_;
725
726 bool leak_handle_ = false;
727
728 #if defined(OS_MACOSX)
729 base::Lock fds_to_close_lock_;
730 std::vector<base::ScopedFD> fds_to_close_;
731 #if !defined(OS_IOS)
732 // Guards access to the send/receive queues below. These are messages that
733 // can't be fully accepted from or dispatched to the Channel user yet because
734 // we're still waiting on a task port for the remote process.
735 struct RawIncomingMessage {
RawIncomingMessagemojo::core::__anon7bef9a390111::ChannelPosix::RawIncomingMessage736 RawIncomingMessage(std::vector<uint8_t> data,
737 std::vector<PlatformHandleInTransit> handles)
738 : data(std::move(data)), handles(std::move(handles)) {}
739 RawIncomingMessage(RawIncomingMessage&&) = default;
740 ~RawIncomingMessage() = default;
741
742 std::vector<uint8_t> data;
743 std::vector<PlatformHandleInTransit> handles;
744 };
745
746 base::Lock task_port_wait_lock_;
747 bool reject_incoming_messages_with_mach_ports_ = false;
748 std::vector<MessagePtr> pending_outgoing_with_mach_ports_;
749 std::vector<RawIncomingMessage> pending_incoming_with_mach_ports_;
750 #endif // !defined(OS_IOS)
751 #endif // defined(OS_MACOSX)
752
753 DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
754 };
755
756 } // namespace
757
758 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)759 scoped_refptr<Channel> Channel::Create(
760 Delegate* delegate,
761 ConnectionParams connection_params,
762 scoped_refptr<base::TaskRunner> io_task_runner) {
763 return new ChannelPosix(delegate, std::move(connection_params),
764 io_task_runner);
765 }
766
767 } // namespace core
768 } // namespace mojo
769