• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 #include <errno.h>
16 #include <gtest/gtest.h>
17 #include <netdb.h>
18 #include <netinet/in.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24 
25 #include <condition_variable>
26 #include <cstdint>
27 #include <cstring>
28 #include <functional>
29 #include <memory>
30 #include <mutex>
31 #include <random>
32 #include <vector>
33 
34 #include "log.h"  // for LOG_INFO
35 #include "model/setup/async_manager.h"
36 #include "net/posix/posix_async_socket_connector.h"
37 #include "net/posix/posix_async_socket_server.h"
38 
39 namespace android {
40 namespace net {
41 
42 using clock = std::chrono::system_clock;
43 
// Scoped helper that installs a SIGPIPE handler for its lifetime and records
// whether the signal was delivered. The disposition that was active before
// construction is restored on destruction.
//
// The recorded signal lives in a single static slot that every construction
// resets to -1, so only one instance should be alive at a time.
class SigPipeSignalHandler {
 public:
  SigPipeSignalHandler() {
    sSignal = -1;
    struct sigaction act = {};
    act.sa_handler = myHandler;
    // Start from an explicitly empty mask rather than relying on a
    // zero-initialized sigset_t meaning "no signals blocked".
    ::sigemptyset(&act.sa_mask);
    ::sigaction(SIGPIPE, &act, &mOldAction);
  }

  ~SigPipeSignalHandler() { ::sigaction(SIGPIPE, &mOldAction, nullptr); }

  // Not copyable: a copy would restore the saved disposition a second time.
  SigPipeSignalHandler(const SigPipeSignalHandler&) = delete;
  SigPipeSignalHandler& operator=(const SigPipeSignalHandler&) = delete;

  // Returns the number of the last signal seen by the handler, or -1 if none
  // was delivered since the most recent construction.
  int signaled() const { return sSignal; }

 private:
  // Disposition that was active before this object was constructed.
  struct sigaction mOldAction = {};

  // Stored to from inside the signal handler, so it uses the type that is
  // guaranteed to be safely writable from a handler.
  static volatile sig_atomic_t sSignal;

  static void myHandler(int sig) { sSignal = sig; }
};

// static
volatile sig_atomic_t SigPipeSignalHandler::sSignal = 0;
67 
68 using SocketCon = std::shared_ptr<AsyncDataChannel>;
69 
70 class PosixSocketTest : public testing::Test {
71  public:
PosixSocketTest()72   PosixSocketTest() : pasc_(&async_manager_), pass_(0, &async_manager_) {}
73 
~PosixSocketTest()74   ~PosixSocketTest() { pass_.Close(); }
75 
connectPair(std::chrono::milliseconds timeout=500ms)76   std::tuple<SocketCon, SocketCon> connectPair(
77       std::chrono::milliseconds timeout = 500ms) {
78     std::mutex m;
79     std::condition_variable cv;
80 
81     std::shared_ptr<AsyncDataChannel> sock1;
82     std::shared_ptr<AsyncDataChannel> sock2;
83 
84     pass_.SetOnConnectCallback(
85         [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
86           std::unique_lock<std::mutex> guard(m);
87           sock1 = std::move(sock);
88           cv.notify_all();
89         });
90     EXPECT_TRUE(pass_.StartListening());
91 
92     sock2 = pasc_.ConnectToRemoteServer("localhost", pass_.port(), 1000ms);
93     EXPECT_TRUE(sock2.get() != nullptr);
94     EXPECT_TRUE(sock2->Connected());
95 
96     std::unique_lock<std::mutex> lk(m);
97     EXPECT_TRUE(
98         cv.wait_for(lk, timeout, [&] { return sock1.get() != nullptr; }));
99     EXPECT_TRUE(sock1);
100     EXPECT_TRUE(sock1->Connected());
101 
102     return {sock1, sock2};
103   }
104 
105  protected:
106   AsyncManager async_manager_;
107   PosixAsyncSocketConnector pasc_;
108   PosixAsyncSocketServer pass_;
109 };
110 
// A freshly created pair reports Connected() on both ends, and Close() on
// each end makes that end report not connected.
TEST_F(PosixSocketTest, canConnect) {
  auto [sock1, sock2] = connectPair();
  // connectPair() only reports problems through non-fatal checks, so make
  // the null checks fatal here before touching the channels.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);
  ASSERT_TRUE(sock1->Connected());
  ASSERT_TRUE(sock2->Connected());

  sock1->Close();
  sock2->Close();

  ASSERT_FALSE(sock1->Connected());
  ASSERT_FALSE(sock2->Connected());
}
122 
TEST_F(PosixSocketTest, socketSendDoesNotGenerateSigPipe) {
  // Check that writing to a broken pipe does not generate a SIGPIPE
  // signal.
  SigPipeSignalHandler handler;
  ASSERT_EQ(-1, handler.signaled());
  auto [sock1, sock2] = connectPair();
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  // sock1 and sock2 are now connected. Close sock1 immediately, then try to
  // send data through sock2.
  sock1->Close();
  ASSERT_FALSE(sock1->Connected());

  // The EPIPE might not happen on the first send due to
  // TCP packet buffering in the kernel. Perform multiple send()
  // in a loop to work-around this.
  uint8_t payload[] = {'x', 'x', 'x', 'x'};
  const int kMaxSendCount = 1000;
  int n = 0;
  while (n < kMaxSendCount) {
    errno = 0;
    const ssize_t ret = sock2->Send(payload, sizeof(payload));
    // Snapshot errno immediately: anything that runs between the failing
    // call and the assertion (including the assertion machinery itself) is
    // allowed to overwrite it.
    const int send_errno = errno;
    if (ret < 0) {
#ifdef __APPLE__
      // On OS X, errno is sometimes EPROTOTYPE instead of EPIPE
      // when this happens.
      ASSERT_TRUE(send_errno == EPIPE || send_errno == EPROTOTYPE)
          << strerror(send_errno);
#else
      ASSERT_EQ(EPIPE, send_errno) << strerror(send_errno);
#endif
      break;
    }
    n++;
  }

  // On MacOS you usually have n < 30
  ASSERT_LT(n, kMaxSendCount);

  // No signals were raised.
  ASSERT_EQ(-1, handler.signaled());
}
161 
// Sends a short message on one end and busy-polls Recv() on the other end
// until the whole message has arrived or 250ms have elapsed.
TEST_F(PosixSocketTest, can_send_data_around_poll) {
  auto [sock1, sock2] = connectPair();
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  std::string word = "Hello World";
  std::string input = "           ";

  ASSERT_EQ(word.size(), input.size());
  ASSERT_NE(word, input);

  const ssize_t snd =
      sock1->Send(reinterpret_cast<uint8_t*>(word.data()), word.size());
  ASSERT_EQ(static_cast<ssize_t>(word.size()), snd);

  // Receive straight into `input`, advancing past whatever already arrived.
  uint8_t* buffer = reinterpret_cast<uint8_t*>(input.data());
  size_t remaining = input.size();

  // Poll for at most 250ms. The deadline is taken from the monotonic clock
  // so a wall-clock adjustment cannot shorten or extend the polling window.
  const auto deadline = std::chrono::steady_clock::now() + 250ms;
  do {
    const ssize_t got = sock2->Recv(buffer, remaining);
    if (got > 0) {
      remaining -= static_cast<size_t>(got);
      buffer += got;
    }
  } while (remaining > 0 && std::chrono::steady_clock::now() < deadline);

  ASSERT_EQ(word, input);
}
188 
// Data sent on one end must trigger the non-blocking-read watcher that was
// registered on the other end, after which at least one byte is readable.
TEST_F(PosixSocketTest, data_results_in_read_event) {
  auto [sock1, sock2] = connectPair();
  std::mutex mtx;
  std::condition_variable ready_cv;
  std::string payload = "Hello World";
  std::string scratch = "           ";

  bool got_event = false;

  // One-shot watcher: it flags the event, unregisters itself so it cannot
  // fire a second time, and wakes the test thread.
  sock2->WatchForNonBlockingRead([&](auto channel) {
    std::unique_lock<std::mutex> hold(mtx);
    got_event = true;
    channel->StopWatching();
    ready_cv.notify_all();
  });

  ssize_t sent = sock1->Send((uint8_t*)payload.data(), payload.size());
  ASSERT_EQ(static_cast<ssize_t>(payload.size()), sent);

  {
    std::unique_lock<std::mutex> lock(mtx);

    // The watcher has 250ms to report the incoming data.
    const bool signalled =
        ready_cv.wait_for(lock, 250ms, [&] { return got_event; });
    ASSERT_TRUE(signalled);

    uint8_t* dest = (uint8_t*)scratch.data();
    int capacity = scratch.size();

    // Only require a single byte: the ip stack is free to deliver the
    // message in pieces, so the full payload may not be available yet.
    ASSERT_GT(sock2->Recv(dest, capacity), 0);
  }
}
225 
// Connecting to the server's port after the server has been closed must not
// produce a connected channel.
TEST_F(PosixSocketTest, connectFails) {
  // Remember the port first, then shut the server down so nothing is
  // accepting on it any more.
  const int closed_port = pass_.port();
  pass_.Close();
  ASSERT_FALSE(pass_.Connected());

  // Allow the attempt at most 250ms before it gives up.
  const auto channel =
      pasc_.ConnectToRemoteServer("localhost", closed_port, 250ms);
  ASSERT_FALSE(channel->Connected());
}
237 
// The server accepts one connection per StartListening() call, so the connect
// callback re-arms it each time; ten sequential clients must all be accepted.
TEST_F(PosixSocketTest, canConnectMultiple) {
  const int port = pass_.port();
  constexpr int kConnectionCount = 10;
  std::mutex m;
  std::condition_variable cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> connections;
  // Guarded by `m`; set by the callback, cleared by the test thread.
  bool connected = false;

  pass_.SetOnConnectCallback([&](std::shared_ptr<AsyncDataChannel> const& sock,
                                 AsyncDataChannelServer*) {
    std::unique_lock<std::mutex> guard(m);
    connections.push_back(sock);
    connected = true;
    // Re-arm the server so the next client can be accepted.
    ASSERT_TRUE(pass_.StartListening());
    cv.notify_all();
  });
  ASSERT_TRUE(pass_.StartListening());

  for (int i = 0; i < kConnectionCount; i++) {
    auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
    ASSERT_TRUE(socket != nullptr);
    ASSERT_TRUE(socket->Connected());
    std::unique_lock<std::mutex> lk(m);
    ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return connected; }));
    // Reset for the next round while still holding the lock; the flag is
    // never written outside of it.
    connected = false;
  }

  std::unique_lock<std::mutex> lk(m);
  ASSERT_EQ(kConnectionCount, static_cast<int>(connections.size()));
}
267 
// Without a second StartListening() call, only the first client may reach the
// connect callback; a later client must never be handed to it.
TEST_F(PosixSocketTest, noConnectWhenNotCallingStart) {
  int port = pass_.port();
  std::mutex mtx;
  std::condition_variable accepted_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_callback = false;

  // Shared wait predicate for both connection attempts.
  const auto callback_fired = [&] { return got_callback; };

  pass_.SetOnConnectCallback(
      [&](std::shared_ptr<AsyncDataChannel> channel, AsyncDataChannelServer*) {
        std::unique_lock<std::mutex> hold(mtx);
        accepted.push_back(channel);
        got_callback = true;
        accepted_cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  // First client: the server is listening, so the callback must run.
  {
    got_callback = false;
    auto client = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
    ASSERT_TRUE(client->Connected());
    std::unique_lock<std::mutex> lock(mtx);
    ASSERT_TRUE(accepted_cv.wait_for(lock, 250ms, callback_fired));
  }

  // Second client: nobody called StartListening() again after the first
  // accept, so this connection must not be delivered to the callback.
  {
    got_callback = false;
    auto client = pasc_.ConnectToRemoteServer("localhost", port, 250ms);

    // The connection is only partially established, so from the client's
    // point of view it still looks like it worked.
    ASSERT_TRUE(client->Connected());
    std::unique_lock<std::mutex> lock(mtx);

    // The wait has to time out because the accepting callback never runs.
    ASSERT_FALSE(accepted_cv.wait_for(lock, 250ms, callback_fired));
  }

  ASSERT_EQ(1, static_cast<int>(accepted.size()));
}
309 }  // namespace net
310 }  // namespace android
311