• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/channel.h"
6 
7 #include <lib/fdio/limits.h>
8 #include <lib/fdio/util.h>
9 #include <lib/zx/channel.h>
10 #include <lib/zx/handle.h>
11 #include <zircon/processargs.h>
12 #include <zircon/status.h>
13 #include <zircon/syscalls.h>
14 #include <algorithm>
15 
16 #include "base/bind.h"
17 #include "base/containers/circular_deque.h"
18 #include "base/files/scoped_file.h"
19 #include "base/fuchsia/fuchsia_logging.h"
20 #include "base/location.h"
21 #include "base/macros.h"
22 #include "base/memory/ref_counted.h"
23 #include "base/message_loop/message_loop_current.h"
24 #include "base/message_loop/message_pump_for_io.h"
25 #include "base/stl_util.h"
26 #include "base/synchronization/lock.h"
27 #include "base/task_runner.h"
28 #include "mojo/core/platform_handle_in_transit.h"
29 
30 namespace mojo {
31 namespace core {
32 
33 namespace {
34 
// Once a single read batch has consumed at least this many bytes, the read
// loop stops and waits for the next readable signal.
constexpr size_t kMaxBatchReadCapacity = 256 * 1024;
36 
UnwrapPlatformHandle(PlatformHandleInTransit handle,Channel::Message::HandleInfoEntry * info_out,std::vector<PlatformHandleInTransit> * handles_out)37 bool UnwrapPlatformHandle(PlatformHandleInTransit handle,
38                           Channel::Message::HandleInfoEntry* info_out,
39                           std::vector<PlatformHandleInTransit>* handles_out) {
40   DCHECK(handle.handle().is_valid());
41 
42   if (!handle.handle().is_valid_fd()) {
43     *info_out = {0u, 0u};
44     handles_out->emplace_back(std::move(handle));
45     return true;
46   }
47 
48   // Each FDIO file descriptor is implemented using one or more native resources
49   // and can be un-wrapped into a set of |handle| and |info| pairs, with |info|
50   // consisting of an FDIO-defined type & arguments (see zircon/processargs.h).
51   //
52   // We try to transfer the FD, but if that fails (for example if the file has
53   // already been dup()d into another FD) we may need to clone.
54   zx_handle_t handles[FDIO_MAX_HANDLES] = {};
55   uint32_t info[FDIO_MAX_HANDLES] = {};
56   zx_status_t result =
57       fdio_transfer_fd(handle.handle().GetFD().get(), 0, handles, info);
58   if (result > 0) {
59     // On success, the fd in |handle| has been transferred and is no longer
60     // valid. Release from the PlatformHandle to avoid close()ing an invalid
61     // an invalid handle.
62     handle.CompleteTransit();
63   } else if (result == ZX_ERR_UNAVAILABLE) {
64     // No luck, try cloning instead.
65     result = fdio_clone_fd(handle.handle().GetFD().get(), 0, handles, info);
66   }
67 
68   if (result <= 0) {
69     ZX_DLOG(ERROR, result) << "fdio_transfer_fd("
70                            << handle.handle().GetFD().get() << ")";
71     return false;
72   }
73   DCHECK_LE(result, FDIO_MAX_HANDLES);
74 
75   // We assume here that only the |PA_HND_TYPE| of the |info| really matters,
76   // and that that is the same for all the underlying handles.
77   *info_out = {PA_HND_TYPE(info[0]), result};
78   for (int i = 0; i < result; ++i) {
79     DCHECK_EQ(PA_HND_TYPE(info[0]), PA_HND_TYPE(info[i]));
80     DCHECK_EQ(0u, PA_HND_SUBTYPE(info[i]));
81     handles_out->emplace_back(
82         PlatformHandleInTransit(PlatformHandle(zx::handle(handles[i]))));
83   }
84 
85   return true;
86 }
87 
WrapPlatformHandles(Channel::Message::HandleInfoEntry info,base::circular_deque<zx::handle> * handles)88 PlatformHandle WrapPlatformHandles(Channel::Message::HandleInfoEntry info,
89                                    base::circular_deque<zx::handle>* handles) {
90   PlatformHandle out_handle;
91   if (!info.type) {
92     out_handle = PlatformHandle(std::move(handles->front()));
93     handles->pop_front();
94   } else {
95     if (info.count > FDIO_MAX_HANDLES)
96       return PlatformHandle();
97 
98     // Fetch the required number of handles from |handles| and set up type info.
99     zx_handle_t fd_handles[FDIO_MAX_HANDLES] = {};
100     uint32_t fd_infos[FDIO_MAX_HANDLES] = {};
101     for (int i = 0; i < info.count; ++i) {
102       fd_handles[i] = (*handles)[i].get();
103       fd_infos[i] = PA_HND(info.type, 0);
104     }
105 
106     // Try to wrap the handles into an FDIO file descriptor.
107     base::ScopedFD out_fd;
108     zx_status_t result =
109         fdio_create_fd(fd_handles, fd_infos, info.count, out_fd.receive());
110     if (result != ZX_OK) {
111       ZX_DLOG(ERROR, result) << "fdio_create_fd";
112       return PlatformHandle();
113     }
114 
115     // The handles are owned by FDIO now, so |release()| them before removing
116     // the entries from |handles|.
117     for (int i = 0; i < info.count; ++i) {
118       ignore_result(handles->front().release());
119       handles->pop_front();
120     }
121 
122     out_handle = PlatformHandle(std::move(out_fd));
123   }
124   return out_handle;
125 }
126 
127 // A view over a Channel::Message object. The write queue uses these since
128 // large messages may need to be sent in chunks.
129 class MessageView {
130  public:
131   // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)132   MessageView(Channel::MessagePtr message, size_t offset)
133       : message_(std::move(message)),
134         offset_(offset),
135         handles_(message_->TakeHandlesForTransport()) {
136     DCHECK_GT(message_->data_num_bytes(), offset_);
137   }
138 
MessageView(MessageView && other)139   MessageView(MessageView&& other) { *this = std::move(other); }
140 
operator =(MessageView && other)141   MessageView& operator=(MessageView&& other) {
142     message_ = std::move(other.message_);
143     offset_ = other.offset_;
144     handles_ = std::move(other.handles_);
145     return *this;
146   }
147 
~MessageView()148   ~MessageView() {}
149 
data() const150   const void* data() const {
151     return static_cast<const char*>(message_->data()) + offset_;
152   }
153 
data_num_bytes() const154   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
155 
data_offset() const156   size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)157   void advance_data_offset(size_t num_bytes) {
158     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
159     offset_ += num_bytes;
160   }
161 
TakeHandles()162   std::vector<PlatformHandleInTransit> TakeHandles() {
163     if (handles_.empty())
164       return std::vector<PlatformHandleInTransit>();
165 
166     // We can only pass Fuchsia handles via IPC, so unwrap any FDIO file-
167     // descriptors in |handles_| into the underlying handles, and serialize the
168     // metadata, if any, into the extra header.
169     auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>(
170         message_->mutable_extra_header());
171     memset(handles_info, 0, message_->extra_header_size());
172 
173     std::vector<PlatformHandleInTransit> in_handles = std::move(handles_);
174     handles_.reserve(in_handles.size());
175     for (size_t i = 0; i < in_handles.size(); i++) {
176       if (!UnwrapPlatformHandle(std::move(in_handles[i]), &handles_info[i],
177                                 &handles_))
178         return std::vector<PlatformHandleInTransit>();
179     }
180     return std::move(handles_);
181   }
182 
183  private:
184   Channel::MessagePtr message_;
185   size_t offset_;
186   std::vector<PlatformHandleInTransit> handles_;
187 
188   DISALLOW_COPY_AND_ASSIGN(MessageView);
189 };
190 
191 class ChannelFuchsia : public Channel,
192                        public base::MessageLoopCurrent::DestructionObserver,
193                        public base::MessagePumpForIO::ZxHandleWatcher {
194  public:
ChannelFuchsia(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)195   ChannelFuchsia(Delegate* delegate,
196                  ConnectionParams connection_params,
197                  scoped_refptr<base::TaskRunner> io_task_runner)
198       : Channel(delegate),
199         self_(this),
200         handle_(
201             connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()),
202         io_task_runner_(io_task_runner) {
203     CHECK(handle_.is_valid());
204   }
205 
Start()206   void Start() override {
207     if (io_task_runner_->RunsTasksInCurrentSequence()) {
208       StartOnIOThread();
209     } else {
210       io_task_runner_->PostTask(
211           FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this));
212     }
213   }
214 
ShutDownImpl()215   void ShutDownImpl() override {
216     // Always shut down asynchronously when called through the public interface.
217     io_task_runner_->PostTask(
218         FROM_HERE, base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this));
219   }
220 
Write(MessagePtr message)221   void Write(MessagePtr message) override {
222     bool write_error = false;
223     {
224       base::AutoLock lock(write_lock_);
225       if (reject_writes_)
226         return;
227       if (!WriteNoLock(MessageView(std::move(message), 0)))
228         reject_writes_ = write_error = true;
229     }
230     if (write_error) {
231       // Do not synchronously invoke OnWriteError(). Write() may have been
232       // called by the delegate and we don't want to re-enter it.
233       io_task_runner_->PostTask(
234           FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this,
235                                     Error::kDisconnected));
236     }
237   }
238 
LeakHandle()239   void LeakHandle() override {
240     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
241     leak_handle_ = true;
242   }
243 
GetReadPlatformHandles(const void * payload,size_t payload_size,size_t num_handles,const void * extra_header,size_t extra_header_size,std::vector<PlatformHandle> * handles,bool * deferred)244   bool GetReadPlatformHandles(const void* payload,
245                               size_t payload_size,
246                               size_t num_handles,
247                               const void* extra_header,
248                               size_t extra_header_size,
249                               std::vector<PlatformHandle>* handles,
250                               bool* deferred) override {
251     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
252     if (num_handles > std::numeric_limits<uint16_t>::max())
253       return false;
254 
255     // Locate the handle info and verify there is enough of it.
256     if (!extra_header)
257       return false;
258     const auto* handles_info =
259         reinterpret_cast<const Channel::Message::HandleInfoEntry*>(
260             extra_header);
261     size_t handles_info_size = sizeof(handles_info[0]) * num_handles;
262     if (handles_info_size > extra_header_size)
263       return false;
264 
265     // Some caller-supplied handles may be FDIO file-descriptors, which were
266     // un-wrapped to more than one native platform resource handle for transfer.
267     // We may therefore need to expect more than |num_handles| handles to have
268     // been accumulated in |incoming_handles_|, based on the handle info.
269     size_t num_raw_handles = 0u;
270     for (size_t i = 0; i < num_handles; ++i)
271       num_raw_handles += handles_info[i].type ? handles_info[i].count : 1;
272 
273     // If there are too few handles then we're not ready yet, so return true
274     // indicating things are OK, but leave |handles| empty.
275     if (incoming_handles_.size() < num_raw_handles)
276       return true;
277 
278     handles->reserve(num_handles);
279     for (size_t i = 0; i < num_handles; ++i) {
280       handles->emplace_back(
281           WrapPlatformHandles(handles_info[i], &incoming_handles_));
282     }
283     return true;
284   }
285 
286  private:
~ChannelFuchsia()287   ~ChannelFuchsia() override { DCHECK(!read_watch_); }
288 
StartOnIOThread()289   void StartOnIOThread() {
290     DCHECK(!read_watch_);
291 
292     base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
293 
294     read_watch_.reset(
295         new base::MessagePumpForIO::ZxHandleWatchController(FROM_HERE));
296     base::MessageLoopCurrentForIO::Get()->WatchZxHandle(
297         handle_.get(), true /* persistent */,
298         ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this);
299   }
300 
ShutDownOnIOThread()301   void ShutDownOnIOThread() {
302     base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
303 
304     read_watch_.reset();
305     if (leak_handle_)
306       ignore_result(handle_.release());
307     handle_.reset();
308 
309     // May destroy the |this| if it was the last reference.
310     self_ = nullptr;
311   }
312 
313   // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()314   void WillDestroyCurrentMessageLoop() override {
315     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
316     if (self_)
317       ShutDownOnIOThread();
318   }
319 
320   // base::MessagePumpForIO::ZxHandleWatcher:
OnZxHandleSignalled(zx_handle_t handle,zx_signals_t signals)321   void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override {
322     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
323     CHECK_EQ(handle, handle_.get());
324     DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals);
325 
326     // We always try to read message(s), even if ZX_CHANNEL_PEER_CLOSED, since
327     // the peer may have closed while messages were still unread, in the pipe.
328 
329     bool validation_error = false;
330     bool read_error = false;
331     size_t next_read_size = 0;
332     size_t buffer_capacity = 0;
333     size_t total_bytes_read = 0;
334     do {
335       buffer_capacity = next_read_size;
336       char* buffer = GetReadBuffer(&buffer_capacity);
337       DCHECK_GT(buffer_capacity, 0u);
338 
339       uint32_t bytes_read = 0;
340       uint32_t handles_read = 0;
341       zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
342 
343       zx_status_t read_result =
344           handle_.read(0, buffer, buffer_capacity, &bytes_read, handles,
345                        base::size(handles), &handles_read);
346       if (read_result == ZX_OK) {
347         for (size_t i = 0; i < handles_read; ++i) {
348           incoming_handles_.emplace_back(handles[i]);
349         }
350         total_bytes_read += bytes_read;
351         if (!OnReadComplete(bytes_read, &next_read_size)) {
352           read_error = true;
353           validation_error = true;
354           break;
355         }
356       } else if (read_result == ZX_ERR_BUFFER_TOO_SMALL) {
357         DCHECK_LE(handles_read, base::size(handles));
358         next_read_size = bytes_read;
359       } else if (read_result == ZX_ERR_SHOULD_WAIT) {
360         break;
361       } else {
362         ZX_DLOG_IF(ERROR, read_result != ZX_ERR_PEER_CLOSED, read_result)
363             << "zx_channel_read";
364         read_error = true;
365         break;
366       }
367     } while (total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
368     if (read_error) {
369       // Stop receiving read notifications.
370       read_watch_.reset();
371       if (validation_error)
372         OnError(Error::kReceivedMalformedData);
373       else
374         OnError(Error::kDisconnected);
375     }
376   }
377 
378   // Attempts to write a message directly to the channel. If the full message
379   // cannot be written, it's queued and a wait is initiated to write the message
380   // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)381   bool WriteNoLock(MessageView message_view) {
382     uint32_t write_bytes = 0;
383     do {
384       message_view.advance_data_offset(write_bytes);
385 
386       std::vector<PlatformHandleInTransit> outgoing_handles =
387           message_view.TakeHandles();
388       zx_handle_t handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
389       size_t handles_count = outgoing_handles.size();
390 
391       DCHECK_LE(handles_count, base::size(handles));
392       for (size_t i = 0; i < handles_count; ++i) {
393         DCHECK(outgoing_handles[i].handle().is_valid());
394         handles[i] = outgoing_handles[i].handle().GetHandle().get();
395       }
396 
397       write_bytes = std::min(message_view.data_num_bytes(),
398                              static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES));
399       zx_status_t result = handle_.write(0, message_view.data(), write_bytes,
400                                          handles, handles_count);
401       // zx_channel_write() consumes |handles| whether or not it succeeds, so
402       // release() our copies now, to avoid them being double-closed.
403       for (auto& outgoing_handle : outgoing_handles)
404         outgoing_handle.CompleteTransit();
405 
406       if (result != ZX_OK) {
407         // TODO(fuchsia): Handle ZX_ERR_SHOULD_WAIT flow-control errors, once
408         // the platform starts generating them. See https://crbug.com/754084.
409         ZX_DLOG_IF(ERROR, result != ZX_ERR_PEER_CLOSED, result)
410             << "WriteNoLock(zx_channel_write)";
411         return false;
412       }
413 
414     } while (write_bytes < message_view.data_num_bytes());
415 
416     return true;
417   }
418 
OnWriteError(Error error)419   void OnWriteError(Error error) {
420     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
421     DCHECK(reject_writes_);
422 
423     if (error == Error::kDisconnected) {
424       // If we can't write because the pipe is disconnected then continue
425       // reading to fetch any in-flight messages, relying on end-of-stream to
426       // signal the actual disconnection.
427       if (read_watch_) {
428         // TODO: When we add flow-control for writes, we also need to reset the
429         // write-watcher here.
430         return;
431       }
432     }
433 
434     OnError(error);
435   }
436 
437   // Keeps the Channel alive at least until explicit shutdown on the IO thread.
438   scoped_refptr<Channel> self_;
439 
440   zx::channel handle_;
441   scoped_refptr<base::TaskRunner> io_task_runner_;
442 
443   // These members are only used on the IO thread.
444   std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_;
445   base::circular_deque<zx::handle> incoming_handles_;
446   bool leak_handle_ = false;
447 
448   base::Lock write_lock_;
449   bool reject_writes_ = false;
450 
451   DISALLOW_COPY_AND_ASSIGN(ChannelFuchsia);
452 };
453 
454 }  // namespace
455 
456 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)457 scoped_refptr<Channel> Channel::Create(
458     Delegate* delegate,
459     ConnectionParams connection_params,
460     scoped_refptr<base::TaskRunner> io_task_runner) {
461   return new ChannelFuchsia(delegate, std::move(connection_params),
462                             std::move(io_task_runner));
463 }
464 
465 }  // namespace core
466 }  // namespace mojo
467