1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/ext/base/unix_socket.h"
18
19 #include <signal.h>
20 #include <sys/types.h>
21 #include <list>
22 #include <thread>
23
24 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
25 #include <sys/mman.h>
26 #include <sys/socket.h>
27 #include <sys/un.h>
28 #endif
29
30 #include "perfetto/base/build_config.h"
31 #include "perfetto/base/logging.h"
32 #include "perfetto/ext/base/file_utils.h"
33 #include "perfetto/ext/base/pipe.h"
34 #include "perfetto/ext/base/temp_file.h"
35 #include "perfetto/ext/base/utils.h"
36 #include "src/base/test/test_task_runner.h"
37 #include "src/ipc/test/test_socket.h"
38 #include "test/gtest_and_gmock.h"
39
40 namespace perfetto {
41 namespace base {
42 namespace {
43
44 using ::testing::_;
45 using ::testing::AtLeast;
46 using ::testing::Invoke;
47 using ::testing::InvokeWithoutArgs;
48 using ::testing::Mock;
49
50 ipc::TestSocket kTestSocket{"unix_socket_unittest"};
51
52 class MockEventListener : public UnixSocket::EventListener {
53 public:
54 MOCK_METHOD2(OnNewIncomingConnection, void(UnixSocket*, UnixSocket*));
55 MOCK_METHOD2(OnConnect, void(UnixSocket*, bool));
56 MOCK_METHOD1(OnDisconnect, void(UnixSocket*));
57 MOCK_METHOD1(OnDataAvailable, void(UnixSocket*));
58
59 // GMock doesn't support mocking methods with non-copiable args.
OnNewIncomingConnection(UnixSocket * self,std::unique_ptr<UnixSocket> new_connection)60 void OnNewIncomingConnection(
61 UnixSocket* self,
62 std::unique_ptr<UnixSocket> new_connection) override {
63 incoming_connections_.emplace_back(std::move(new_connection));
64 OnNewIncomingConnection(self, incoming_connections_.back().get());
65 }
66
GetIncomingConnection()67 std::unique_ptr<UnixSocket> GetIncomingConnection() {
68 if (incoming_connections_.empty())
69 return nullptr;
70 std::unique_ptr<UnixSocket> sock = std::move(incoming_connections_.front());
71 incoming_connections_.pop_front();
72 return sock;
73 }
74
75 private:
76 std::list<std::unique_ptr<UnixSocket>> incoming_connections_;
77 };
78
79 class UnixSocketTest : public ::testing::Test {
80 protected:
SetUp()81 void SetUp() override { kTestSocket.Destroy(); }
TearDown()82 void TearDown() override { kTestSocket.Destroy(); }
83
84 TestTaskRunner task_runner_;
85 MockEventListener event_listener_;
86 };
87
TEST_F(UnixSocketTest,ConnectionFailureIfUnreachable)88 TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
89 auto cli =
90 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
91 kTestSocket.family(), SockType::kStream);
92 ASSERT_FALSE(cli->is_connected());
93 auto checkpoint = task_runner_.CreateCheckpoint("failure");
94 EXPECT_CALL(event_listener_, OnConnect(cli.get(), false))
95 .WillOnce(InvokeWithoutArgs(checkpoint));
96 task_runner_.RunUntilCheckpoint("failure");
97 }
98
99 // Both server and client should see an OnDisconnect() if the server drops
100 // incoming connections immediately as they are created.
TEST_F(UnixSocketTest,ConnectionImmediatelyDroppedByServer)101 TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
102 auto srv =
103 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
104 kTestSocket.family(), SockType::kStream);
105 ASSERT_TRUE(srv->is_listening());
106
107 // The server will immediately shutdown the connection upon
108 // OnNewIncomingConnection().
109 auto srv_did_shutdown = task_runner_.CreateCheckpoint("srv_did_shutdown");
110 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
111 .WillOnce(
112 Invoke([this, srv_did_shutdown](UnixSocket*, UnixSocket* new_conn) {
113 EXPECT_CALL(event_listener_, OnDisconnect(new_conn));
114 new_conn->Shutdown(true);
115 srv_did_shutdown();
116 }));
117
118 auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
119 auto cli =
120 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
121 kTestSocket.family(), SockType::kStream);
122 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
123 .WillOnce(InvokeWithoutArgs(checkpoint));
124 task_runner_.RunUntilCheckpoint("cli_connected");
125 task_runner_.RunUntilCheckpoint("srv_did_shutdown");
126
127 // Trying to send something will trigger the disconnection notification.
128 auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
129 EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
130 .WillOnce(InvokeWithoutArgs(cli_disconnected));
131
132 // On Windows the first send immediately after the disconnection succeeds, the
133 // kernel will detect the disconnection only later.
134 cli->Send(".");
135 EXPECT_FALSE(cli->Send("should_fail_both_on_win_and_unix"));
136 task_runner_.RunUntilCheckpoint("cli_disconnected");
137 }
138
TEST_F(UnixSocketTest,ClientAndServerExchangeData)139 TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
140 auto srv =
141 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
142 kTestSocket.family(), SockType::kStream);
143 ASSERT_TRUE(srv->is_listening());
144
145 auto cli =
146 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
147 kTestSocket.family(), SockType::kStream);
148 auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
149 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
150 .WillOnce(InvokeWithoutArgs(cli_connected));
151 auto srv_conn_seen = task_runner_.CreateCheckpoint("srv_conn_seen");
152 auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
153 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
154 .WillOnce(Invoke([this, srv_conn_seen, srv_disconnected](
155 UnixSocket*, UnixSocket* srv_conn) {
156 EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
157 .WillOnce(InvokeWithoutArgs(srv_disconnected));
158 srv_conn_seen();
159 }));
160 task_runner_.RunUntilCheckpoint("srv_conn_seen");
161 task_runner_.RunUntilCheckpoint("cli_connected");
162
163 auto srv_conn = event_listener_.GetIncomingConnection();
164 ASSERT_TRUE(srv_conn);
165 ASSERT_TRUE(cli->is_connected());
166
167 auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
168 EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
169 .WillOnce(Invoke([cli_did_recv](UnixSocket* s) {
170 ASSERT_EQ("srv>cli", s->ReceiveString());
171 cli_did_recv();
172 }));
173
174 auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
175 EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
176 .WillOnce(Invoke([srv_did_recv](UnixSocket* s) {
177 ASSERT_EQ("cli>srv", s->ReceiveString());
178 srv_did_recv();
179 }));
180 ASSERT_TRUE(cli->Send("cli>srv"));
181 ASSERT_TRUE(srv_conn->Send("srv>cli"));
182 task_runner_.RunUntilCheckpoint("cli_did_recv");
183 task_runner_.RunUntilCheckpoint("srv_did_recv");
184
185 // Check that Send/Receive() fails gracefully once the socket is closed.
186 auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
187 EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
188 .WillOnce(InvokeWithoutArgs(cli_disconnected));
189 cli->Shutdown(true);
190 char msg[4];
191 ASSERT_EQ(0u, cli->Receive(&msg, sizeof(msg)));
192 ASSERT_EQ("", cli->ReceiveString());
193 ASSERT_EQ(0u, srv_conn->Receive(&msg, sizeof(msg)));
194 ASSERT_EQ("", srv_conn->ReceiveString());
195 ASSERT_FALSE(cli->Send("foo"));
196 ASSERT_FALSE(srv_conn->Send("bar"));
197 srv->Shutdown(true);
198 task_runner_.RunUntilCheckpoint("cli_disconnected");
199 task_runner_.RunUntilCheckpoint("srv_disconnected");
200 }
201
TEST_F(UnixSocketTest,ListenWithPassedSocketHandle)202 TEST_F(UnixSocketTest, ListenWithPassedSocketHandle) {
203 auto sock_raw =
204 UnixSocketRaw::CreateMayFail(kTestSocket.family(), SockType::kStream);
205 ASSERT_TRUE(sock_raw.Bind(kTestSocket.name()));
206 auto fd = sock_raw.ReleaseFd();
207 auto srv = UnixSocket::Listen(std::move(fd), &event_listener_, &task_runner_,
208 kTestSocket.family(), SockType::kStream);
209 ASSERT_TRUE(srv->is_listening());
210
211 auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
212 auto cli =
213 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
214 kTestSocket.family(), SockType::kStream);
215 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
216 .WillOnce(InvokeWithoutArgs(cli_connected));
217 auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
218 auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
219 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
220 .WillOnce(Invoke([this, srv_connected, srv_disconnected](
221 UnixSocket*, UnixSocket* srv_conn) {
222 // An empty OnDataAvailable might be raised to signal the EOF state.
223 EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
224 .WillRepeatedly(
225 InvokeWithoutArgs([srv_conn] { srv_conn->ReceiveString(); }));
226 EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
227 .WillOnce(InvokeWithoutArgs(srv_disconnected));
228 srv_connected();
229 }));
230 task_runner_.RunUntilCheckpoint("srv_connected");
231 task_runner_.RunUntilCheckpoint("cli_connected");
232 ASSERT_TRUE(cli->is_connected());
233 cli.reset();
234 task_runner_.RunUntilCheckpoint("srv_disconnected");
235 }
236
237 // Mostly a stress tests. Connects kNumClients clients to the same server and
238 // tests that all can exchange data and can see the expected sequence of events.
TEST_F(UnixSocketTest,SeveralClients)239 TEST_F(UnixSocketTest, SeveralClients) {
240 auto srv =
241 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
242 kTestSocket.family(), SockType::kStream);
243 ASSERT_TRUE(srv->is_listening());
244 constexpr size_t kNumClients = 32;
245 std::unique_ptr<UnixSocket> cli[kNumClients];
246
247 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
248 .Times(kNumClients)
249 .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
250 EXPECT_CALL(event_listener_, OnDataAvailable(s))
251 .WillOnce(Invoke([](UnixSocket* t) {
252 ASSERT_EQ("PING", t->ReceiveString());
253 ASSERT_TRUE(t->Send("PONG"));
254 }));
255 }));
256
257 for (size_t i = 0; i < kNumClients; i++) {
258 cli[i] =
259 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
260 kTestSocket.family(), SockType::kStream);
261 EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
262 .WillOnce(Invoke([](UnixSocket* s, bool success) {
263 ASSERT_TRUE(success);
264 ASSERT_TRUE(s->Send("PING"));
265 }));
266
267 auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
268 EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
269 .WillOnce(Invoke([checkpoint](UnixSocket* s) {
270 ASSERT_EQ("PONG", s->ReceiveString());
271 checkpoint();
272 }));
273 }
274
275 for (size_t i = 0; i < kNumClients; i++) {
276 task_runner_.RunUntilCheckpoint(std::to_string(i));
277 ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
278 }
279 }
280
TEST_F(UnixSocketTest,BlockingSend)281 TEST_F(UnixSocketTest, BlockingSend) {
282 auto srv =
283 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
284 kTestSocket.family(), SockType::kStream);
285 ASSERT_TRUE(srv->is_listening());
286
287 auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
288 size_t total_bytes_received = 0;
289 static constexpr size_t kTotalBytes = 1024 * 1024 * 4;
290 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
291 .WillOnce(Invoke([this, &total_bytes_received, all_frames_done](
292 UnixSocket*, UnixSocket* srv_conn) {
293 EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
294 .WillRepeatedly(
295 Invoke([&total_bytes_received, all_frames_done](UnixSocket* s) {
296 char buf[1024];
297 size_t res = s->Receive(buf, sizeof(buf));
298 total_bytes_received += res;
299 if (total_bytes_received == kTotalBytes)
300 all_frames_done();
301 }));
302 }));
303
304 // Override default timeout as this test can take time on the emulator.
305 static constexpr int kTimeoutMs = 60000 * 3;
306
307 // Perform the blocking send form another thread.
308 std::thread tx_thread([] {
309 TestTaskRunner tx_task_runner;
310 MockEventListener tx_events;
311 auto cli =
312 UnixSocket::Connect(kTestSocket.name(), &tx_events, &tx_task_runner,
313 kTestSocket.family(), SockType::kStream);
314
315 auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
316 EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
317 .WillOnce(InvokeWithoutArgs(cli_connected));
318 tx_task_runner.RunUntilCheckpoint("cli_connected");
319
320 auto all_sent = tx_task_runner.CreateCheckpoint("all_sent");
321 char buf[1024 * 32] = {};
322 tx_task_runner.PostTask([&cli, &buf, all_sent] {
323 for (size_t i = 0; i < kTotalBytes / sizeof(buf); i++)
324 cli->Send(buf, sizeof(buf));
325 all_sent();
326 });
327 tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
328 });
329
330 task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
331 tx_thread.join();
332 }
333
334 // Regression test for b/76155349 . If the receiver end disconnects while the
335 // sender is in the middle of a large send(), the socket should gracefully give
336 // up (i.e. Shutdown()) but not crash.
TEST_F(UnixSocketTest,ReceiverDisconnectsDuringSend)337 TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
338 auto srv =
339 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
340 kTestSocket.family(), SockType::kStream);
341 ASSERT_TRUE(srv->is_listening());
342 static constexpr int kTimeoutMs = 30000;
343
344 auto receive_done = task_runner_.CreateCheckpoint("receive_done");
345 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
346 .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
347 EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
348 .WillOnce(Invoke([receive_done](UnixSocket* s) {
349 char buf[1024];
350 size_t res = s->Receive(buf, sizeof(buf));
351 ASSERT_EQ(1024u, res);
352 s->Shutdown(false /*notify*/);
353 receive_done();
354 }));
355 }));
356
357 // Perform the blocking send form another thread.
358 std::thread tx_thread([] {
359 TestTaskRunner tx_task_runner;
360 MockEventListener tx_events;
361 auto cli =
362 UnixSocket::Connect(kTestSocket.name(), &tx_events, &tx_task_runner,
363 kTestSocket.family(), SockType::kStream);
364
365 auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
366 EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
367 .WillOnce(InvokeWithoutArgs(cli_connected));
368 tx_task_runner.RunUntilCheckpoint("cli_connected");
369
370 auto send_done = tx_task_runner.CreateCheckpoint("send_done");
371 static constexpr size_t kBufSize = 32 * 1024 * 1024;
372 std::unique_ptr<char[]> buf(new char[kBufSize]());
373 tx_task_runner.PostTask([&cli, &buf, send_done] {
374 cli->Send(buf.get(), kBufSize);
375 send_done();
376 });
377
378 tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
379 });
380 task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
381 tx_thread.join();
382 }
383
TEST_F(UnixSocketTest,ReleaseSocket)384 TEST_F(UnixSocketTest, ReleaseSocket) {
385 auto srv =
386 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
387 kTestSocket.family(), SockType::kStream);
388 ASSERT_TRUE(srv->is_listening());
389 auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
390 UnixSocket* peer = nullptr;
391 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
392 .WillOnce(
393 Invoke([srv_connected, &peer](UnixSocket*, UnixSocket* new_conn) {
394 peer = new_conn;
395 srv_connected();
396 }));
397
398 auto cli =
399 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
400 kTestSocket.family(), SockType::kStream);
401 auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
402 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
403 .WillOnce(InvokeWithoutArgs(cli_connected));
404 task_runner_.RunUntilCheckpoint("srv_connected");
405 task_runner_.RunUntilCheckpoint("cli_connected");
406 srv->Shutdown(true);
407
408 cli->Send("test");
409
410 ASSERT_NE(peer, nullptr);
411 auto raw_sock = peer->ReleaseSocket();
412
413 EXPECT_CALL(event_listener_, OnDataAvailable(_)).Times(0);
414 task_runner_.RunUntilIdle();
415
416 char buf[sizeof("test")];
417 ASSERT_TRUE(raw_sock);
418 ASSERT_EQ(raw_sock.Receive(buf, sizeof(buf)),
419 static_cast<ssize_t>(sizeof(buf)));
420 ASSERT_STREQ(buf, "test");
421 }
422
TEST_F(UnixSocketTest,TcpStream)423 TEST_F(UnixSocketTest, TcpStream) {
424 char host_and_port[32];
425 int attempt = 0;
426 std::unique_ptr<UnixSocket> srv;
427
428 // Try listening on a random port. Some ports might be taken by other syste
429 // services. Do a bunch of attempts on different ports before giving up.
430 do {
431 sprintf(host_and_port, "127.0.0.1:%d", 10000 + (rand() % 10000));
432 srv = UnixSocket::Listen(host_and_port, &event_listener_, &task_runner_,
433 SockFamily::kInet, SockType::kStream);
434 } while ((!srv || !srv->is_listening()) && attempt++ < 10);
435 ASSERT_TRUE(srv->is_listening());
436
437 constexpr size_t kNumClients = 3;
438 std::unique_ptr<UnixSocket> cli[kNumClients];
439 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
440 .Times(kNumClients)
441 .WillRepeatedly(Invoke([&](UnixSocket*, UnixSocket* s) {
442 // OnDisconnect() might spuriously happen depending on the dtor order.
443 EXPECT_CALL(event_listener_, OnDisconnect(s)).Times(AtLeast(0));
444 EXPECT_CALL(event_listener_, OnDataAvailable(s))
445 .WillRepeatedly(Invoke([](UnixSocket* cli_sock) {
446 cli_sock->ReceiveString(); // Read connection EOF;
447 }));
448 ASSERT_TRUE(s->Send("welcome"));
449 }));
450
451 for (size_t i = 0; i < kNumClients; i++) {
452 cli[i] = UnixSocket::Connect(host_and_port, &event_listener_, &task_runner_,
453 SockFamily::kInet, SockType::kStream);
454 auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
455 EXPECT_CALL(event_listener_, OnDisconnect(cli[i].get())).Times(AtLeast(0));
456 EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true));
457 EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
458 .WillRepeatedly(Invoke([checkpoint](UnixSocket* s) {
459 auto str = s->ReceiveString();
460 if (str == "")
461 return; // Connection EOF.
462 ASSERT_EQ("welcome", str);
463 checkpoint();
464 }));
465 }
466
467 for (size_t i = 0; i < kNumClients; i++) {
468 task_runner_.RunUntilCheckpoint(std::to_string(i));
469 ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
470 }
471 }
472
473 // ---------------------------------
474 // Posix-only tests below this point
475 // ---------------------------------
476
477 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
478
479 // Tests the SockPeerCredMode::kIgnore logic.
TEST_F(UnixSocketTest,IgnorePeerCredentials)480 TEST_F(UnixSocketTest, IgnorePeerCredentials) {
481 auto srv =
482 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
483 kTestSocket.family(), SockType::kStream);
484 ASSERT_TRUE(srv->is_listening());
485 auto cli1_connected = task_runner_.CreateCheckpoint("cli1_connected");
486 auto cli1 = UnixSocket::Connect(kTestSocket.name(), &event_listener_,
487 &task_runner_, kTestSocket.family(),
488 SockType::kStream, SockPeerCredMode::kIgnore);
489 EXPECT_CALL(event_listener_, OnConnect(cli1.get(), true))
490 .WillOnce(InvokeWithoutArgs(cli1_connected));
491
492 auto cli2_connected = task_runner_.CreateCheckpoint("cli2_connected");
493 auto cli2 = UnixSocket::Connect(
494 kTestSocket.name(), &event_listener_, &task_runner_, kTestSocket.family(),
495 SockType::kStream, SockPeerCredMode::kReadOnConnect);
496 EXPECT_CALL(event_listener_, OnConnect(cli2.get(), true))
497 .WillOnce(InvokeWithoutArgs(cli2_connected));
498
499 task_runner_.RunUntilCheckpoint("cli1_connected");
500 task_runner_.RunUntilCheckpoint("cli2_connected");
501
502 ASSERT_EQ(cli1->peer_uid_posix(/*skip_check_for_testing=*/true), kInvalidUid);
503 ASSERT_EQ(cli2->peer_uid_posix(), geteuid());
504 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
505 PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
506 ASSERT_EQ(cli1->peer_pid_linux(/*skip_check_for_testing=*/true), kInvalidPid);
507 ASSERT_EQ(cli2->peer_pid_linux(), getpid());
508 #endif
509 }
510
511 // Checks that the peer_uid() is retained after the client disconnects. The IPC
512 // layer needs to rely on this to validate messages received immediately before
513 // a client disconnects.
TEST_F(UnixSocketTest,PeerCredentialsRetainedAfterDisconnect)514 TEST_F(UnixSocketTest, PeerCredentialsRetainedAfterDisconnect) {
515 auto srv =
516 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
517 kTestSocket.family(), SockType::kStream);
518 ASSERT_TRUE(srv->is_listening());
519 UnixSocket* srv_client_conn = nullptr;
520 auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
521 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
522 .WillOnce(Invoke([&srv_client_conn, srv_connected](UnixSocket*,
523 UnixSocket* srv_conn) {
524 srv_client_conn = srv_conn;
525 EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid_posix()));
526 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
527 PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
528 EXPECT_EQ(getpid(), static_cast<pid_t>(srv_conn->peer_pid_linux()));
529 #endif
530 srv_connected();
531 }));
532 auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
533 auto cli =
534 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
535 kTestSocket.family(), SockType::kStream);
536 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
537 .WillOnce(InvokeWithoutArgs(cli_connected));
538
539 task_runner_.RunUntilCheckpoint("cli_connected");
540 task_runner_.RunUntilCheckpoint("srv_connected");
541 ASSERT_NE(nullptr, srv_client_conn);
542 ASSERT_TRUE(srv_client_conn->is_connected());
543
544 auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
545 EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
546 .WillOnce(InvokeWithoutArgs(cli_disconnected));
547
548 // TODO(primiano): when the a peer disconnects, the other end receives a
549 // spurious OnDataAvailable() that needs to be acked with a Receive() to read
550 // the EOF. See b/69536434.
551 EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
552 .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
553
554 cli.reset();
555 task_runner_.RunUntilCheckpoint("cli_disconnected");
556 ASSERT_FALSE(srv_client_conn->is_connected());
557 EXPECT_EQ(geteuid(),
558 static_cast<uint32_t>(srv_client_conn->peer_uid_posix()));
559 #if PERFETTO_BUILDFLAG(PERFETTO_OS_LINUX) || \
560 PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
561 EXPECT_EQ(getpid(), static_cast<pid_t>(srv_client_conn->peer_pid_linux()));
562 #endif
563 }
564
TEST_F(UnixSocketTest,ClientAndServerExchangeFDs)565 TEST_F(UnixSocketTest, ClientAndServerExchangeFDs) {
566 static constexpr char cli_str[] = "cli>srv";
567 static constexpr char srv_str[] = "srv>cli";
568 auto srv =
569 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
570 kTestSocket.family(), SockType::kStream);
571 ASSERT_TRUE(srv->is_listening());
572
573 auto cli =
574 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
575 kTestSocket.family(), SockType::kStream);
576 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
577 auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
578 auto srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
579 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
580 .WillOnce(Invoke([this, cli_connected, srv_disconnected](
581 UnixSocket*, UnixSocket* srv_conn) {
582 EXPECT_CALL(event_listener_, OnDisconnect(srv_conn))
583 .WillOnce(InvokeWithoutArgs(srv_disconnected));
584 cli_connected();
585 }));
586 task_runner_.RunUntilCheckpoint("cli_connected");
587
588 auto srv_conn = event_listener_.GetIncomingConnection();
589 ASSERT_TRUE(srv_conn);
590 ASSERT_TRUE(cli->is_connected());
591
592 ScopedFile null_fd(base::OpenFile("/dev/null", O_RDONLY));
593 ScopedFile zero_fd(base::OpenFile("/dev/zero", O_RDONLY));
594
595 auto cli_did_recv = task_runner_.CreateCheckpoint("cli_did_recv");
596 EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
597 .WillRepeatedly(Invoke([cli_did_recv](UnixSocket* s) {
598 ScopedFile fd_buf[3];
599 char buf[sizeof(cli_str)];
600 if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
601 return;
602 ASSERT_STREQ(srv_str, buf);
603 ASSERT_NE(*fd_buf[0], -1);
604 ASSERT_NE(*fd_buf[1], -1);
605 ASSERT_EQ(*fd_buf[2], -1);
606
607 char rd_buf[1];
608 // /dev/null
609 ASSERT_EQ(read(*fd_buf[0], rd_buf, sizeof(rd_buf)), 0);
610 // /dev/zero
611 ASSERT_EQ(read(*fd_buf[1], rd_buf, sizeof(rd_buf)), 1);
612 cli_did_recv();
613 }));
614
615 auto srv_did_recv = task_runner_.CreateCheckpoint("srv_did_recv");
616 EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn.get()))
617 .WillRepeatedly(Invoke([srv_did_recv](UnixSocket* s) {
618 ScopedFile fd_buf[3];
619 char buf[sizeof(srv_str)];
620 if (!s->Receive(buf, sizeof(buf), fd_buf, ArraySize(fd_buf)))
621 return;
622 ASSERT_STREQ(cli_str, buf);
623 ASSERT_NE(*fd_buf[0], -1);
624 ASSERT_NE(*fd_buf[1], -1);
625 ASSERT_EQ(*fd_buf[2], -1);
626
627 char rd_buf[1];
628 // /dev/null
629 ASSERT_EQ(read(*fd_buf[0], rd_buf, sizeof(rd_buf)), 0);
630 // /dev/zero
631 ASSERT_EQ(read(*fd_buf[1], rd_buf, sizeof(rd_buf)), 1);
632 srv_did_recv();
633 }));
634
635 int buf_fd[2] = {null_fd.get(), zero_fd.get()};
636
637 ASSERT_TRUE(
638 cli->Send(cli_str, sizeof(cli_str), buf_fd, base::ArraySize(buf_fd)));
639 ASSERT_TRUE(srv_conn->Send(srv_str, sizeof(srv_str), buf_fd,
640 base::ArraySize(buf_fd)));
641 task_runner_.RunUntilCheckpoint("srv_did_recv");
642 task_runner_.RunUntilCheckpoint("cli_did_recv");
643
644 auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
645 EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
646 .WillOnce(InvokeWithoutArgs(cli_disconnected));
647 cli->Shutdown(true);
648 srv->Shutdown(true);
649 task_runner_.RunUntilCheckpoint("srv_disconnected");
650 task_runner_.RunUntilCheckpoint("cli_disconnected");
651 }
652
653 // Creates two processes. The server process creates a file and passes it over
654 // the socket to the client. Both processes mmap the file in shared mode and
655 // check that they see the same contents.
TEST_F(UnixSocketTest,SharedMemory)656 TEST_F(UnixSocketTest, SharedMemory) {
657 Pipe pipe = Pipe::Create();
658 pid_t pid = fork();
659 ASSERT_GE(pid, 0);
660 constexpr size_t kTmpSize = 4096;
661
662 if (pid == 0) {
663 // Child process.
664 TempFile scoped_tmp = TempFile::CreateUnlinked();
665 int tmp_fd = scoped_tmp.fd();
666 ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
667 char* mem = reinterpret_cast<char*>(
668 mmap(nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, tmp_fd, 0));
669 ASSERT_NE(nullptr, mem);
670 memcpy(mem, "shm rocks", 10);
671
672 auto srv =
673 UnixSocket::Listen(kTestSocket.name(), &event_listener_, &task_runner_,
674 kTestSocket.family(), SockType::kStream);
675 ASSERT_TRUE(srv->is_listening());
676 // Signal the other process that it can connect.
677 ASSERT_EQ(1, base::WriteAll(*pipe.wr, ".", 1));
678 auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
679 EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
680 .WillOnce(Invoke(
681 [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
682 ASSERT_EQ(geteuid(),
683 static_cast<uint32_t>(new_conn->peer_uid_posix()));
684 ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
685 // Wait for the client to change this again.
686 EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
687 .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
688 ASSERT_EQ("change notify", s->ReceiveString());
689 ASSERT_STREQ("rock more", mem);
690 checkpoint();
691 }));
692 }));
693 task_runner_.RunUntilCheckpoint("change_seen_by_server");
694 ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
695 _exit(0);
696 } else {
697 char sync_cmd = '\0';
698 ASSERT_EQ(1, PERFETTO_EINTR(read(*pipe.rd, &sync_cmd, 1)));
699 ASSERT_EQ('.', sync_cmd);
700 auto cli =
701 UnixSocket::Connect(kTestSocket.name(), &event_listener_, &task_runner_,
702 kTestSocket.family(), SockType::kStream);
703 EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
704 auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
705 EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
706 .WillOnce(Invoke([checkpoint](UnixSocket* s) {
707 char msg[32];
708 ScopedFile fd;
709 ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
710 ASSERT_STREQ("txfd", msg);
711 ASSERT_TRUE(fd);
712 char* mem = reinterpret_cast<char*>(mmap(
713 nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0));
714 ASSERT_NE(nullptr, mem);
715 mem[9] = '\0'; // Just to get a clean error in case of test failure.
716 ASSERT_STREQ("shm rocks", mem);
717
718 // Now change the shared memory and ping the other process.
719 memcpy(mem, "rock more", 10);
720 ASSERT_TRUE(s->Send("change notify"));
721 checkpoint();
722 }));
723 task_runner_.RunUntilCheckpoint("change_seen_by_client");
724 int st = 0;
725 PERFETTO_EINTR(waitpid(pid, &st, 0));
726 ASSERT_FALSE(WIFSIGNALED(st)) << "Server died with signal " << WTERMSIG(st);
727 EXPECT_TRUE(WIFEXITED(st));
728 ASSERT_EQ(0, WEXITSTATUS(st));
729 }
730 }
731
TEST_F(UnixSocketTest,ShiftMsgHdrSendPartialFirst)732 TEST_F(UnixSocketTest, ShiftMsgHdrSendPartialFirst) {
733 // Send a part of the first iov, then send the rest.
734 struct iovec iov[2] = {};
735 char hello[] = "hello";
736 char world[] = "world";
737 iov[0].iov_base = &hello[0];
738 iov[0].iov_len = base::ArraySize(hello);
739
740 iov[1].iov_base = &world[0];
741 iov[1].iov_len = base::ArraySize(world);
742
743 struct msghdr hdr = {};
744 hdr.msg_iov = iov;
745 hdr.msg_iovlen = base::ArraySize(iov);
746
747 UnixSocketRaw::ShiftMsgHdrPosix(1, &hdr);
748 EXPECT_NE(hdr.msg_iov, nullptr);
749 EXPECT_EQ(hdr.msg_iov[0].iov_base, &hello[1]);
750 EXPECT_EQ(hdr.msg_iov[1].iov_base, &world[0]);
751 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 2);
752 EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "ello");
753 EXPECT_EQ(iov[0].iov_len, base::ArraySize(hello) - 1);
754
755 UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) - 1, &hdr);
756 EXPECT_EQ(hdr.msg_iov, &iov[1]);
757 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
758 EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), world);
759 EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world));
760
761 UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world), &hdr);
762 EXPECT_EQ(hdr.msg_iov, nullptr);
763 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
764 }
765
TEST_F(UnixSocketTest,ShiftMsgHdrSendFirstAndPartial)766 TEST_F(UnixSocketTest, ShiftMsgHdrSendFirstAndPartial) {
767 // Send first iov and part of the second iov, then send the rest.
768 struct iovec iov[2] = {};
769 char hello[] = "hello";
770 char world[] = "world";
771 iov[0].iov_base = &hello[0];
772 iov[0].iov_len = base::ArraySize(hello);
773
774 iov[1].iov_base = &world[0];
775 iov[1].iov_len = base::ArraySize(world);
776
777 struct msghdr hdr = {};
778 hdr.msg_iov = iov;
779 hdr.msg_iovlen = base::ArraySize(iov);
780
781 UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(hello) + 1, &hdr);
782 EXPECT_NE(hdr.msg_iov, nullptr);
783 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 1);
784 EXPECT_STREQ(reinterpret_cast<char*>(hdr.msg_iov[0].iov_base), "orld");
785 EXPECT_EQ(hdr.msg_iov[0].iov_len, base::ArraySize(world) - 1);
786
787 UnixSocketRaw::ShiftMsgHdrPosix(base::ArraySize(world) - 1, &hdr);
788 EXPECT_EQ(hdr.msg_iov, nullptr);
789 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
790 }
791
TEST_F(UnixSocketTest,ShiftMsgHdrSendEverything)792 TEST_F(UnixSocketTest, ShiftMsgHdrSendEverything) {
793 // Send everything at once.
794 struct iovec iov[2] = {};
795 char hello[] = "hello";
796 char world[] = "world";
797 iov[0].iov_base = &hello[0];
798 iov[0].iov_len = base::ArraySize(hello);
799
800 iov[1].iov_base = &world[0];
801 iov[1].iov_len = base::ArraySize(world);
802
803 struct msghdr hdr = {};
804 hdr.msg_iov = iov;
805 hdr.msg_iovlen = base::ArraySize(iov);
806
807 UnixSocketRaw::ShiftMsgHdrPosix(
808 base::ArraySize(world) + base::ArraySize(hello), &hdr);
809 EXPECT_EQ(hdr.msg_iov, nullptr);
810 EXPECT_EQ(static_cast<int>(hdr.msg_iovlen), 0);
811 }
812
813 // For use in PartialSendMsgAll template argument. Cannot be a lambda.
RollbackSigaction(const struct sigaction * act)814 int RollbackSigaction(const struct sigaction* act) {
815 return sigaction(SIGWINCH, act, nullptr);
816 }
817
TEST_F(UnixSocketTest,PartialSendMsgAll)818 TEST_F(UnixSocketTest, PartialSendMsgAll) {
819 UnixSocketRaw send_sock;
820 UnixSocketRaw recv_sock;
821 std::tie(send_sock, recv_sock) =
822 UnixSocketRaw::CreatePairPosix(kTestSocket.family(), SockType::kStream);
823 ASSERT_TRUE(send_sock);
824 ASSERT_TRUE(recv_sock);
825
826 // Set bufsize to minimum.
827 int bufsize = 1024;
828 ASSERT_EQ(setsockopt(send_sock.fd(), SOL_SOCKET, SO_SNDBUF, &bufsize,
829 sizeof(bufsize)),
830 0);
831 ASSERT_EQ(setsockopt(recv_sock.fd(), SOL_SOCKET, SO_RCVBUF, &bufsize,
832 sizeof(bufsize)),
833 0);
834
835 // Send something larger than send + recv kernel buffers combined to make
836 // sendmsg block.
837 char send_buf[8192];
838 // Make MSAN happy.
839 for (size_t i = 0; i < sizeof(send_buf); ++i)
840 send_buf[i] = static_cast<char>(i % 256);
841 char recv_buf[sizeof(send_buf)];
842
843 // Need to install signal handler to cause the interrupt to happen.
844 // man 3 pthread_kill:
845 // Signal dispositions are process-wide: if a signal handler is
846 // installed, the handler will be invoked in the thread thread, but if
847 // the disposition of the signal is "stop", "continue", or "terminate",
848 // this action will affect the whole process.
849 struct sigaction oldact;
850 struct sigaction newact = {};
851 newact.sa_handler = [](int) {};
852 ASSERT_EQ(sigaction(SIGWINCH, &newact, &oldact), 0);
853 base::ScopedResource<const struct sigaction*, RollbackSigaction, nullptr>
854 rollback(&oldact);
855
856 auto blocked_thread = pthread_self();
857 std::thread th([blocked_thread, &recv_sock, &recv_buf] {
858 ssize_t rd = PERFETTO_EINTR(read(recv_sock.fd(), recv_buf, 1));
859 ASSERT_EQ(rd, 1);
860 // We are now sure the other thread is in sendmsg, interrupt send.
861 ASSERT_EQ(pthread_kill(blocked_thread, SIGWINCH), 0);
862 // Drain the socket to allow SendMsgAllPosix to succeed.
863 size_t offset = 1;
864 while (offset < sizeof(recv_buf)) {
865 rd = PERFETTO_EINTR(
866 read(recv_sock.fd(), recv_buf + offset, sizeof(recv_buf) - offset));
867 ASSERT_GE(rd, 0);
868 offset += static_cast<size_t>(rd);
869 }
870 });
871
872 // Test sending the send_buf in several chunks as an iov to exercise the
873 // more complicated code-paths of SendMsgAllPosix.
874 struct msghdr hdr = {};
875 struct iovec iov[4];
876 static_assert(sizeof(send_buf) % base::ArraySize(iov) == 0,
877 "Cannot split buffer into even pieces.");
878 constexpr size_t kChunkSize = sizeof(send_buf) / base::ArraySize(iov);
879 for (size_t i = 0; i < base::ArraySize(iov); ++i) {
880 iov[i].iov_base = send_buf + i * kChunkSize;
881 iov[i].iov_len = kChunkSize;
882 }
883 hdr.msg_iov = iov;
884 hdr.msg_iovlen = base::ArraySize(iov);
885
886 ASSERT_EQ(send_sock.SendMsgAllPosix(&hdr),
887 static_cast<ssize_t>(sizeof(send_buf)));
888 send_sock.Shutdown();
889 th.join();
890 // Make sure the re-entry logic was actually triggered.
891 ASSERT_EQ(hdr.msg_iov, nullptr);
892 ASSERT_EQ(memcmp(send_buf, recv_buf, sizeof(send_buf)), 0);
893 }
894 #endif // !OS_WIN
895
896 } // namespace
897 } // namespace base
898 } // namespace perfetto
899