• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef IPC_IPC_CHANNEL_PROXY_H_
6 #define IPC_IPC_CHANNEL_PROXY_H_
7 
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <string>
13 #include <vector>
14 
15 #include "base/callback.h"
16 #include "base/component_export.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/sequence_checker.h"
19 #include "base/synchronization/lock.h"
20 #include "build/build_config.h"
21 #include "ipc/ipc_channel.h"
22 #include "ipc/ipc_channel_handle.h"
23 #include "ipc/ipc_listener.h"
24 #include "ipc/ipc_sender.h"
25 #include "mojo/public/cpp/bindings/associated_interface_ptr.h"
26 #include "mojo/public/cpp/bindings/associated_interface_request.h"
27 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
28 #include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h"
29 
30 namespace base {
31 class SingleThreadTaskRunner;
32 }
33 
34 namespace IPC {
35 
36 class ChannelFactory;
37 class MessageFilter;
38 class MessageFilterRouter;
39 
40 //-----------------------------------------------------------------------------
41 // IPC::ChannelProxy
42 //
43 // This class is a helper class that is useful when you wish to run an IPC
44 // channel on a background thread.  It provides you with the option of either
45 // handling IPC messages on that background thread or having them dispatched to
46 // your main thread (the thread on which the IPC::ChannelProxy is created).
47 //
48 // The API for an IPC::ChannelProxy is very similar to that of an IPC::Channel.
49 // When you send a message to an IPC::ChannelProxy, the message is routed to
50 // the background thread, where it is then passed to the IPC::Channel's Send
51 // method.  This means that you can send a message from your thread and your
52 // message will be sent over the IPC channel when possible instead of being
53 // delayed until your thread returns to its message loop.  (Often IPC messages
54 // will queue up on the IPC::Channel when there is a lot of traffic, and the
55 // channel will not get cycles to flush its message queue until the thread, on
56 // which it is running, returns to its message loop.)
57 //
58 // An IPC::ChannelProxy can have a MessageFilter associated with it, which will
59 // be notified of incoming messages on the IPC::Channel's thread.  This gives
60 // the consumer of IPC::ChannelProxy the ability to respond to incoming
61 // messages on this background thread instead of on their own thread, which may
62 // be bogged down with other processing.  The result can be greatly improved
63 // latency for messages that can be handled on a background thread.
64 //
65 // The consumer of IPC::ChannelProxy is responsible for allocating the Thread
66 // instance where the IPC::Channel will be created and operated.
67 //
68 // Thread-safe send
69 //
70 // If a particular |Channel| implementation has a thread-safe |Send()| operation
71 // then ChannelProxy skips the inter-thread hop and calls |Send()| directly. In
72 // this case the |channel_| variable is touched by multiple threads so
73 // |channel_lifetime_lock_| is used to protect it. The locking overhead is only
74 // paid if the underlying channel supports thread-safe |Send|.
75 //
COMPONENT_EXPORT(IPC)76 class COMPONENT_EXPORT(IPC) ChannelProxy : public Sender {
77  public:
78 #if defined(ENABLE_IPC_FUZZER)
79   // Interface for a filter to be imposed on outgoing messages which can
80   // re-write the message. Used for testing.
81   class OutgoingMessageFilter {
82    public:
83     virtual Message* Rewrite(Message* message) = 0;
84   };
85 #endif
86 
87   // Initializes a channel proxy.  The channel_handle and mode parameters are
88   // passed directly to the underlying IPC::Channel.  The listener is called on
89   // the thread that creates the ChannelProxy.  The filter's OnMessageReceived
90   // method is called on the thread where the IPC::Channel is running.  The
91   // filter may be null if the consumer is not interested in handling messages
92   // on the background thread.  Any message not handled by the filter will be
93   // dispatched to the listener.  The given task runner correspond to a thread
94   // on which IPC::Channel is created and used (e.g. IO thread).
95   static std::unique_ptr<ChannelProxy> Create(
96       const IPC::ChannelHandle& channel_handle,
97       Channel::Mode mode,
98       Listener* listener,
99       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
100       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
101 
102   static std::unique_ptr<ChannelProxy> Create(
103       std::unique_ptr<ChannelFactory> factory,
104       Listener* listener,
105       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
106       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
107 
108   // Constructs a ChannelProxy without initializing it.
109   ChannelProxy(
110       Listener* listener,
111       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
112       const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner);
113 
114   ~ChannelProxy() override;
115 
116   // Initializes the channel proxy. Only call this once to initialize a channel
117   // proxy that was not initialized in its constructor. If |create_pipe_now| is
118   // true, the pipe is created synchronously. Otherwise it's created on the IO
119   // thread.
120   void Init(const IPC::ChannelHandle& channel_handle,
121             Channel::Mode mode,
122             bool create_pipe_now);
123   void Init(std::unique_ptr<ChannelFactory> factory,
124             bool create_pipe_now);
125 
126   // Pause the channel. Subsequent calls to Send() will be internally queued
127   // until Unpause() is called. Queued messages will not be sent until the
128   // channel is flushed.
129   void Pause();
130 
131   // Unpause the channel. If |flush| is true the channel will be flushed as soon
132   // as it's unpaused (see Flush() below.) Otherwise you must explicitly call
133   // Flush() to flush messages which were queued while the channel was paused.
134   void Unpause(bool flush);
135 
136   // Flush the channel. This sends any messages which were queued before calling
137   // Connect. Only useful if Unpause(false) was called previously.
138   void Flush();
139 
140   // Close the IPC::Channel.  This operation completes asynchronously, once the
141   // background thread processes the command to close the channel.  It is ok to
142   // call this method multiple times.  Redundant calls are ignored.
143   //
144   // WARNING: MessageFilter objects held by the ChannelProxy is also
145   // released asynchronously, and it may in fact have its final reference
146   // released on the background thread.  The caller should be careful to deal
147   // with / allow for this possibility.
148   void Close();
149 
150   // Send a message asynchronously.  The message is routed to the background
151   // thread where it is passed to the IPC::Channel's Send method.
152   bool Send(Message* message) override;
153 
154   // Used to intercept messages as they are received on the background thread.
155   //
156   // Ordinarily, messages sent to the ChannelProxy are routed to the matching
157   // listener on the worker thread.  This API allows code to intercept messages
158   // before they are sent to the worker thread.
159   // If you call this before the target process is launched, then you're
160   // guaranteed to not miss any messages.  But if you call this anytime after,
161   // then some messages might be missed since the filter is added internally on
162   // the IO thread.
163   void AddFilter(MessageFilter* filter);
164   void RemoveFilter(MessageFilter* filter);
165 
166   using GenericAssociatedInterfaceFactory =
167       base::Callback<void(mojo::ScopedInterfaceEndpointHandle)>;
168 
169   // Adds a generic associated interface factory to bind incoming interface
170   // requests directly on the IO thread. MUST be called either before Init() or
171   // before the remote end of the Channel is able to send messages (e.g. before
172   // its process is launched.)
173   void AddGenericAssociatedInterfaceForIOThread(
174       const std::string& name,
175       const GenericAssociatedInterfaceFactory& factory);
176 
177   template <typename Interface>
178   using AssociatedInterfaceFactory =
179       base::Callback<void(mojo::AssociatedInterfaceRequest<Interface>)>;
180 
181   // Helper to bind an IO-thread associated interface factory, inferring the
182   // interface name from the callback argument's type. MUST be called before
183   // Init().
184   template <typename Interface>
185   void AddAssociatedInterfaceForIOThread(
186       const AssociatedInterfaceFactory<Interface>& factory) {
187     AddGenericAssociatedInterfaceForIOThread(
188         Interface::Name_,
189         base::Bind(&ChannelProxy::BindAssociatedInterfaceRequest<Interface>,
190                    factory));
191   }
192 
193   // Requests an associated interface from the remote endpoint.
194   void GetGenericRemoteAssociatedInterface(
195       const std::string& name,
196       mojo::ScopedInterfaceEndpointHandle handle);
197 
198   // Template helper to request associated interfaces from the remote endpoint.
199   template <typename Interface>
200   void GetRemoteAssociatedInterface(
201       mojo::AssociatedInterfacePtr<Interface>* proxy) {
202     auto request = mojo::MakeRequest(proxy);
203     GetGenericRemoteAssociatedInterface(Interface::Name_, request.PassHandle());
204   }
205 
206 #if defined(ENABLE_IPC_FUZZER)
207   void set_outgoing_message_filter(OutgoingMessageFilter* filter) {
208     outgoing_message_filter_ = filter;
209   }
210 #endif
211 
212   // Creates a ThreadSafeAssociatedInterfacePtr for |Interface|. This object
213   // may be used to send messages on the interface from any thread and those
214   // messages will remain ordered with respect to other messages sent on the
215   // same thread over other ThreadSafeAssociatedInterfacePtrs associated with
216   // the same Channel.
217   template <typename Interface>
218   void GetThreadSafeRemoteAssociatedInterface(
219       scoped_refptr<mojo::ThreadSafeAssociatedInterfacePtr<Interface>>*
220           out_ptr) {
221     mojo::AssociatedInterfacePtrInfo<Interface> ptr_info;
222     auto request = mojo::MakeRequest(&ptr_info);
223     GetGenericRemoteAssociatedInterface(Interface::Name_, request.PassHandle());
224     *out_ptr = mojo::ThreadSafeAssociatedInterfacePtr<Interface>::Create(
225         std::move(ptr_info), ipc_task_runner());
226   }
227 
228   base::SingleThreadTaskRunner* ipc_task_runner() const {
229     return context_->ipc_task_runner();
230   }
231 
232   const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
233       const {
234     return context_->ipc_task_runner_refptr();
235   }
236 
237   // Called to clear the pointer to the IPC task runner when it's going away.
238   void ClearIPCTaskRunner();
239 
240  protected:
241   class Context;
242   // A subclass uses this constructor if it needs to add more information
243   // to the internal state.
244   explicit ChannelProxy(Context* context);
245 
246   // Used internally to hold state that is referenced on the IPC thread.
247   class Context : public base::RefCountedThreadSafe<Context>,
248                   public Listener {
249    public:
250     Context(Listener* listener,
251             const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
252             const scoped_refptr<base::SingleThreadTaskRunner>&
253                 listener_task_runner);
254     void ClearIPCTaskRunner();
255     base::SingleThreadTaskRunner* ipc_task_runner() const {
256       return ipc_task_runner_.get();
257     }
258     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner_refptr()
259         const {
260       return ipc_task_runner_;
261     }
262 
263     scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner() {
264       return listener_task_runner_;
265     }
266 
267     // Dispatches a message on the listener thread.
268     void OnDispatchMessage(const Message& message);
269 
270     // Sends |message| from appropriate thread.
271     void Send(Message* message);
272 
273    protected:
274     friend class base::RefCountedThreadSafe<Context>;
275     ~Context() override;
276 
277     // IPC::Listener methods:
278     bool OnMessageReceived(const Message& message) override;
279     void OnChannelConnected(int32_t peer_pid) override;
280     void OnChannelError() override;
281     void OnAssociatedInterfaceRequest(
282         const std::string& interface_name,
283         mojo::ScopedInterfaceEndpointHandle handle) override;
284 
285     // Like OnMessageReceived but doesn't try the filters.
286     bool OnMessageReceivedNoFilter(const Message& message);
287 
288     // Gives the filters a chance at processing |message|.
289     // Returns true if the message was processed, false otherwise.
290     bool TryFilters(const Message& message);
291 
292     void PauseChannel();
293     void UnpauseChannel(bool flush);
294     void FlushChannel();
295 
296     // Like Open and Close, but called on the IPC thread.
297     virtual void OnChannelOpened();
298     virtual void OnChannelClosed();
299 
300     // Called on the consumers thread when the ChannelProxy is closed.  At that
301     // point the consumer is telling us that they don't want to receive any
302     // more messages, so we honor that wish by forgetting them!
303     virtual void Clear();
304 
305    private:
306     friend class ChannelProxy;
307     friend class IpcSecurityTestUtil;
308 
309     // Create the Channel
310     void CreateChannel(std::unique_ptr<ChannelFactory> factory);
311 
312     // Methods called on the IO thread.
313     void OnSendMessage(std::unique_ptr<Message> message_ptr);
314     void OnAddFilter();
315     void OnRemoveFilter(MessageFilter* filter);
316 
317     // Methods called on the listener thread.
318     void AddFilter(MessageFilter* filter);
319     void OnDispatchConnected();
320     void OnDispatchError();
321     void OnDispatchBadMessage(const Message& message);
322     void OnDispatchAssociatedInterfaceRequest(
323         const std::string& interface_name,
324         mojo::ScopedInterfaceEndpointHandle handle);
325 
326     void ClearChannel();
327 
328     mojom::Channel& thread_safe_channel() {
329       return thread_safe_channel_->proxy();
330     }
331 
332     void AddGenericAssociatedInterfaceForIOThread(
333         const std::string& name,
334         const GenericAssociatedInterfaceFactory& factory);
335 
336     scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
337     Listener* listener_;
338 
339     // List of filters.  This is only accessed on the IPC thread.
340     std::vector<scoped_refptr<MessageFilter> > filters_;
341     scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
342 
343     // Note, channel_ may be set on the Listener thread or the IPC thread.
344     // But once it has been set, it must only be read or cleared on the IPC
345     // thread.
346     // One exception is the thread-safe send. See the class comment.
347     std::unique_ptr<Channel> channel_;
348     bool channel_connected_called_;
349 
350     // Lock for |channel_| value. This is only relevant in the context of
351     // thread-safe send.
352     base::Lock channel_lifetime_lock_;
353 
354     // Routes a given message to a proper subset of |filters_|, depending
355     // on which message classes a filter might support.
356     std::unique_ptr<MessageFilterRouter> message_filter_router_;
357 
358     // Holds filters between the AddFilter call on the listerner thread and the
359     // IPC thread when they're added to filters_.
360     std::vector<scoped_refptr<MessageFilter> > pending_filters_;
361     // Lock for pending_filters_.
362     base::Lock pending_filters_lock_;
363 
364     // Cached copy of the peer process ID. Set on IPC but read on both IPC and
365     // listener threads.
366     base::ProcessId peer_pid_;
367     base::Lock peer_pid_lock_;
368 
369     // A thread-safe mojom::Channel interface we use to make remote interface
370     // requests from the proxy thread.
371     std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
372         thread_safe_channel_;
373 
374     // Holds associated interface binders added by
375     // AddGenericAssociatedInterfaceForIOThread until the underlying channel has
376     // been initialized.
377     base::Lock pending_io_thread_interfaces_lock_;
378     std::vector<std::pair<std::string, GenericAssociatedInterfaceFactory>>
379         pending_io_thread_interfaces_;
380   };
381 
382   Context* context() { return context_.get(); }
383 
384 #if defined(ENABLE_IPC_FUZZER)
385   OutgoingMessageFilter* outgoing_message_filter() const {
386     return outgoing_message_filter_;
387   }
388 #endif
389 
390   bool did_init() const { return did_init_; }
391 
392   // A Send() which doesn't DCHECK if the message is synchronous.
393   void SendInternal(Message* message);
394 
395  private:
396   friend class IpcSecurityTestUtil;
397 
398   template <typename Interface>
399   static void BindAssociatedInterfaceRequest(
400       const AssociatedInterfaceFactory<Interface>& factory,
401       mojo::ScopedInterfaceEndpointHandle handle) {
402     factory.Run(mojo::AssociatedInterfaceRequest<Interface>(std::move(handle)));
403   }
404 
405   // Always called once immediately after Init.
406   virtual void OnChannelInit();
407 
408   // By maintaining this indirection (ref-counted) to our internal state, we
409   // can safely be destroyed while the background thread continues to do stuff
410   // that involves this data.
411   scoped_refptr<Context> context_;
412 
413   // Whether the channel has been initialized.
414   bool did_init_;
415 
416 #if defined(ENABLE_IPC_FUZZER)
417   OutgoingMessageFilter* outgoing_message_filter_;
418 #endif
419 
420   SEQUENCE_CHECKER(sequence_checker_);
421 };
422 
423 }  // namespace IPC
424 
425 #endif  // IPC_IPC_CHANNEL_PROXY_H_
426