1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/channel.h"
6
7 #include <lib/fdio/limits.h>
8 #include <lib/fdio/util.h>
9 #include <lib/zx/channel.h>
10 #include <lib/zx/handle.h>
11 #include <zircon/processargs.h>
12 #include <zircon/status.h>
13 #include <zircon/syscalls.h>
14 #include <algorithm>
15
16 #include "base/bind.h"
17 #include "base/containers/circular_deque.h"
18 #include "base/files/scoped_file.h"
19 #include "base/fuchsia/fuchsia_logging.h"
20 #include "base/location.h"
21 #include "base/macros.h"
22 #include "base/memory/ref_counted.h"
23 #include "base/message_loop/message_loop_current.h"
24 #include "base/message_loop/message_pump_for_io.h"
25 #include "base/stl_util.h"
26 #include "base/synchronization/lock.h"
27 #include "base/task_runner.h"
28 #include "mojo/core/platform_handle_in_transit.h"
29
30 namespace mojo {
31 namespace core {
32
33 namespace {
34
// Soft cap on the number of bytes drained from the channel by a single
// OnZxHandleSignalled() call; the read loop stops starting new reads once the
// running total reaches this value.
constexpr size_t kMaxBatchReadCapacity = 256 * 1024;
36
UnwrapPlatformHandle(PlatformHandleInTransit handle,Channel::Message::HandleInfoEntry * info_out,std::vector<PlatformHandleInTransit> * handles_out)37 bool UnwrapPlatformHandle(PlatformHandleInTransit handle,
38 Channel::Message::HandleInfoEntry* info_out,
39 std::vector<PlatformHandleInTransit>* handles_out) {
40 DCHECK(handle.handle().is_valid());
41
42 if (!handle.handle().is_valid_fd()) {
43 *info_out = {0u, 0u};
44 handles_out->emplace_back(std::move(handle));
45 return true;
46 }
47
48 // Each FDIO file descriptor is implemented using one or more native resources
49 // and can be un-wrapped into a set of |handle| and |info| pairs, with |info|
50 // consisting of an FDIO-defined type & arguments (see zircon/processargs.h).
51 //
52 // We try to transfer the FD, but if that fails (for example if the file has
53 // already been dup()d into another FD) we may need to clone.
54 zx_handle_t handles[FDIO_MAX_HANDLES] = {};
55 uint32_t info[FDIO_MAX_HANDLES] = {};
56 zx_status_t result =
57 fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info);
58 if (result > 0) {
59 // On success, the fd in |handle| has been transferred and is no longer
60 // valid. Release from the PlatformHandle to avoid close()ing an invalid
61 // an invalid handle.
62 handle.CompleteTransit();
63 } else if (result == ZX_ERR_UNAVAILABLE) {
64 // No luck, try cloning instead.
65 result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info);
66 }
67
68 if (result <= 0) {
69 ZX_DLOG(ERROR, result) << "fdio_transfer_fd("
70 << handle.handle().GetFD().get() << ")";
71 return false;
72 }
73 DCHECK_LE(result, FDIO_MAX_HANDLES);
74
75 // We assume here that only the |PA_HND_TYPE| of the |info| really matters,
76 // and that that is the same for all the underlying handles.
77 *info_out = {PA_HND_TYPE(info[0]), result};
78 for (int i = 0; i < result; ++i) {
79 DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i]));
80 DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i]));
81 handles_out->emplace_back(
82 PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i]))));
83 }
84
85 return true;
86 }
87
WrapPlatformHandles(Channel::Message::HandleInfoEntry info,base::circular_deque<zx::handle> * handles)88 PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info,
89 base::circular_deque<zx::handle>* handles) {
90 PlatformHandle out_handle;
91 if (!info.type) {
92 out_handle = PlatformHandle(std::move(handles->front()));
93 handles->pop_front();
94 } else {
95 if (info.count > FDIO_MAX_HANDLES)
96 return PlatformHandle();
97
98 // Fetch the required number of handles from |handles| and set up type info.
99 zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {};
100 uint32_t fd_infos[FDIO_MAX_HANDLES] = {};
101 for (int i = 0; i < info.count; ++i) {
102 fd_handles[i] = (*handles)[i].get();
103 fd_infos[i] = PA_HND(info.type, 0);
104 }
105
106 // Try to wrap the handles into an FDIO file descriptor.
107 base::ScopedFD out_fd;
108 zx_status_t result =
109 fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive());
110 if (result != ZX_OK) {
111 ZX_DLOG(ERROR, result) << "fdio_create_fd";
112 return PlatformHandle();
113 }
114
115 // The handles are owned by FDIO now, so |release()| them before removing
116 // the entries from |handles|.
117 for (int i = 0; i < info.count; ++i) {
118 ignore_result(handles->front().release());
119 handles->pop_front();
120 }
121
122 out_handle = PlatformHandle(std::move(out_fd));
123 }
124 return out_handle;
125 }
126
127 // A view over a Channel::Message object. The write queue uses these since
128 // large messages may need to be sent in chunks.
129 class MessageView {
130 public:
131 // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)132 MessageView(Channel::MessagePtr message, size_t offset)
133 : message_(std::move(message)),
134 offset_(offset),
135 handles_(message_->TakeHandlesForTransport()) {
136 DCHECK_GT(message_->data_num_bytes(), offset_);
137 }
138
MessageView(MessageView && other)139 MessageView(MessageView&& other) { *this = std::move(other); }
140
operator =(MessageView && other)141 MessageView& operator=(MessageView&& other) {
142 message_ = std::move(other.message_);
143 offset_ = other.offset_;
144 handles_ = std::move(other.handles_);
145 return *this;
146 }
147
~MessageView()148 ~MessageView() {}
149
data() const150 const void* data() const {
151 return static_cast<const char*>(message_->data()) + offset_;
152 }
153
data_num_bytes() const154 size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
155
data_offset() const156 size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)157 void advance_data_offset(size_t num_bytes) {
158 DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
159 offset_ += num_bytes;
160 }
161
TakeHandles()162 std::vector<PlatformHandleInTransit> TakeHandles() {
163 if (handles_.empty())
164 return std::vector<PlatformHandleInTransit>();
165
166 // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file-
167 // descriptors in |handles_| into the underlying handles, and serialize the
168 // metadata, if any, into the extra header.
169 auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>(
170 message_->mutable_extra_header());
171 memset(handles_info, 0, message_->extra_header_size());
172
173 std::vector<PlatformHandleInTransit> in_handles = std::move(handles_);
174 handles_.reserve(in_handles.size());
175 for (size_t i = 0; i < in_handles.size(); i++) {
176 if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i],
177 &handles_))
178 return std::vector<PlatformHandleInTransit>();
179 }
180 return std::move(handles_);
181 }
182
183 private:
184 Channel::MessagePtr message_;
185 size_t offset_;
186 std::vector<PlatformHandleInTransit> handles_;
187
188 DISALLOW_COPY_AND_ASSIGN(MessageView);
189 };
190
191 class ChannelFuchsia : public Channel,
192 public base::MessageLoopCurrent::DestructionObserver,
193 public base::MessagePumpForIO::ZxHandleWatcher {
194 public:
ChannelFuchsia(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)195 ChannelFuchsia(Delegate* delegate,
196 ConnectionParams connection_params,
197 scoped_refptr<base::TaskRunner> io_task_runner)
198 : Channel(delegate),
199 self_(this),
200 handle_(
201 connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()),
202 io_task_runner_(io_task_runner) {
203 CHECK(handle_.is_valid());
204 }
205
Start()206 void Start() override {
207 if (io_task_runner_->RunsTasksInCurrentSequence()) {
208 StartOnIOThread();
209 } else {
210 io_task_runner_->PostTask(
211 FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this));
212 }
213 }
214
ShutDownImpl()215 void ShutDownImpl() override {
216 // Always shut down asynchronously when called through the public interface.
217 io_task_runner_->PostTask(
218 FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this));
219 }
220
Write(MessagePtr message)221 void Write(MessagePtr message) override {
222 bool write_error = false;
223 {
224 base::AutoLock lock(write_lock_);
225 if (reject_writes_)
226 return;
227 if (!WriteNoLock(MessageView(std::move(message), 0)))
228 reject_writes_ = write_error = true;
229 }
230 if (write_error) {
231 // Do not synchronously invoke OnWriteError(). Write() may have been
232 // called by the delegate and we don't want to re-enter it.
233 io_task_runner_->PostTask(
234 FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this,
235 Error::kDisconnected));
236 }
237 }
238
LeakHandle()239 void LeakHandle() override {
240 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
241 leak_handle_ = true;
242 }
243
GetReadPlatformHandles(const void * payload,size_t payload_size,size_t num_handles,const void * extra_header,size_t extra_header_size,std::vector<PlatformHandle> * handles,bool * deferred)244 bool GetReadPlatformHandles(const void* payload,
245 size_t payload_size,
246 size_t num_handles,
247 const void* extra_header,
248 size_t extra_header_size,
249 std::vector<PlatformHandle>* handles,
250 bool* deferred) override {
251 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
252 if (num_handles > std::numeric_limits<uint16_t>::max())
253 return false;
254
255 // Locate the handle info and verify there is enough of it.
256 if (!extra_header)
257 return false;
258 const auto* handles_info =
259 reinterpret_cast<const Channel::Message::HandleInfoEntry*>(
260 extra_header);
261 size_t handles_info_size = sizeof(handles_info[0]) * num_handles;
262 if (handles_info_size > extra_header_size)
263 return false;
264
265 // Some caller-supplied handles may be FDIO file-descriptors, which were
266 // un-wrapped to more than one native platform resource handle for transfer.
267 // We may therefore need to expect more than |num_handles| handles to have
268 // been accumulated in |incoming_handles_|, based on the handle info.
269 size_t num_raw_handles = 0u;
270 for (size_t i = 0; i < num_handles; ++i)
271 num_raw_handles += handles_info[i].type ? handles_info[i].count : 1;
272
273 // If there are too few handles then we're not ready yet, so return true
274 // indicating things are OK, but leave |handles| empty.
275 if (incoming_handles_.size() < num_raw_handles)
276 return true;
277
278 handles->reserve(num_handles);
279 for (size_t i = 0; i < num_handles; ++i) {
280 handles->emplace_back(
281 WrapPlatformHandles(handles_info[i], &incoming_handles_));
282 }
283 return true;
284 }
285
286 private:
~ChannelFuchsia()287 ~ChannelFuchsia() override { DCHECK(!read_watch_); }
288
StartOnIOThread()289 void StartOnIOThread() {
290 DCHECK(!read_watch_);
291
292 base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
293
294 read_watch_.reset(
295 new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE));
296 base::MessageLoopCurrentForIO::Get()->WatchZxHandle(
297 handle_.get(), true /* persistent */,
298 ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this);
299 }
300
ShutDownOnIOThread()301 void ShutDownOnIOThread() {
302 base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
303
304 read_watch_.reset();
305 if (leak_handle_)
306 ignore_result(handle_.release());
307 handle_.reset();
308
309 // May destroy the |this| if it was the last reference.
310 self_ = nullptr;
311 }
312
313 // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()314 void WillDestroyCurrentMessageLoop() override {
315 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
316 if (self_)
317 ShutDownOnIOThread();
318 }
319
320 // base::MessagePumpForIO::ZxHandleWatcher:
OnZxHandleSignalled(zx_handle_t handle,zx_signals_t signals)321 void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override {
322 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
323 CHECK_EQ(handle, handle_.get());
324 DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals);
325
326 // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since
327 // the peer may have closed while messages were still unread, in the pipe.
328
329 bool validation_error = false;
330 bool read_error = false;
331 size_t next_read_size = 0;
332 size_t buffer_capacity = 0;
333 size_t total_bytes_read = 0;
334 do {
335 buffer_capacity = next_read_size;
336 char* buffer = GetReadBuffer(&buffer_capacity);
337 DCHECK_GT(buffer_capacity, 0u);
338
339 uint32_t bytes_read = 0;
340 uint32_t handles_read = 0;
341 zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
342
343 zx_status_t read_result =
344 handle_.read(0, buffer, buffer_capacity, &bytes_read, handles,
345 base::size(handles), &handles_read);
346 if (read_result == ZX_OK) {
347 for (size_t i = 0; i < handles_read; ++i) {
348 incoming_handles_.emplace_back(handles[i]);
349 }
350 total_bytes_read += bytes_read;
351 if (!OnReadComplete(bytes_read, &next_read_size)) {
352 read_error = true;
353 validation_error = true;
354 break;
355 }
356 } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) {
357 DCHECK_LE(handles_read, base::size(handles));
358 next_read_size = bytes_read;
359 } else if (read_result == ZX_ERR_SHOULD_WAIT) {
360 break;
361 } else {
362 ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result)
363 << "zx_channel_read";
364 read_error = true;
365 break;
366 }
367 } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
368 if (read_error) {
369 // Stop receiving read notifications.
370 read_watch_.reset();
371 if (validation_error)
372 OnError(Error::kReceivedMalformedData);
373 else
374 OnError(Error::kDisconnected);
375 }
376 }
377
378 // Attempts to write a message directly to the channel. If the full message
379 // cannot be written, it's queued and a wait is initiated to write the message
380 // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)381 bool WriteNoLock(MessageView message_view) {
382 uint32_t write_bytes = 0;
383 do {
384 message_view.advance_data_offset(write_bytes);
385
386 std::vector<PlatformHandleInTransit> outgoing_handles =
387 message_view.TakeHandles();
388 zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
389 size_t handles_count = outgoing_handles.size();
390
391 DCHECK_LE(handles_count, base::size(handles));
392 for (size_t i = 0; i < handles_count; ++i) {
393 DCHECK(outgoing_handles[i].handle().is_valid());
394 handles[i] = outgoing_handles[i].handle().GetHandle().get();
395 }
396
397 write_bytes = std::min(message_view.data_num_bytes(),
398 static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES));
399 zx_status_t result = handle_.write(0, message_view.data(), write_bytes,
400 handles, handles_count);
401 // zx_channel_write() consumes |handles| whether or not it succeeds, so
402 // release() our copies now, to avoid them being double-closed.
403 for (auto& outgoing_handle : outgoing_handles)
404 outgoing_handle.CompleteTransit();
405
406 if (result != ZX_OK) {
407 // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once
408 // the platform starts generating them. See https://crbug.com/754084.
409 ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result)
410 << "WriteNoLock(zx_channel_write)";
411 return false;
412 }
413
414 } while (write_bytes < message_view.data_num_bytes());
415
416 return true;
417 }
418
OnWriteError(Error error)419 void OnWriteError(Error error) {
420 DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
421 DCHECK(reject_writes_);
422
423 if (error == Error::kDisconnected) {
424 // If we can't write because the pipe is disconnected then continue
425 // reading to fetch any in-flight messages, relying on end-of-stream to
426 // signal the actual disconnection.
427 if (read_watch_) {
428 // TODO: When we add flow-control for writes, we also need to reset the
429 // write-watcher here.
430 return;
431 }
432 }
433
434 OnError(error);
435 }
436
437 // Keeps the Channel alive at least until explicit shutdown on the IO thread.
438 scoped_refptr<Channel> self_;
439
440 zx::channel handle_;
441 scoped_refptr<base::TaskRunner> io_task_runner_;
442
443 // These members are only used on the IO thread.
444 std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_;
445 base::circular_deque<zx::handle> incoming_handles_;
446 bool leak_handle_ = false;
447
448 base::Lock write_lock_;
449 bool reject_writes_ = false;
450
451 DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia);
452 };
453
454 } // namespace
455
456 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)457 scoped_refptr<Channel> Channel::Create(
458 Delegate* delegate,
459 ConnectionParams connection_params,
460 scoped_refptr<base::TaskRunner> io_task_runner) {
461 return new ChannelFuchsia(delegate, std::move(connection_params),
462 std::move(io_task_runner));
463 }
464
465 } // namespace core
466 } // namespace mojo
467