1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/system/wait_set.h"
6
7 #include <set>
8 #include <vector>
9
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/platform_thread.h"
15 #include "base/threading/simple_thread.h"
16 #include "mojo/public/cpp/system/message_pipe.h"
17 #include "mojo/public/cpp/system/wait.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19
20 namespace mojo {
21 namespace {
22
// The tests below need no per-test fixture state, so the fixture name used
// with TEST_F is simply an alias for the plain gtest base class.
using WaitSetTest = testing::Test;
24
WriteMessage(const ScopedMessagePipeHandle & handle,const base::StringPiece & message)25 void WriteMessage(const ScopedMessagePipeHandle& handle,
26 const base::StringPiece& message) {
27 MojoResult rv = WriteMessageRaw(handle.get(), message.data(),
28 static_cast<uint32_t>(message.size()),
29 nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
30 CHECK_EQ(MOJO_RESULT_OK, rv);
31 }
32
ReadMessage(const ScopedMessagePipeHandle & handle)33 std::string ReadMessage(const ScopedMessagePipeHandle& handle) {
34 std::vector<uint8_t> bytes;
35 MojoResult rv = ReadMessageRaw(handle.get(), &bytes, nullptr,
36 MOJO_READ_MESSAGE_FLAG_NONE);
37 CHECK_EQ(MOJO_RESULT_OK, rv);
38 return std::string(bytes.begin(), bytes.end());
39 }
40
41 class ThreadedRunner : public base::SimpleThread {
42 public:
ThreadedRunner(const base::Closure & callback)43 explicit ThreadedRunner(const base::Closure& callback)
44 : SimpleThread("ThreadedRunner"), callback_(callback) {}
~ThreadedRunner()45 ~ThreadedRunner() override { Join(); }
46
Run()47 void Run() override { callback_.Run(); }
48
49 private:
50 const base::Closure callback_;
51
52 DISALLOW_COPY_AND_ASSIGN(ThreadedRunner);
53 };
54
// Exercises Wait() when the watched READABLE signal is already satisfied:
//   1. one endpoint watched, then the other;
//   2. both endpoints watched with room for two results;
//   3. both endpoints watched with room for only one result;
//   4. handles removed one at a time until the set is empty.
// Output slots are reset to MOJO_RESULT_UNKNOWN before each Wait() so every
// assertion on them proves that Wait() actually wrote the value.
TEST_F(WaitSetTest, Satisfied) {
  WaitSet wait_set;
  MessagePipe p;

  const char kTestMessage1[] = "hello wake up";

  // Watch only one handle and write to the other.

  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  WriteMessage(p.handle0, kTestMessage1);

  // |num_ready_handles| is in/out: capacity of the output arrays going in,
  // number of entries filled coming out.
  size_t num_ready_handles = 2;
  Handle ready_handles[2];
  MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  HandleSignalsState hss[2];
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);

  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle1.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());

  // |p.handle1| is currently in the set, so removing it must succeed. Checked
  // here for consistency with the later RemoveHandle() calls in this test.
  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(p.handle1.get()));

  // Now watch only the other handle and write to the first one.

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  WriteMessage(p.handle1, kTestMessage1);

  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);

  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle0.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());

  // Now wait on both of them. Neither message has been read, so both
  // endpoints are still readable.
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(2u, num_ready_handles);
  // The order in which the two handles are reported is not asserted.
  EXPECT_TRUE((ready_handles[0] == p.handle0.get() &&
               ready_handles[1] == p.handle1.get()) ||
              (ready_handles[0] == p.handle1.get() &&
               ready_handles[1] == p.handle0.get()));
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[1]);
  EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
  EXPECT_TRUE(hss[1].readable() && hss[1].writable() && !hss[1].peer_closed());

  // Wait on both again, but with only enough output space for one result.
  num_ready_handles = 1;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
              ready_handles[0] == p.handle1.get());
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  // Remove the ready handle from the set and wait one more time.
  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));

  num_ready_handles = 1;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
              ready_handles[0] == p.handle1.get());
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));

  // The wait set should be empty now. Nothing to wait on.
  num_ready_handles = 2;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(0u, num_ready_handles);
}
138
// Watches READABLE on three endpoints, then closes the peer of one of them.
// Expects Wait() to report exactly that endpoint, with
// MOJO_RESULT_FAILED_PRECONDITION, and nothing for the untouched pipe |q|.
TEST_F(WaitSetTest, Unsatisfiable) {
  MessagePipe p;
  MessagePipe q;
  WaitSet wait_set;

  // Both ends of |q| stay open and idle; only |p.handle0| loses its peer.
  wait_set.AddHandle(q.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(q.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  p.handle1.reset();

  // Offer room for two results so an unexpected extra report would be seen.
  Handle signaled[2];
  MojoResult outcomes[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  size_t signaled_count = 2;
  wait_set.Wait(nullptr, &signaled_count, signaled, outcomes);

  EXPECT_EQ(1u, signaled_count);
  EXPECT_EQ(p.handle0.get(), signaled[0]);
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, outcomes[0]);
}
157
// Starts a helper thread that closes the watched handle after a short delay
// and then calls Wait(). Expects the closed handle to be reported with
// MOJO_RESULT_CANCELLED and to no longer be found in the set afterwards.
TEST_F(WaitSetTest, CloseWhileWaiting) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Capture the raw handle value up front: |p.handle0| is reset on the helper
  // thread, after which get() can no longer be used for comparison.
  const Handle watched_handle = p.handle0.get();

  ThreadedRunner closer(base::Bind(
      [](ScopedMessagePipeHandle* doomed) {
        // Sleep briefly before closing the handle.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        doomed->reset();
      },
      &p.handle0));
  closer.Start();

  Handle signaled[2];
  MojoResult outcomes[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  size_t signaled_count = 2;
  wait_set.Wait(nullptr, &signaled_count, signaled, outcomes);

  EXPECT_EQ(1u, signaled_count);
  EXPECT_EQ(watched_handle, signaled[0]);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, outcomes[0]);

  // The set is expected to have dropped the handle already.
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(watched_handle));
}
184
// Closes both watched handles before any Wait() call, then waits with room
// for a single result at a time. Expects each closure to be reported as
// MOJO_RESULT_CANCELLED exactly once, across two Wait() calls, and the set to
// be empty afterwards.
TEST_F(WaitSetTest, CloseBeforeWaiting) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Capture the raw values before the scoped handles are reset below.
  Handle handle0_value = p.handle0.get();
  Handle handle1_value = p.handle1.get();

  p.handle0.reset();
  p.handle1.reset();

  // Ensure that the WaitSet user is always made aware of all cancellations even
  // if they happen while not waiting, or they have to be returned over the span
  // of multiple Wait() calls due to insufficient output storage.

  size_t num_ready_handles = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));

  // Remember which cancellation was delivered first so the second Wait() can
  // be checked to deliver the other one rather than repeating this one.
  const Handle first_cancelled_handle = ready_handle;

  // Re-arm the in/out count and clear the stale CANCELLED result; otherwise
  // the result assertion below would pass even if Wait() wrote nothing.
  num_ready_handles = 1;
  ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  EXPECT_FALSE(ready_handle == first_cancelled_handle);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
  // Both handles have been closed and reported; neither should remain.
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle1_value));

  // Nothing more to wait on.
  num_ready_handles = 1;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(0u, num_ready_handles);
}
221
// Watches READABLE on both endpoints of a pipe. A message written to
// |handle0| first makes |handle1| ready; the test then drains that message so
// nothing is readable, and a helper thread writes to |handle1| after a delay.
// The second Wait() is expected to report |handle0| as readable.
TEST_F(WaitSetTest, SatisfiedThenUnsatisfied) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  const char kTestMessage1[] = "testing testing testing";
  WriteMessage(p.handle0, kTestMessage1);

  size_t num_ready_handles = 2;
  Handle ready_handles[2];
  MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle1.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);

  // Consume the message so that |handle1| is no longer readable.
  EXPECT_EQ(kTestMessage1, ReadMessage(p.handle1));

  ThreadedRunner write_after_delay(base::Bind(
      [](ScopedMessagePipeHandle* handle) {
        // Wait a little while, then write a message.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        WriteMessage(*handle, "wakey wakey");
      },
      &p.handle1));
  write_after_delay.Start();

  // Clear the results left over from the first Wait(). Without this,
  // |ready_results[0]| would still hold MOJO_RESULT_OK and the assertion
  // below would pass even if Wait() never wrote to it.
  num_ready_handles = 2;
  ready_results[0] = MOJO_RESULT_UNKNOWN;
  ready_results[1] = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(p.handle0.get(), ready_handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
}
257
// A WaitSet containing only an already-signaled event: Wait() is expected to
// report that event and zero ready handles.
TEST_F(WaitSetTest, EventOnly) {
  using Event = base::WaitableEvent;

  Event event(Event::ResetPolicy::MANUAL, Event::InitialState::SIGNALED);
  WaitSet wait_set;
  wait_set.AddEvent(&event);

  // Provide room for one handle result even though none is expected.
  Handle signaled_handle;
  MojoResult outcome = MOJO_RESULT_UNKNOWN;
  size_t signaled_count = 1;
  Event* signaled_event = nullptr;
  wait_set.Wait(&signaled_event, &signaled_count, &signaled_handle, &outcome);

  EXPECT_EQ(0u, signaled_count);
  EXPECT_EQ(&event, signaled_event);
}
272
// A WaitSet holding both a handle and an event. First only the handle is
// ready (a message is pending, the event is unsignaled), so Wait() is expected
// to report the handle and no event. After the message is drained, a helper
// thread signals the event and the second Wait() is expected to report the
// event and no handles.
TEST_F(WaitSetTest, EventAndHandle) {
  const char kTestMessage[] = "hello hello";

  MessagePipe p;
  WriteMessage(p.handle0, kTestMessage);

  base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
                            base::WaitableEvent::InitialState::NOT_SIGNALED);

  WaitSet wait_set;
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddEvent(&event);

  base::WaitableEvent* ready_event = nullptr;
  size_t num_ready_handles = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_EQ(nullptr, ready_event);
  EXPECT_EQ(p.handle1.get(), ready_handle);
  EXPECT_EQ(MOJO_RESULT_OK, ready_result);

  // Drain the message so the handle is no longer readable.
  EXPECT_EQ(kTestMessage, ReadMessage(p.handle1));

  ThreadedRunner signal_after_delay(base::Bind(
      [](base::WaitableEvent* event) {
        // Wait a little while, then signal the event.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        event->Signal();
      },
      &event));
  signal_after_delay.Start();

  // Reset every output explicitly so the assertions below depend only on what
  // this Wait() call writes, not on values left behind by the first call.
  ready_event = nullptr;
  num_ready_handles = 1;
  ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(&ready_event, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(0u, num_ready_handles);
  EXPECT_EQ(&event, ready_event);
}
311
// Fills a WaitSet with many handles and events that are all permanently ready
// and then calls Wait() with room for a single handle result at a time. The
// test finishes only once every handle and every event has been reported at
// least once per round, so a WaitSet that starves some entry makes it loop
// forever.
TEST_F(WaitSetTest, NoStarvation) {
  const char kTestMessage[] = "wait for it";
  const size_t kNumTestPipes = 50;
  const size_t kNumTestEvents = 10;

  WaitSet wait_set;

  // Make both endpoints of every pipe readable (and wait until they are)
  // before adding them to the set. The messages are never read, so the
  // endpoints stay readable for the rest of the test.
  MessagePipe pipes[kNumTestPipes];
  for (MessagePipe& pipe : pipes) {
    WriteMessage(pipe.handle0, kTestMessage);
    Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

    WriteMessage(pipe.handle1, kTestMessage);
    Wait(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

    wait_set.AddHandle(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
    wait_set.AddHandle(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  }

  // Manual-reset events that are signaled once and never reset.
  std::vector<std::unique_ptr<base::WaitableEvent>> events(kNumTestEvents);
  for (size_t i = 0; i < events.size(); ++i) {
    events[i] = std::make_unique<base::WaitableEvent>(
        base::WaitableEvent::ResetPolicy::MANUAL,
        base::WaitableEvent::InitialState::NOT_SIGNALED);
    events[i]->Signal();
    wait_set.AddEvent(events[i].get());
  }

  // Verify that all handle and event signals are detected within a finite
  // number of consecutive Wait() calls. Repeat a few rounds for good measure.
  const size_t kNumTestHandles = kNumTestPipes * 2;
  for (size_t round = 0; round < 3; ++round) {
    std::set<base::WaitableEvent*> seen_events;
    std::set<Handle> seen_handles;
    while (seen_events.size() < kNumTestEvents ||
           seen_handles.size() < kNumTestHandles) {
      base::WaitableEvent* signaled_event = nullptr;
      Handle signaled_handle;
      MojoResult outcome = MOJO_RESULT_UNKNOWN;
      size_t signaled_count = 1;
      wait_set.Wait(&signaled_event, &signaled_count, &signaled_handle,
                    &outcome);

      if (signaled_event != nullptr)
        seen_events.insert(signaled_event);

      // A Wait() call may report only an event; nothing more to record then.
      if (signaled_count == 0)
        continue;

      EXPECT_EQ(1u, signaled_count);
      EXPECT_EQ(MOJO_RESULT_OK, outcome);
      seen_handles.insert(signaled_handle);
    }
  }
}
367
368 } // namespace
369 } // namespace mojo
370