• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/ipc/unix_socket.h"
18 
19 #include <sys/mman.h>
20 
21 #include <list>
22 #include <thread>
23 
24 #include "gmock/gmock.h"
25 #include "gtest/gtest.h"
26 #include "perfetto/base/build_config.h"
27 #include "perfetto/base/logging.h"
28 #include "perfetto/base/temp_file.h"
29 #include "perfetto/base/utils.h"
30 #include "src/base/test/test_task_runner.h"
31 #include "src/ipc/test/test_socket.h"
32 
33 namespace perfetto {
34 namespace ipc {
35 namespace {
36 
37 using ::testing::_;
38 using ::testing::AtLeast;
39 using ::testing::Invoke;
40 using ::testing::InvokeWithoutArgs;
41 using ::testing::Mock;
42 
43 constexpr char kSocketName[] = TEST_SOCK_NAME("unix_socket_unittest");
44 
45 class MockEventListener : public UnixSocket::EventListener {
46  public:
47   MOCK_METHOD2(OnNewIncomingConnection, void(UnixSocket*, UnixSocket*));
48   MOCK_METHOD2(OnConnect, void(UnixSocket*, bool));
49   MOCK_METHOD1(OnDisconnect, void(UnixSocket*));
50   MOCK_METHOD1(OnDataAvailable, void(UnixSocket*));
51 
52   // GMock doesn't support mocking methods with non-copiable args.
OnNewIncomingConnection(UnixSocket * self,std::unique_ptr<UnixSocket> new_connection)53   void OnNewIncomingConnection(
54       UnixSocket* self,
55       std::unique_ptr<UnixSocket> new_connection) override {
56     incoming_connections_.emplace_back(std::move(new_connection));
57     OnNewIncomingConnection(self, incoming_connections_.back().get());
58   }
59 
GetIncomingConnection()60   std::unique_ptr<UnixSocket> GetIncomingConnection() {
61     if (incoming_connections_.empty())
62       return nullptr;
63     std::unique_ptr<UnixSocket> sock = std::move(incoming_connections_.front());
64     incoming_connections_.pop_front();
65     return sock;
66   }
67 
68  private:
69   std::list<std::unique_ptr<UnixSocket>> incoming_connections_;
70 };
71 
72 class UnixSocketTest : public ::testing::Test {
73  protected:
SetUp()74   void SetUp() override { DESTROY_TEST_SOCK(kSocketName); }
TearDown()75   void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
76 
77   base::TestTaskRunner task_runner_;
78   MockEventListener event_listener_;
79 };
80 
// Connecting while nobody listens on kSocketName must leave the socket
// disconnected and deliver OnConnect(..., false).
TEST_F(UnixSocketTest, ConnectionFailureIfUnreachable) {
  std::unique_ptr<UnixSocket> client =
      UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
  ASSERT_FALSE(client->is_connected());

  auto on_failure = task_runner_.CreateCheckpoint("failure");
  EXPECT_CALL(event_listener_, OnConnect(client.get(), false))
      .WillOnce(InvokeWithoutArgs(on_failure));
  task_runner_.RunUntilCheckpoint("failure");
}
89 
90 // Both server and client should see an OnDisconnect() if the server drops
91 // incoming connections immediately as they are created.
TEST_F(UnixSocketTest,ConnectionImmediatelyDroppedByServer)92 TEST_F(UnixSocketTest, ConnectionImmediatelyDroppedByServer) {
93   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
94   ASSERT_TRUE(srv->is_listening());
95 
96   // The server will immediately shutdown the connection upon
97   // OnNewIncomingConnection().
98   auto srv_did_shutdown = task_runner_.CreateCheckpoint("srv_did_shutdown");
99   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
100       .WillOnce(
101           Invoke([this, srv_did_shutdown](UnixSocket*, UnixSocket* new_conn) {
102             EXPECT_CALL(event_listener_, OnDisconnect(new_conn));
103             new_conn->Shutdown(true);
104             srv_did_shutdown();
105           }));
106 
107   auto checkpoint = task_runner_.CreateCheckpoint("cli_connected");
108   auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
109   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
110       .WillOnce(InvokeWithoutArgs(checkpoint));
111   task_runner_.RunUntilCheckpoint("cli_connected");
112   task_runner_.RunUntilCheckpoint("srv_did_shutdown");
113 
114   // Trying to send something will trigger the disconnection notification.
115   auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
116   EXPECT_CALL(event_listener_, OnDisconnect(cli.get()))
117       .WillOnce(InvokeWithoutArgs(cli_disconnected));
118   EXPECT_FALSE(cli->Send("whatever"));
119   task_runner_.RunUntilCheckpoint("cli_disconnected");
120 }
121 
// A connected client/server pair exchanges one message in each direction;
// afterwards the test checks that Send()/Receive() fail gracefully once the
// client has been shut down.
TEST_F(UnixSocketTest, ClientAndServerExchangeData) {
  auto server =
      UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
  ASSERT_TRUE(server->is_listening());

  std::unique_ptr<UnixSocket> client =
      UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
  EXPECT_CALL(event_listener_, OnConnect(client.get(), true));

  auto on_cli_connected = task_runner_.CreateCheckpoint("cli_connected");
  auto on_srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
  auto on_accept = [this, on_cli_connected, on_srv_disconnected](
                       UnixSocket*, UnixSocket* accepted) {
    // The server-side end must eventually be told about the disconnection.
    EXPECT_CALL(event_listener_, OnDisconnect(accepted))
        .WillOnce(InvokeWithoutArgs(on_srv_disconnected));
    on_cli_connected();
  };
  EXPECT_CALL(event_listener_, OnNewIncomingConnection(server.get(), _))
      .WillOnce(Invoke(on_accept));
  task_runner_.RunUntilCheckpoint("cli_connected");

  // Claim ownership of the accepted (server-side) connection from the mock.
  std::unique_ptr<UnixSocket> server_conn =
      event_listener_.GetIncomingConnection();
  ASSERT_TRUE(server_conn);
  ASSERT_TRUE(client->is_connected());

  auto on_cli_recv = task_runner_.CreateCheckpoint("cli_did_recv");
  EXPECT_CALL(event_listener_, OnDataAvailable(client.get()))
      .WillOnce(Invoke([on_cli_recv](UnixSocket* sock) {
        ASSERT_EQ("srv>cli", sock->ReceiveString());
        on_cli_recv();
      }));

  auto on_srv_recv = task_runner_.CreateCheckpoint("srv_did_recv");
  EXPECT_CALL(event_listener_, OnDataAvailable(server_conn.get()))
      .WillOnce(Invoke([on_srv_recv](UnixSocket* sock) {
        ASSERT_EQ("cli>srv", sock->ReceiveString());
        on_srv_recv();
      }));

  ASSERT_TRUE(client->Send("cli>srv"));
  ASSERT_TRUE(server_conn->Send("srv>cli"));
  task_runner_.RunUntilCheckpoint("cli_did_recv");
  task_runner_.RunUntilCheckpoint("srv_did_recv");

  // Once the client is shut down, Receive() must return 0 / an empty string
  // and Send() must return false on both ends.
  auto on_cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
  EXPECT_CALL(event_listener_, OnDisconnect(client.get()))
      .WillOnce(InvokeWithoutArgs(on_cli_disconnected));
  client->Shutdown(true);

  char scratch[4];
  ASSERT_EQ(0u, client->Receive(&scratch, sizeof(scratch)));
  ASSERT_EQ("", client->ReceiveString());
  ASSERT_EQ(0u, server_conn->Receive(&scratch, sizeof(scratch)));
  ASSERT_EQ("", server_conn->ReceiveString());
  ASSERT_FALSE(client->Send("foo"));
  ASSERT_FALSE(server_conn->Send("bar"));

  server->Shutdown(true);
  task_runner_.RunUntilCheckpoint("cli_disconnected");
  task_runner_.RunUntilCheckpoint("srv_disconnected");
}
177 
// Listen() must also work when given a file descriptor obtained from
// CreateAndBind() rather than a socket name. A client connects and is then
// destroyed; the server side must observe the disconnection.
TEST_F(UnixSocketTest, ListenWithPassedFileDescriptor) {
  auto bound_fd = UnixSocket::CreateAndBind(kSocketName);
  auto server =
      UnixSocket::Listen(std::move(bound_fd), &event_listener_, &task_runner_);
  ASSERT_TRUE(server->is_listening());

  std::unique_ptr<UnixSocket> client =
      UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
  EXPECT_CALL(event_listener_, OnConnect(client.get(), true));

  auto on_cli_connected = task_runner_.CreateCheckpoint("cli_connected");
  auto on_srv_disconnected = task_runner_.CreateCheckpoint("srv_disconnected");
  auto on_accept = [this, on_cli_connected, on_srv_disconnected](
                       UnixSocket*, UnixSocket* accepted) {
    // Drain the socket when data is signalled, so the EOF state is read.
    EXPECT_CALL(event_listener_, OnDataAvailable(accepted))
        .WillOnce(
            InvokeWithoutArgs([accepted] { accepted->ReceiveString(); }));
    EXPECT_CALL(event_listener_, OnDisconnect(accepted))
        .WillOnce(InvokeWithoutArgs(on_srv_disconnected));
    on_cli_connected();
  };
  EXPECT_CALL(event_listener_, OnNewIncomingConnection(server.get(), _))
      .WillOnce(Invoke(on_accept));

  task_runner_.RunUntilCheckpoint("cli_connected");
  ASSERT_TRUE(client->is_connected());

  // Destroying the client is what makes the server side disconnect.
  client.reset();
  task_runner_.RunUntilCheckpoint("srv_disconnected");
}
203 
204 // Mostly a stress tests. Connects kNumClients clients to the same server and
205 // tests that all can exchange data and can see the expected sequence of events.
TEST_F(UnixSocketTest,SeveralClients)206 TEST_F(UnixSocketTest, SeveralClients) {
207   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
208   ASSERT_TRUE(srv->is_listening());
209   constexpr size_t kNumClients = 32;
210   std::unique_ptr<UnixSocket> cli[kNumClients];
211 
212   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
213       .Times(kNumClients)
214       .WillRepeatedly(Invoke([this](UnixSocket*, UnixSocket* s) {
215         EXPECT_CALL(event_listener_, OnDataAvailable(s))
216             .WillOnce(Invoke([](UnixSocket* t) {
217               ASSERT_EQ("PING", t->ReceiveString());
218               ASSERT_TRUE(t->Send("PONG"));
219             }));
220       }));
221 
222   for (size_t i = 0; i < kNumClients; i++) {
223     cli[i] = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
224     EXPECT_CALL(event_listener_, OnConnect(cli[i].get(), true))
225         .WillOnce(Invoke([](UnixSocket* s, bool success) {
226           ASSERT_TRUE(success);
227           ASSERT_TRUE(s->Send("PING"));
228         }));
229 
230     auto checkpoint = task_runner_.CreateCheckpoint(std::to_string(i));
231     EXPECT_CALL(event_listener_, OnDataAvailable(cli[i].get()))
232         .WillOnce(Invoke([checkpoint](UnixSocket* s) {
233           ASSERT_EQ("PONG", s->ReceiveString());
234           checkpoint();
235         }));
236   }
237 
238   for (size_t i = 0; i < kNumClients; i++) {
239     task_runner_.RunUntilCheckpoint(std::to_string(i));
240     ASSERT_TRUE(Mock::VerifyAndClearExpectations(cli[i].get()));
241   }
242 }
243 
244 // Creates two processes. The server process creates a file and passes it over
245 // the socket to the client. Both processes mmap the file in shared mode and
246 // check that they see the same contents.
TEST_F(UnixSocketTest,SharedMemory)247 TEST_F(UnixSocketTest, SharedMemory) {
248   int pipes[2];
249   ASSERT_EQ(0, pipe(pipes));
250 
251   pid_t pid = fork();
252   ASSERT_GE(pid, 0);
253   constexpr size_t kTmpSize = 4096;
254 
255   if (pid == 0) {
256     // Child process.
257     base::TempFile scoped_tmp = base::TempFile::CreateUnlinked();
258     int tmp_fd = scoped_tmp.fd();
259     ASSERT_FALSE(ftruncate(tmp_fd, kTmpSize));
260     char* mem = reinterpret_cast<char*>(
261         mmap(nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, tmp_fd, 0));
262     ASSERT_NE(nullptr, mem);
263     memcpy(mem, "shm rocks", 10);
264 
265     auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
266     ASSERT_TRUE(srv->is_listening());
267     // Signal the other process that it can connect.
268     ASSERT_EQ(1, PERFETTO_EINTR(write(pipes[1], ".", 1)));
269     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_server");
270     EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
271         .WillOnce(Invoke(
272             [this, tmp_fd, checkpoint, mem](UnixSocket*, UnixSocket* new_conn) {
273               ASSERT_EQ(geteuid(), static_cast<uint32_t>(new_conn->peer_uid()));
274               ASSERT_TRUE(new_conn->Send("txfd", 5, tmp_fd));
275               // Wait for the client to change this again.
276               EXPECT_CALL(event_listener_, OnDataAvailable(new_conn))
277                   .WillOnce(Invoke([checkpoint, mem](UnixSocket* s) {
278                     ASSERT_EQ("change notify", s->ReceiveString());
279                     ASSERT_STREQ("rock more", mem);
280                     checkpoint();
281                   }));
282             }));
283     task_runner_.RunUntilCheckpoint("change_seen_by_server");
284     ASSERT_TRUE(Mock::VerifyAndClearExpectations(&event_listener_));
285     _exit(0);
286   } else {
287     char sync_cmd = '\0';
288     ASSERT_EQ(1, PERFETTO_EINTR(read(pipes[0], &sync_cmd, 1)));
289     ASSERT_EQ('.', sync_cmd);
290     auto cli =
291         UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
292     EXPECT_CALL(event_listener_, OnConnect(cli.get(), true));
293     auto checkpoint = task_runner_.CreateCheckpoint("change_seen_by_client");
294     EXPECT_CALL(event_listener_, OnDataAvailable(cli.get()))
295         .WillOnce(Invoke([checkpoint](UnixSocket* s) {
296           char msg[32];
297           base::ScopedFile fd;
298           ASSERT_EQ(5u, s->Receive(msg, sizeof(msg), &fd));
299           ASSERT_STREQ("txfd", msg);
300           ASSERT_TRUE(fd);
301           char* mem = reinterpret_cast<char*>(mmap(
302               nullptr, kTmpSize, PROT_READ | PROT_WRITE, MAP_SHARED, *fd, 0));
303           ASSERT_NE(nullptr, mem);
304           mem[9] = '\0';  // Just to get a clean error in case of test failure.
305           ASSERT_STREQ("shm rocks", mem);
306 
307           // Now change the shared memory and ping the other process.
308           memcpy(mem, "rock more", 10);
309           ASSERT_TRUE(s->Send("change notify"));
310           checkpoint();
311         }));
312     task_runner_.RunUntilCheckpoint("change_seen_by_client");
313     int st = 0;
314     PERFETTO_EINTR(waitpid(pid, &st, 0));
315     ASSERT_FALSE(WIFSIGNALED(st)) << "Server died with signal " << WTERMSIG(st);
316     EXPECT_TRUE(WIFEXITED(st));
317     ASSERT_EQ(0, WEXITSTATUS(st));
318   }
319 }
320 
321 constexpr size_t kAtomicWrites_FrameSize = 1123;
AtomicWrites_SendAttempt(UnixSocket * s,base::TaskRunner * task_runner,int num_frame)322 bool AtomicWrites_SendAttempt(UnixSocket* s,
323                               base::TaskRunner* task_runner,
324                               int num_frame) {
325   char buf[kAtomicWrites_FrameSize];
326   memset(buf, static_cast<char>(num_frame), sizeof(buf));
327   if (s->Send(buf, sizeof(buf)))
328     return true;
329   task_runner->PostTask(
330       std::bind(&AtomicWrites_SendAttempt, s, task_runner, num_frame));
331   return false;
332 }
333 
334 // Creates a client-server pair. The client sends continuously data to the
335 // server. Upon each Send() attempt, the client sends a buffer which is memset()
336 // with a unique number (0 to kNumFrames). We are deliberately trying to fill
337 // the socket output buffer, so we expect some of these Send()s to fail.
338 // The client is extremely aggressive and, when a Send() fails, just keeps
339 // re-posting it with the same unique number. The server verifies that we
340 // receive one and exactly one of each buffers, without any gaps or truncation.
TEST_F(UnixSocketTest,SendIsAtomic)341 TEST_F(UnixSocketTest, SendIsAtomic) {
342   static constexpr int kNumFrames = 127;
343 
344   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
345   ASSERT_TRUE(srv->is_listening());
346 
347   auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
348 
349   auto all_frames_done = task_runner_.CreateCheckpoint("all_frames_done");
350   std::set<int> received_iterations;
351   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
352       .WillOnce(Invoke([this, &received_iterations, all_frames_done](
353                            UnixSocket*, UnixSocket* srv_conn) {
354         EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
355             .WillRepeatedly(
356                 Invoke([&received_iterations, all_frames_done](UnixSocket* s) {
357                   char buf[kAtomicWrites_FrameSize];
358                   size_t res = s->Receive(buf, sizeof(buf));
359                   if (res == 0)
360                     return;  // Spurious select(), could happen.
361                   ASSERT_EQ(kAtomicWrites_FrameSize, res);
362                   // Check that we didn't get two truncated frames.
363                   for (size_t i = 0; i < sizeof(buf); i++)
364                     ASSERT_EQ(buf[0], buf[i]);
365                   ASSERT_EQ(0u, received_iterations.count(buf[0]));
366                   received_iterations.insert(buf[0]);
367                   if (received_iterations.size() == kNumFrames)
368                     all_frames_done();
369                 }));
370       }));
371 
372   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
373   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
374       .WillOnce(InvokeWithoutArgs(cli_connected));
375   task_runner_.RunUntilCheckpoint("cli_connected");
376   ASSERT_TRUE(cli->is_connected());
377   ASSERT_EQ(geteuid(), static_cast<uint32_t>(cli->peer_uid()));
378 
379   bool did_requeue = false;
380   for (int i = 0; i < kNumFrames; i++)
381     did_requeue |= !AtomicWrites_SendAttempt(cli.get(), &task_runner_, i);
382 
383   // We expect that at least one of the kNumFrames didn't fit in the socket
384   // buffer and was re-posted, otherwise this entire test would be pointless.
385   ASSERT_TRUE(did_requeue);
386 
387   task_runner_.RunUntilCheckpoint("all_frames_done");
388 }
389 
390 // Checks that the peer_uid() is retained after the client disconnects. The IPC
391 // layer needs to rely on this to validate messages received immediately before
392 // a client disconnects.
TEST_F(UnixSocketTest,PeerUidRetainedAfterDisconnect)393 TEST_F(UnixSocketTest, PeerUidRetainedAfterDisconnect) {
394   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
395   ASSERT_TRUE(srv->is_listening());
396   UnixSocket* srv_client_conn = nullptr;
397   auto srv_connected = task_runner_.CreateCheckpoint("srv_connected");
398   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
399       .WillOnce(Invoke(
400           [&srv_client_conn, srv_connected](UnixSocket*, UnixSocket* srv_conn) {
401             srv_client_conn = srv_conn;
402             EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_conn->peer_uid()));
403             srv_connected();
404           }));
405   auto cli_connected = task_runner_.CreateCheckpoint("cli_connected");
406   auto cli = UnixSocket::Connect(kSocketName, &event_listener_, &task_runner_);
407   EXPECT_CALL(event_listener_, OnConnect(cli.get(), true))
408       .WillOnce(InvokeWithoutArgs(cli_connected));
409 
410   task_runner_.RunUntilCheckpoint("cli_connected");
411   task_runner_.RunUntilCheckpoint("srv_connected");
412   ASSERT_NE(nullptr, srv_client_conn);
413   ASSERT_TRUE(srv_client_conn->is_connected());
414 
415   auto cli_disconnected = task_runner_.CreateCheckpoint("cli_disconnected");
416   EXPECT_CALL(event_listener_, OnDisconnect(srv_client_conn))
417       .WillOnce(InvokeWithoutArgs(cli_disconnected));
418 
419   // TODO(primiano): when the a peer disconnects, the other end receives a
420   // spurious OnDataAvailable() that needs to be acked with a Receive() to read
421   // the EOF. See b/69536434.
422   EXPECT_CALL(event_listener_, OnDataAvailable(srv_client_conn))
423       .WillOnce(Invoke([](UnixSocket* sock) { sock->ReceiveString(); }));
424 
425   cli.reset();
426   task_runner_.RunUntilCheckpoint("cli_disconnected");
427   ASSERT_FALSE(srv_client_conn->is_connected());
428   EXPECT_EQ(geteuid(), static_cast<uint32_t>(srv_client_conn->peer_uid()));
429 }
430 
// A client running on its own thread pushes kTotalBytes through the socket
// using blocking Send() calls, while the server (on the test thread) drains
// them in 1 KB reads until the whole amount has been received.
TEST_F(UnixSocketTest, BlockingSend) {
  auto server =
      UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
  ASSERT_TRUE(server->is_listening());

  auto on_all_frames = task_runner_.CreateCheckpoint("all_frames_done");
  size_t rx_bytes = 0;
  constexpr size_t kTotalBytes = 1024 * 1024 * 4;

  // Receiver callback: accumulate what was read and stop at kTotalBytes.
  auto on_data = [&rx_bytes, on_all_frames](UnixSocket* sock) {
    char chunk[1024];
    rx_bytes += sock->Receive(chunk, sizeof(chunk));
    if (rx_bytes == kTotalBytes)
      on_all_frames();
  };
  EXPECT_CALL(event_listener_, OnNewIncomingConnection(server.get(), _))
      .WillOnce(Invoke([this, on_data](UnixSocket*, UnixSocket* accepted) {
        EXPECT_CALL(event_listener_, OnDataAvailable(accepted))
            .WillRepeatedly(Invoke(on_data));
      }));

  // Override default timeout as this test can take time on the emulator.
  const int kTimeoutMs = 60000 * 3;

  // Perform the blocking send from another thread, which has its own task
  // runner and its own mock listener.
  std::thread tx_thread([] {
    base::TestTaskRunner tx_task_runner;
    MockEventListener tx_events;
    auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);

    auto on_connected = tx_task_runner.CreateCheckpoint("cli_connected");
    EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
        .WillOnce(InvokeWithoutArgs(on_connected));
    tx_task_runner.RunUntilCheckpoint("cli_connected");

    auto on_all_sent = tx_task_runner.CreateCheckpoint("all_sent");
    char buf[1024 * 32] = {};
    tx_task_runner.PostTask([&cli, &buf, on_all_sent] {
      // Send kTotalBytes in chunks of sizeof(buf) zeroed bytes each.
      const size_t num_chunks = kTotalBytes / sizeof(buf);
      for (size_t chunk = 0; chunk < num_chunks; chunk++) {
        cli->Send(buf, sizeof(buf), -1 /*fd*/,
                  UnixSocket::BlockingMode::kBlocking);
      }
      on_all_sent();
    });
    tx_task_runner.RunUntilCheckpoint("all_sent", kTimeoutMs);
  });

  task_runner_.RunUntilCheckpoint("all_frames_done", kTimeoutMs);
  tx_thread.join();
}
480 
481 // Regression test for b/76155349 . If the receiver end disconnects while the
482 // sender is in the middle of a large send(), the socket should gracefully give
483 // up (i.e. Shutdown()) but not crash.
TEST_F(UnixSocketTest,ReceiverDisconnectsDuringSend)484 TEST_F(UnixSocketTest, ReceiverDisconnectsDuringSend) {
485   auto srv = UnixSocket::Listen(kSocketName, &event_listener_, &task_runner_);
486   ASSERT_TRUE(srv->is_listening());
487   const int kTimeoutMs = 30000;
488 
489   auto receive_done = task_runner_.CreateCheckpoint("receive_done");
490   EXPECT_CALL(event_listener_, OnNewIncomingConnection(srv.get(), _))
491       .WillOnce(Invoke([this, receive_done](UnixSocket*, UnixSocket* srv_conn) {
492         EXPECT_CALL(event_listener_, OnDataAvailable(srv_conn))
493             .WillOnce(Invoke([receive_done](UnixSocket* s) {
494               char buf[1024];
495               size_t res = s->Receive(buf, sizeof(buf));
496               ASSERT_EQ(1024u, res);
497               s->Shutdown(false /*notify*/);
498               receive_done();
499             }));
500       }));
501 
502   // Perform the blocking send form another thread.
503   std::thread tx_thread([] {
504     base::TestTaskRunner tx_task_runner;
505     MockEventListener tx_events;
506     auto cli = UnixSocket::Connect(kSocketName, &tx_events, &tx_task_runner);
507 
508     auto cli_connected = tx_task_runner.CreateCheckpoint("cli_connected");
509     EXPECT_CALL(tx_events, OnConnect(cli.get(), true))
510         .WillOnce(InvokeWithoutArgs(cli_connected));
511     tx_task_runner.RunUntilCheckpoint("cli_connected");
512 
513     auto send_done = tx_task_runner.CreateCheckpoint("send_done");
514     // We need a
515     static constexpr size_t kBufSize = 32 * 1024 * 1024;
516     std::unique_ptr<char[]> buf(new char[kBufSize]());
517     tx_task_runner.PostTask([&cli, &buf, send_done] {
518       bool send_res = cli->Send(buf.get(), kBufSize, -1 /*fd*/,
519                                 UnixSocket::BlockingMode::kBlocking);
520       ASSERT_FALSE(send_res);
521       send_done();
522     });
523 
524     tx_task_runner.RunUntilCheckpoint("send_done", kTimeoutMs);
525   });
526   task_runner_.RunUntilCheckpoint("receive_done", kTimeoutMs);
527   tx_thread.join();
528 }
529 
530 // TODO(primiano): add a test to check that in the case of a peer sending a fd
531 // and the other end just doing a recv (without taking it), the fd is closed and
532 // not left around.
533 
// TODO(primiano): add a test to check that a socket can be reused after
// Shutdown().
536 
537 // TODO(primiano): add a test to check that OnDisconnect() is called in all
538 // possible cases.
539 
540 // TODO(primiano): add tests that destroy the socket in all possible stages and
541 // verify that no spurious EventListener callback is received.
542 
543 }  // namespace
544 }  // namespace ipc
545 }  // namespace perfetto
546