• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 // TODO(vtl): Enable this on non-POSIX once we have a non-POSIX implementation.
6 #include "build/build_config.h"
7 #if defined(OS_POSIX)
8 
9 #include <stdint.h>
10 
11 #include <string>
12 
13 #include "base/basictypes.h"
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/thread.h"
20 #include "mojo/common/test/multiprocess_test_base.h"
21 #include "mojo/system/channel.h"
22 #include "mojo/system/local_message_pipe_endpoint.h"
23 #include "mojo/system/message_pipe.h"
24 #include "mojo/system/platform_channel.h"
25 #include "mojo/system/proxy_message_pipe_endpoint.h"
26 #include "mojo/system/test_utils.h"
27 #include "mojo/system/waiter.h"
28 
29 namespace mojo {
30 namespace system {
31 namespace {
32 
33 class IOThreadWrapper {
34  public:
IOThreadWrapper()35   IOThreadWrapper() : io_thread_("io_thread") {}
~IOThreadWrapper()36   ~IOThreadWrapper() {
37     CHECK(!channel_.get());
38     CHECK(!io_thread_.IsRunning());
39   }
40 
PostTask(const tracked_objects::Location & from_here,const base::Closure & task)41   void PostTask(const tracked_objects::Location& from_here,
42                 const base::Closure& task) {
43     task_runner()->PostTask(from_here, task);
44   }
45 
PostTaskAndWait(const tracked_objects::Location & from_here,const base::Closure & task)46   void PostTaskAndWait(const tracked_objects::Location& from_here,
47                        const base::Closure& task) {
48     test::PostTaskAndWait(task_runner(), from_here, task);
49   }
50 
Init(PlatformChannel * platform_channel,scoped_refptr<MessagePipe> mp)51   void Init(PlatformChannel* platform_channel, scoped_refptr<MessagePipe> mp) {
52     io_thread_.StartWithOptions(
53         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
54     PostTask(FROM_HERE,
55              base::Bind(&IOThreadWrapper::InitOnIOThread,
56                         base::Unretained(this),
57                         platform_channel, mp));
58   }
59 
Shutdown()60   void Shutdown() {
61     PostTaskAndWait(FROM_HERE,
62                     base::Bind(&IOThreadWrapper::ShutdownOnIOThread,
63                                base::Unretained(this)));
64     io_thread_.Stop();
65   }
66 
is_initialized() const67   bool is_initialized() const { return !!channel_.get(); }
68 
message_loop()69   base::MessageLoop* message_loop() {
70     return io_thread_.message_loop();
71   }
72 
task_runner()73   scoped_refptr<base::TaskRunner> task_runner() {
74     return message_loop()->message_loop_proxy();
75   }
76 
77  private:
InitOnIOThread(PlatformChannel * platform_channel,scoped_refptr<MessagePipe> mp)78   void InitOnIOThread(PlatformChannel* platform_channel,
79                       scoped_refptr<MessagePipe> mp) {
80     CHECK_EQ(base::MessageLoop::current(), message_loop());
81     CHECK(platform_channel);
82     CHECK(platform_channel->is_valid());
83 
84     // Create and initialize |Channel|.
85     channel_ = new Channel();
86     CHECK(channel_->Init(platform_channel->PassHandle()));
87 
88     // Attach the message pipe endpoint.
89     // Note: On the "server" (parent process) side, we need not attach the
90     // message pipe endpoint immediately. However, on the "client" (child
91     // process) side, this *must* be done here -- otherwise, the |Channel| may
92     // receive/process messages (which it can do as soon as it's hooked up to
93     // the IO thread message loop, and that message loop runs) before the
94     // message pipe endpoint is attached.
95     CHECK_EQ(channel_->AttachMessagePipeEndpoint(mp, 1),
96              Channel::kBootstrapEndpointId);
97     channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
98                                      Channel::kBootstrapEndpointId);
99   }
100 
ShutdownOnIOThread()101   void ShutdownOnIOThread() {
102     CHECK(channel_.get());
103     channel_->Shutdown();
104     channel_ = NULL;
105   }
106 
107   base::Thread io_thread_;
108   scoped_refptr<Channel> channel_;
109 
110   DISALLOW_COPY_AND_ASSIGN(IOThreadWrapper);
111 };
112 
113 class MultiprocessMessagePipeTest : public mojo::test::MultiprocessTestBase {
114  public:
MultiprocessMessagePipeTest()115   MultiprocessMessagePipeTest() {}
~MultiprocessMessagePipeTest()116   virtual ~MultiprocessMessagePipeTest() {}
117 
TearDown()118   virtual void TearDown() OVERRIDE {
119     if (io_thread_wrapper_.is_initialized())
120       io_thread_wrapper_.Shutdown();
121     mojo::test::MultiprocessTestBase::TearDown();
122   }
123 
Init(scoped_refptr<MessagePipe> mp)124   void Init(scoped_refptr<MessagePipe> mp) {
125     io_thread_wrapper_.Init(platform_server_channel.get(), mp);
126   }
127 
128  private:
129   IOThreadWrapper io_thread_wrapper_;
130 
131   DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
132 };
133 
WaitIfNecessary(scoped_refptr<MessagePipe> mp,MojoWaitFlags flags)134 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) {
135   Waiter waiter;
136   waiter.Init();
137 
138   MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK);
139   if (add_result != MOJO_RESULT_OK) {
140     return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
141                                                         add_result;
142   }
143 
144   MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE);
145   mp->RemoveWaiter(0, &waiter);
146   return wait_result;
147 }
148 
149 // For each message received, sends a reply message with the same contents
150 // repeated twice, until the other end is closed or it receives "quitquitquit"
151 // (which it doesn't reply to). It'll return the number of messages received,
152 // not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho)153 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
154   IOThreadWrapper io_thread_wrapper;
155   PlatformClientChannel* const platform_client_channel =
156       MultiprocessMessagePipeTest::platform_client_channel.get();
157   CHECK(platform_client_channel);
158   CHECK(platform_client_channel->is_valid());
159   scoped_refptr<MessagePipe> mp(new MessagePipe(
160       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
161       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
162   io_thread_wrapper.Init(platform_client_channel, mp);
163 
164   const std::string quitquitquit("quitquitquit");
165   int rv = 0;
166   for (;; rv = (rv + 1) % 100) {
167     // Wait for our end of the message pipe to be readable.
168     MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE);
169     if (result != MOJO_RESULT_OK) {
170       // It was closed, probably.
171       CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
172       break;
173     }
174 
175     std::string read_buffer(1000, '\0');
176     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
177     CHECK_EQ(mp->ReadMessage(0,
178                              &read_buffer[0], &read_buffer_size,
179                              NULL, NULL,
180                              MOJO_READ_MESSAGE_FLAG_NONE),
181              MOJO_RESULT_OK);
182     read_buffer.resize(read_buffer_size);
183     VLOG(2) << "Child got: " << read_buffer;
184 
185     if (read_buffer == quitquitquit) {
186       VLOG(2) << "Child quitting.";
187       break;
188     }
189 
190     std::string write_buffer = read_buffer + read_buffer;
191     CHECK_EQ(mp->WriteMessage(0,
192                               write_buffer.data(),
193                               static_cast<uint32_t>(write_buffer.size()),
194                               NULL,
195                               MOJO_WRITE_MESSAGE_FLAG_NONE),
196              MOJO_RESULT_OK);
197   }
198 
199 
200   mp->Close(0);
201   io_thread_wrapper.Shutdown();
202   return rv;
203 }
204 
205 // Sends "hello" to child, and expects "hellohello" back.
TEST_F(MultiprocessMessagePipeTest,Basic)206 TEST_F(MultiprocessMessagePipeTest, Basic) {
207   StartChild("EchoEcho");
208 
209   scoped_refptr<MessagePipe> mp(new MessagePipe(
210       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
211       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
212   Init(mp);
213 
214   std::string hello("hello");
215   EXPECT_EQ(MOJO_RESULT_OK,
216             mp->WriteMessage(0,
217                              hello.data(), static_cast<uint32_t>(hello.size()),
218                              NULL,
219                              MOJO_WRITE_MESSAGE_FLAG_NONE));
220 
221   EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
222 
223   std::string read_buffer(1000, '\0');
224   uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
225   CHECK_EQ(mp->ReadMessage(0,
226                            &read_buffer[0], &read_buffer_size,
227                            NULL, NULL,
228                            MOJO_READ_MESSAGE_FLAG_NONE),
229            MOJO_RESULT_OK);
230   read_buffer.resize(read_buffer_size);
231   VLOG(2) << "Parent got: " << read_buffer;
232   EXPECT_EQ(hello + hello, read_buffer);
233 
234   mp->Close(0);
235 
236   // We sent one message.
237   EXPECT_EQ(1 % 100, WaitForChildShutdown());
238 }
239 
240 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits
241 // for the child to close its end before quitting.
TEST_F(MultiprocessMessagePipeTest,QueueMessages)242 TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
243   StartChild("EchoEcho");
244 
245   scoped_refptr<MessagePipe> mp(new MessagePipe(
246       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
247       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
248   Init(mp);
249 
250   static const size_t kNumMessages = 1001;
251   for (size_t i = 0; i < kNumMessages; i++) {
252     std::string write_buffer(i, 'A' + (i % 26));
253     EXPECT_EQ(MOJO_RESULT_OK,
254               mp->WriteMessage(0,
255                                write_buffer.data(),
256                                static_cast<uint32_t>(write_buffer.size()),
257                                NULL,
258                                MOJO_WRITE_MESSAGE_FLAG_NONE));
259   }
260 
261   const std::string quitquitquit("quitquitquit");
262   EXPECT_EQ(MOJO_RESULT_OK,
263             mp->WriteMessage(0,
264                              quitquitquit.data(),
265                              static_cast<uint32_t>(quitquitquit.size()),
266                              NULL,
267                              MOJO_WRITE_MESSAGE_FLAG_NONE));
268 
269   for (size_t i = 0; i < kNumMessages; i++) {
270     EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
271 
272     std::string read_buffer(kNumMessages * 2, '\0');
273     uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
274     CHECK_EQ(mp->ReadMessage(0,
275                              &read_buffer[0], &read_buffer_size,
276                              NULL, NULL,
277                              MOJO_READ_MESSAGE_FLAG_NONE),
278              MOJO_RESULT_OK);
279     read_buffer.resize(read_buffer_size);
280 
281     EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
282   }
283 
284   // Wait for it to become readable, which should fail (since we sent
285   // "quitquitquit").
286   EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
287             WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
288 
289   mp->Close(0);
290 
291   EXPECT_EQ(static_cast<int>(kNumMessages % 100), WaitForChildShutdown());
292 }
293 
294 }  // namespace
295 }  // namespace system
296 }  // namespace mojo
297 
298 #endif  // defined(OS_POSIX)
299