1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // TODO(vtl): Enable this on non-POSIX once we have a non-POSIX implementation.
6 #include "build/build_config.h"
7 #if defined(OS_POSIX)
8
9 #include <stdint.h>
10
11 #include <string>
12
13 #include "base/basictypes.h"
14 #include "base/bind.h"
15 #include "base/callback.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/thread.h"
20 #include "mojo/common/test/multiprocess_test_base.h"
21 #include "mojo/system/channel.h"
22 #include "mojo/system/local_message_pipe_endpoint.h"
23 #include "mojo/system/message_pipe.h"
24 #include "mojo/system/platform_channel.h"
25 #include "mojo/system/proxy_message_pipe_endpoint.h"
26 #include "mojo/system/test_utils.h"
27 #include "mojo/system/waiter.h"
28
29 namespace mojo {
30 namespace system {
31 namespace {
32
33 class IOThreadWrapper {
34 public:
IOThreadWrapper()35 IOThreadWrapper() : io_thread_("io_thread") {}
~IOThreadWrapper()36 ~IOThreadWrapper() {
37 CHECK(!channel_.get());
38 CHECK(!io_thread_.IsRunning());
39 }
40
PostTask(const tracked_objects::Location & from_here,const base::Closure & task)41 void PostTask(const tracked_objects::Location& from_here,
42 const base::Closure& task) {
43 task_runner()->PostTask(from_here, task);
44 }
45
PostTaskAndWait(const tracked_objects::Location & from_here,const base::Closure & task)46 void PostTaskAndWait(const tracked_objects::Location& from_here,
47 const base::Closure& task) {
48 test::PostTaskAndWait(task_runner(), from_here, task);
49 }
50
Init(PlatformChannel * platform_channel,scoped_refptr<MessagePipe> mp)51 void Init(PlatformChannel* platform_channel, scoped_refptr<MessagePipe> mp) {
52 io_thread_.StartWithOptions(
53 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
54 PostTask(FROM_HERE,
55 base::Bind(&IOThreadWrapper::InitOnIOThread,
56 base::Unretained(this),
57 platform_channel, mp));
58 }
59
Shutdown()60 void Shutdown() {
61 PostTaskAndWait(FROM_HERE,
62 base::Bind(&IOThreadWrapper::ShutdownOnIOThread,
63 base::Unretained(this)));
64 io_thread_.Stop();
65 }
66
is_initialized() const67 bool is_initialized() const { return !!channel_.get(); }
68
message_loop()69 base::MessageLoop* message_loop() {
70 return io_thread_.message_loop();
71 }
72
task_runner()73 scoped_refptr<base::TaskRunner> task_runner() {
74 return message_loop()->message_loop_proxy();
75 }
76
77 private:
InitOnIOThread(PlatformChannel * platform_channel,scoped_refptr<MessagePipe> mp)78 void InitOnIOThread(PlatformChannel* platform_channel,
79 scoped_refptr<MessagePipe> mp) {
80 CHECK_EQ(base::MessageLoop::current(), message_loop());
81 CHECK(platform_channel);
82 CHECK(platform_channel->is_valid());
83
84 // Create and initialize |Channel|.
85 channel_ = new Channel();
86 CHECK(channel_->Init(platform_channel->PassHandle()));
87
88 // Attach the message pipe endpoint.
89 // Note: On the "server" (parent process) side, we need not attach the
90 // message pipe endpoint immediately. However, on the "client" (child
91 // process) side, this *must* be done here -- otherwise, the |Channel| may
92 // receive/process messages (which it can do as soon as it's hooked up to
93 // the IO thread message loop, and that message loop runs) before the
94 // message pipe endpoint is attached.
95 CHECK_EQ(channel_->AttachMessagePipeEndpoint(mp, 1),
96 Channel::kBootstrapEndpointId);
97 channel_->RunMessagePipeEndpoint(Channel::kBootstrapEndpointId,
98 Channel::kBootstrapEndpointId);
99 }
100
ShutdownOnIOThread()101 void ShutdownOnIOThread() {
102 CHECK(channel_.get());
103 channel_->Shutdown();
104 channel_ = NULL;
105 }
106
107 base::Thread io_thread_;
108 scoped_refptr<Channel> channel_;
109
110 DISALLOW_COPY_AND_ASSIGN(IOThreadWrapper);
111 };
112
113 class MultiprocessMessagePipeTest : public mojo::test::MultiprocessTestBase {
114 public:
MultiprocessMessagePipeTest()115 MultiprocessMessagePipeTest() {}
~MultiprocessMessagePipeTest()116 virtual ~MultiprocessMessagePipeTest() {}
117
TearDown()118 virtual void TearDown() OVERRIDE {
119 if (io_thread_wrapper_.is_initialized())
120 io_thread_wrapper_.Shutdown();
121 mojo::test::MultiprocessTestBase::TearDown();
122 }
123
Init(scoped_refptr<MessagePipe> mp)124 void Init(scoped_refptr<MessagePipe> mp) {
125 io_thread_wrapper_.Init(platform_server_channel.get(), mp);
126 }
127
128 private:
129 IOThreadWrapper io_thread_wrapper_;
130
131 DISALLOW_COPY_AND_ASSIGN(MultiprocessMessagePipeTest);
132 };
133
WaitIfNecessary(scoped_refptr<MessagePipe> mp,MojoWaitFlags flags)134 MojoResult WaitIfNecessary(scoped_refptr<MessagePipe> mp, MojoWaitFlags flags) {
135 Waiter waiter;
136 waiter.Init();
137
138 MojoResult add_result = mp->AddWaiter(0, &waiter, flags, MOJO_RESULT_OK);
139 if (add_result != MOJO_RESULT_OK) {
140 return (add_result == MOJO_RESULT_ALREADY_EXISTS) ? MOJO_RESULT_OK :
141 add_result;
142 }
143
144 MojoResult wait_result = waiter.Wait(MOJO_DEADLINE_INDEFINITE);
145 mp->RemoveWaiter(0, &waiter);
146 return wait_result;
147 }
148
149 // For each message received, sends a reply message with the same contents
150 // repeated twice, until the other end is closed or it receives "quitquitquit"
151 // (which it doesn't reply to). It'll return the number of messages received,
152 // not including any "quitquitquit" message, modulo 100.
MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho)153 MOJO_MULTIPROCESS_TEST_CHILD_MAIN(EchoEcho) {
154 IOThreadWrapper io_thread_wrapper;
155 PlatformClientChannel* const platform_client_channel =
156 MultiprocessMessagePipeTest::platform_client_channel.get();
157 CHECK(platform_client_channel);
158 CHECK(platform_client_channel->is_valid());
159 scoped_refptr<MessagePipe> mp(new MessagePipe(
160 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
161 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
162 io_thread_wrapper.Init(platform_client_channel, mp);
163
164 const std::string quitquitquit("quitquitquit");
165 int rv = 0;
166 for (;; rv = (rv + 1) % 100) {
167 // Wait for our end of the message pipe to be readable.
168 MojoResult result = WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE);
169 if (result != MOJO_RESULT_OK) {
170 // It was closed, probably.
171 CHECK_EQ(result, MOJO_RESULT_FAILED_PRECONDITION);
172 break;
173 }
174
175 std::string read_buffer(1000, '\0');
176 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
177 CHECK_EQ(mp->ReadMessage(0,
178 &read_buffer[0], &read_buffer_size,
179 NULL, NULL,
180 MOJO_READ_MESSAGE_FLAG_NONE),
181 MOJO_RESULT_OK);
182 read_buffer.resize(read_buffer_size);
183 VLOG(2) << "Child got: " << read_buffer;
184
185 if (read_buffer == quitquitquit) {
186 VLOG(2) << "Child quitting.";
187 break;
188 }
189
190 std::string write_buffer = read_buffer + read_buffer;
191 CHECK_EQ(mp->WriteMessage(0,
192 write_buffer.data(),
193 static_cast<uint32_t>(write_buffer.size()),
194 NULL,
195 MOJO_WRITE_MESSAGE_FLAG_NONE),
196 MOJO_RESULT_OK);
197 }
198
199
200 mp->Close(0);
201 io_thread_wrapper.Shutdown();
202 return rv;
203 }
204
205 // Sends "hello" to child, and expects "hellohello" back.
TEST_F(MultiprocessMessagePipeTest,Basic)206 TEST_F(MultiprocessMessagePipeTest, Basic) {
207 StartChild("EchoEcho");
208
209 scoped_refptr<MessagePipe> mp(new MessagePipe(
210 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
211 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
212 Init(mp);
213
214 std::string hello("hello");
215 EXPECT_EQ(MOJO_RESULT_OK,
216 mp->WriteMessage(0,
217 hello.data(), static_cast<uint32_t>(hello.size()),
218 NULL,
219 MOJO_WRITE_MESSAGE_FLAG_NONE));
220
221 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
222
223 std::string read_buffer(1000, '\0');
224 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
225 CHECK_EQ(mp->ReadMessage(0,
226 &read_buffer[0], &read_buffer_size,
227 NULL, NULL,
228 MOJO_READ_MESSAGE_FLAG_NONE),
229 MOJO_RESULT_OK);
230 read_buffer.resize(read_buffer_size);
231 VLOG(2) << "Parent got: " << read_buffer;
232 EXPECT_EQ(hello + hello, read_buffer);
233
234 mp->Close(0);
235
236 // We sent one message.
237 EXPECT_EQ(1 % 100, WaitForChildShutdown());
238 }
239
240 // Sends a bunch of messages to the child. Expects them "repeated" back. Waits
241 // for the child to close its end before quitting.
TEST_F(MultiprocessMessagePipeTest,QueueMessages)242 TEST_F(MultiprocessMessagePipeTest, QueueMessages) {
243 StartChild("EchoEcho");
244
245 scoped_refptr<MessagePipe> mp(new MessagePipe(
246 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
247 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
248 Init(mp);
249
250 static const size_t kNumMessages = 1001;
251 for (size_t i = 0; i < kNumMessages; i++) {
252 std::string write_buffer(i, 'A' + (i % 26));
253 EXPECT_EQ(MOJO_RESULT_OK,
254 mp->WriteMessage(0,
255 write_buffer.data(),
256 static_cast<uint32_t>(write_buffer.size()),
257 NULL,
258 MOJO_WRITE_MESSAGE_FLAG_NONE));
259 }
260
261 const std::string quitquitquit("quitquitquit");
262 EXPECT_EQ(MOJO_RESULT_OK,
263 mp->WriteMessage(0,
264 quitquitquit.data(),
265 static_cast<uint32_t>(quitquitquit.size()),
266 NULL,
267 MOJO_WRITE_MESSAGE_FLAG_NONE));
268
269 for (size_t i = 0; i < kNumMessages; i++) {
270 EXPECT_EQ(MOJO_RESULT_OK, WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
271
272 std::string read_buffer(kNumMessages * 2, '\0');
273 uint32_t read_buffer_size = static_cast<uint32_t>(read_buffer.size());
274 CHECK_EQ(mp->ReadMessage(0,
275 &read_buffer[0], &read_buffer_size,
276 NULL, NULL,
277 MOJO_READ_MESSAGE_FLAG_NONE),
278 MOJO_RESULT_OK);
279 read_buffer.resize(read_buffer_size);
280
281 EXPECT_EQ(std::string(i * 2, 'A' + (i % 26)), read_buffer);
282 }
283
284 // Wait for it to become readable, which should fail (since we sent
285 // "quitquitquit").
286 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
287 WaitIfNecessary(mp, MOJO_WAIT_FLAG_READABLE));
288
289 mp->Close(0);
290
291 EXPECT_EQ(static_cast<int>(kNumMessages % 100), WaitForChildShutdown());
292 }
293
294 } // namespace
295 } // namespace system
296 } // namespace mojo
297
298 #endif // defined(OS_POSIX)
299