1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_kqueue.h"
6
7 #include <mach/mach.h>
8 #include <mach/message.h>
9
10 #include <utility>
11
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/memory/raw_ptr.h"
15 #include "base/memory/scoped_refptr.h"
16 #include "base/run_loop.h"
17 #include "base/task/single_thread_task_executor.h"
18 #include "base/task/single_thread_task_runner.h"
19 #include "testing/gtest/include/gtest/gtest.h"
20
21 namespace base {
22 namespace {
23
24 class MessagePumpKqueueTest : public testing::Test {
25 public:
MessagePumpKqueueTest()26 MessagePumpKqueueTest()
27 : pump_(new MessagePumpKqueue()), executor_(WrapUnique(pump_.get())) {}
28
pump()29 MessagePumpKqueue* pump() { return pump_; }
30
CreatePortPair(mac::ScopedMachReceiveRight * receive,mac::ScopedMachSendRight * send)31 static void CreatePortPair(mac::ScopedMachReceiveRight* receive,
32 mac::ScopedMachSendRight* send) {
33 mach_port_options_t options{};
34 options.flags = MPO_INSERT_SEND_RIGHT;
35 mac::ScopedMachReceiveRight port;
36 kern_return_t kr = mach_port_construct(
37 mach_task_self(), &options, 0,
38 mac::ScopedMachReceiveRight::Receiver(*receive).get());
39 ASSERT_EQ(kr, KERN_SUCCESS);
40 *send = mac::ScopedMachSendRight(receive->get());
41 }
42
SendEmptyMessage(mach_port_t remote_port,mach_msg_id_t msgid)43 static mach_msg_return_t SendEmptyMessage(mach_port_t remote_port,
44 mach_msg_id_t msgid) {
45 mach_msg_empty_send_t message{};
46 message.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
47 message.header.msgh_size = sizeof(message);
48 message.header.msgh_remote_port = remote_port;
49 message.header.msgh_id = msgid;
50 return mach_msg_send(&message.header);
51 }
52
53 private:
54 raw_ptr<MessagePumpKqueue> pump_; // Weak, owned by |executor_|.
55 SingleThreadTaskExecutor executor_;
56 };
57
58 class PortWatcher : public MessagePumpKqueue::MachPortWatcher {
59 public:
PortWatcher(RepeatingClosure callback)60 PortWatcher(RepeatingClosure callback) : callback_(std::move(callback)) {}
~PortWatcher()61 ~PortWatcher() override {}
62
OnMachMessageReceived(mach_port_t port)63 void OnMachMessageReceived(mach_port_t port) override {
64 mach_msg_empty_rcv_t message{};
65 kern_return_t kr = mach_msg(&message.header, MACH_RCV_MSG, 0,
66 sizeof(message), port, 0, MACH_PORT_NULL);
67 ASSERT_EQ(kr, KERN_SUCCESS);
68
69 messages_.push_back(message.header);
70
71 callback_.Run();
72 }
73
74 std::vector<mach_msg_header_t> messages_;
75
76 private:
77 RepeatingClosure callback_;
78 };
79
// Checks that a watcher registered on a Mach receive port is notified when a
// message is sent to that port, and that the recorded header carries the
// expected local port and message id.
TEST_F(MessagePumpKqueueTest, MachPortBasicWatch) {
  mac::ScopedMachReceiveRight receive_right;
  mac::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  const mach_port_t port_name = receive_right.get();
  const mach_msg_id_t expected_id = 'helo';

  RunLoop run_loop;
  // The watcher quits |run_loop| once it has recorded a message.
  PortWatcher watcher(run_loop.QuitClosure());
  MessagePumpKqueue::MachPortWatchController controller(FROM_HERE);

  // The send is posted as a task, so it only happens once the loop is running.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE,
      BindOnce(
          [](mach_port_t destination, mach_msg_id_t id, RunLoop* loop) {
            const mach_msg_return_t result = SendEmptyMessage(destination, id);
            EXPECT_EQ(result, KERN_SUCCESS);
            if (result == KERN_SUCCESS) {
              return;
            }
            // No message will arrive, so stop the loop instead of hanging.
            loop->Quit();
          },
          port_name, expected_id, Unretained(&run_loop)));

  pump()->WatchMachReceivePort(port_name, &controller, &watcher);

  run_loop.Run();

  ASSERT_EQ(1u, watcher.messages_.size());
  const mach_msg_header_t& received = watcher.messages_[0];
  EXPECT_EQ(port_name, received.msgh_local_port);
  EXPECT_EQ(expected_id, received.msgh_id);
}
110
// Checks that once the controller has stopped watching a port, a message sent
// to that port no longer reaches the watcher.
TEST_F(MessagePumpKqueueTest, MachPortStopWatching) {
  mac::ScopedMachReceiveRight receive_right;
  mac::ScopedMachSendRight send_right;
  CreatePortPair(&receive_right, &send_right);

  const mach_port_t port_name = receive_right.get();

  RunLoop run_loop;
  PortWatcher watcher(run_loop.QuitClosure());
  MessagePumpKqueue::MachPortWatchController controller(FROM_HERE);

  pump()->WatchMachReceivePort(port_name, &controller, &watcher);

  auto task_runner = SingleThreadTaskRunner::GetCurrentDefault();

  // First task: cancel the watch.
  task_runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](MessagePumpKqueue::MachPortWatchController* watch_controller) {
            watch_controller->StopWatchingMachPort();
          },
          Unretained(&controller)));

  // Second task: send a message to the port that is no longer watched.
  task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](mach_port_t destination) {
                       EXPECT_EQ(KERN_SUCCESS,
                                 SendEmptyMessage(destination, 100));
                     },
                     port_name));

  run_loop.RunUntilIdle();

  // The watcher must not have recorded anything.
  EXPECT_EQ(0u, watcher.messages_.size());
}
141
// Checks that two ports can be watched at the same time by bouncing messages
// back and forth between them.
TEST_F(MessagePumpKqueueTest, MultipleMachWatchers) {
  mac::ScopedMachReceiveRight port1, port2;
  mac::ScopedMachSendRight send_right1, send_right2;
  CreatePortPair(&port1, &send_right1);
  CreatePortPair(&port2, &send_right2);

  RunLoop run_loop;

  // Number of messages sent so far to port1 and to port2 by the watchers.
  int port1_count = 0, port2_count = 0;

  // Protocol:
  //  - each message received on port1 triggers a send to port2;
  //  - each message received on port2 triggers a send to port1;
  //  - once three messages have been sent to port1 this way, the next message
  //    received on port2 quits the loop instead of sending.

  // Watcher for port1: forwards to port2 with ids 0x2000N.
  PortWatcher watcher1(BindRepeating(
      [](mach_port_t peer_port, int* sent_count, RunLoop* loop) {
        ++(*sent_count);
        const mach_msg_id_t id = (0x2 << 16) | *sent_count;
        const mach_msg_return_t kr = SendEmptyMessage(peer_port, id);
        EXPECT_EQ(kr, KERN_SUCCESS);
        if (kr == KERN_SUCCESS) {
          return;
        }
        loop->Quit();
      },
      port2.get(), &port2_count, &run_loop));
  MessagePumpKqueue::MachPortWatchController controller1(FROM_HERE);

  // Watcher for port2: forwards to port1 with ids 0x1000N, or quits the loop
  // once three messages have already been sent to port1.
  PortWatcher watcher2(BindRepeating(
      [](mach_port_t peer_port, int* sent_count, RunLoop* loop) {
        if (*sent_count != 3) {
          ++(*sent_count);
          const mach_msg_id_t id = (0x1 << 16) | *sent_count;
          const mach_msg_return_t kr = SendEmptyMessage(peer_port, id);
          EXPECT_EQ(kr, KERN_SUCCESS);
          if (kr == KERN_SUCCESS) {
            return;
          }
        }
        // Reached when the exchange is complete or when the send failed.
        loop->Quit();
      },
      port1.get(), &port1_count, &run_loop));
  MessagePumpKqueue::MachPortWatchController controller2(FROM_HERE);

  pump()->WatchMachReceivePort(port1.get(), &controller1, &watcher1);
  pump()->WatchMachReceivePort(port2.get(), &controller2, &watcher2);

  // Start the ping-pong by sending the first message to port1.
  SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
      FROM_HERE, BindOnce(
                     [](mach_port_t first_port) {
                       ASSERT_EQ(KERN_SUCCESS,
                                 SendEmptyMessage(first_port, 0xf000f));
                     },
                     port1.get()));

  run_loop.Run();

  // port1 receives the kick-off message plus three replies; port2 receives
  // one forwarded message for each of those four.
  ASSERT_EQ(4u, watcher1.messages_.size());
  ASSERT_EQ(4u, watcher2.messages_.size());

  EXPECT_EQ(0xf000f, watcher1.messages_[0].msgh_id);
  EXPECT_EQ(0x10001, watcher1.messages_[1].msgh_id);
  EXPECT_EQ(0x10002, watcher1.messages_[2].msgh_id);
  EXPECT_EQ(0x10003, watcher1.messages_[3].msgh_id);

  EXPECT_EQ(0x20001, watcher2.messages_[0].msgh_id);
  EXPECT_EQ(0x20002, watcher2.messages_[1].msgh_id);
  EXPECT_EQ(0x20003, watcher2.messages_[2].msgh_id);
  EXPECT_EQ(0x20004, watcher2.messages_[3].msgh_id);
}
211
212 } // namespace
213 } // namespace base
214