1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <errno.h>
16 #include <gtest/gtest.h>
17 #include <netdb.h>
18 #include <netinet/in.h>
19 #include <signal.h>
20 #include <string.h>
21 #include <sys/socket.h>
22 #include <sys/types.h>
23 #include <unistd.h>
24
25 #include <condition_variable>
26 #include <cstdint>
27 #include <cstring>
28 #include <functional>
29 #include <memory>
30 #include <mutex>
31 #include <random>
32 #include <vector>
33
34 #include "model/setup/async_manager.h"
35 #include "net/posix/posix_async_socket_connector.h"
36 #include "net/posix/posix_async_socket_server.h"
37
38 namespace android {
39 namespace net {
40
41 using clock = std::chrono::system_clock;
42
// Test helper that records whether SIGPIPE was delivered during its lifetime.
//
// The constructor resets the recorded value to -1 and installs a handler for
// SIGPIPE; the destructor puts back the disposition that was in place before.
// The recorded value is static, so all instances share it.
class SigPipeSignalHandler {
 public:
  SigPipeSignalHandler() {
    sSignal = -1;
    struct sigaction act = {};
    act.sa_handler = myHandler;
    // Explicitly empty the mask: zero-initialising a sigset_t is not
    // guaranteed to produce an empty set on every platform.
    sigemptyset(&act.sa_mask);
    mInstalled = ::sigaction(SIGPIPE, &act, &mOldAction) == 0;
  }

  ~SigPipeSignalHandler() {
    // Only restore when the previous disposition was actually captured.
    if (mInstalled) {
      ::sigaction(SIGPIPE, &mOldAction, nullptr);
    }
  }

  // The object owns a process-wide signal disposition; a copy would restore
  // it a second time.
  SigPipeSignalHandler(const SigPipeSignalHandler&) = delete;
  SigPipeSignalHandler& operator=(const SigPipeSignalHandler&) = delete;

  // Returns the number of the last signal seen by the handler, or -1 when
  // none has been seen since the most recent construction.
  int signaled() const { return sSignal; }

 private:
  struct sigaction mOldAction = {};
  bool mInstalled = false;

  // Written from inside the signal handler, hence volatile sig_atomic_t.
  static volatile sig_atomic_t sSignal;

  static void myHandler(int sig) { sSignal = sig; }
};

// static
volatile sig_atomic_t SigPipeSignalHandler::sSignal = 0;
66
67 using SocketCon = std::shared_ptr<AsyncDataChannel>;
68
69 class PosixSocketTest : public testing::Test {
70 public:
PosixSocketTest()71 PosixSocketTest() : pasc_(&async_manager_), pass_(0, &async_manager_) {}
72
~PosixSocketTest()73 ~PosixSocketTest() { pass_.Close(); }
74
connectPair(std::chrono::milliseconds timeout=500ms)75 std::tuple<SocketCon, SocketCon> connectPair(
76 std::chrono::milliseconds timeout = 500ms) {
77 std::mutex m;
78 std::condition_variable cv;
79
80 std::shared_ptr<AsyncDataChannel> sock1;
81 std::shared_ptr<AsyncDataChannel> sock2;
82
83 pass_.SetOnConnectCallback(
84 [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
85 std::unique_lock<std::mutex> guard(m);
86 sock1 = std::move(sock);
87 cv.notify_all();
88 });
89 EXPECT_TRUE(pass_.StartListening());
90
91 sock2 = pasc_.ConnectToRemoteServer("localhost", pass_.port(), 1000ms);
92 EXPECT_TRUE(sock2.get() != nullptr);
93 EXPECT_TRUE(sock2->Connected());
94
95 std::unique_lock<std::mutex> lk(m);
96 EXPECT_TRUE(
97 cv.wait_for(lk, timeout, [&] { return sock1.get() != nullptr; }));
98 EXPECT_TRUE(sock1);
99 EXPECT_TRUE(sock1->Connected());
100
101 return {sock1, sock2};
102 }
103
104 protected:
105 AsyncManager async_manager_;
106 PosixAsyncSocketConnector pasc_;
107 PosixAsyncSocketServer pass_;
108 };
109
// Both ends of a fresh pair report Connected(), and stop doing so once each
// end has been closed.
TEST_F(PosixSocketTest, canConnect) {
  auto [sock1, sock2] = connectPair();
  // connectPair() reports its problems with non-fatal checks only, so stop
  // here instead of dereferencing a channel that was never produced.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  ASSERT_TRUE(sock1->Connected());
  ASSERT_TRUE(sock2->Connected());

  sock1->Close();
  sock2->Close();

  ASSERT_FALSE(sock1->Connected());
  ASSERT_FALSE(sock2->Connected());
}
121
// Check that writing to a broken pipe does not generate a SIGPIPE signal.
TEST_F(PosixSocketTest, socketSendDoesNotGenerateSigPipe) {
  SigPipeSignalHandler handler;
  ASSERT_EQ(-1, handler.signaled());
  auto [sock1, sock2] = connectPair();
  // connectPair() only records non-fatal failures; bail out rather than
  // dereference a missing channel.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  // The two ends are now connected. Close sock1 immediately, then try to
  // send data through sock2.
  sock1->Close();
  ASSERT_FALSE(sock1->Connected());

  // A real byte buffer instead of casting const away from a string literal.
  uint8_t payload[] = {'x', 'x', 'x', 'x'};

  // The EPIPE might not happen on the first send due to TCP packet buffering
  // in the kernel. Perform multiple sends in a loop to work around this.
  errno = 0;
  const int kMaxSendCount = 1000;
  int n = 0;
  while (n < kMaxSendCount) {
    const ssize_t ret = sock2->Send(payload, sizeof(payload));
    if (ret < 0) {
      // Capture errno right away, before the assertion machinery gets a
      // chance to overwrite it.
      const int send_errno = errno;
#ifdef __APPLE__
      // On OS X, errno is sometimes EPROTOTYPE instead of EPIPE when this
      // happens.
      ASSERT_TRUE(send_errno == EPIPE || send_errno == EPROTOTYPE)
          << strerror(send_errno);
#else
      ASSERT_EQ(EPIPE, send_errno) << strerror(send_errno);
#endif
      break;
    }
    n++;
  }

  // On MacOS you usually have n < 30
  ASSERT_LT(n, kMaxSendCount);

  // No signals were raised.
  ASSERT_EQ(-1, handler.signaled());
}
160
// Sends a word through one end and polls the other end with Recv() until the
// whole word has arrived or 250ms have passed.
TEST_F(PosixSocketTest, can_send_data_around_poll) {
  auto [sock1, sock2] = connectPair();
  // connectPair() only records non-fatal failures; bail out rather than
  // dereference a missing channel.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  std::string word = "Hello World";
  // Receive buffer: same length as the word, filled with blanks so that it is
  // guaranteed to differ from the word before anything is received.
  std::string input(word.size(), ' ');

  ASSERT_EQ(word.size(), input.size());
  ASSERT_NE(word, input);

  const ssize_t snd =
      sock1->Send(reinterpret_cast<uint8_t*>(word.data()), word.size());
  ASSERT_EQ(static_cast<ssize_t>(word.size()), snd);

  uint8_t* buffer = reinterpret_cast<uint8_t*>(input.data());
  size_t remaining = input.size();

  // Poll for at most 250ms.
  const clock::time_point until = clock::now() + 250ms;
  do {
    const auto got = sock2->Recv(buffer, remaining);
    if (got > 0) {
      // Advance past the bytes already received; a short read is normal.
      remaining -= static_cast<size_t>(got);
      buffer += got;
    }
  } while (remaining > 0 && clock::now() < until);

  ASSERT_EQ(word, input);
}
187
// Sending data to one end must trigger the read callback registered on the
// other end, after which at least one byte can be received.
TEST_F(PosixSocketTest, data_results_in_read_event) {
  auto [sock1, sock2] = connectPair();
  // connectPair() only records non-fatal failures; bail out rather than
  // dereference a missing channel.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  std::mutex m;
  std::condition_variable cv;
  std::string word = "Hello World";
  // Receive buffer large enough for the whole word.
  std::string input(word.size(), ' ');

  bool received = false;

  // Register a callback that unregisters itself on its first invocation.
  sock2->WatchForNonBlockingRead([&](auto sock) {
    std::unique_lock<std::mutex> guard(m);
    received = true;
    // Unregister, to prevent surprises.
    sock->StopWatching();
    cv.notify_all();
  });

  const ssize_t snd =
      sock1->Send(reinterpret_cast<uint8_t*>(word.data()), word.size());
  ASSERT_EQ(static_cast<ssize_t>(word.size()), snd);

  {
    std::unique_lock<std::mutex> lk(m);

    // The callback will be called within 250ms.
    ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return received; }));

    uint8_t* buffer = reinterpret_cast<uint8_t*>(input.data());
    const size_t buflen = input.size();

    // At least 1 byte is coming in. (Note: we might get just a few bytes
    // rather than the whole thing, as you never know what happens in the IP
    // stack.)
    ASSERT_GT(sock2->Recv(buffer, buflen), 0);
  }
}
224
// Connecting to the server's port after the server has been closed must
// yield a channel that is not connected.
TEST_F(PosixSocketTest, connectFails) {
  // Remember the port before shutting the server down.
  const int closed_port = pass_.port();

  pass_.Close();
  ASSERT_FALSE(pass_.Connected());

  // Allow the attempt at most 250ms.
  const auto channel =
      pasc_.ConnectToRemoteServer("localhost", closed_port, 250ms);
  ASSERT_FALSE(channel->Connected());
}
236
// The server accepts several connections in a row as long as the connect
// callback calls StartListening() again after each accepted connection.
TEST_F(PosixSocketTest, canConnectMultiple) {
  constexpr int kConnectionCount = 10;
  const int port = pass_.port();

  // The callback remains registered on the server (and keeps re-arming it)
  // after this test body returns, so the state it touches is owned by the
  // callback itself rather than captured by reference from the stack.
  struct AcceptState {
    std::mutex m;
    std::condition_variable cv;
    std::vector<std::shared_ptr<AsyncDataChannel>> connections;
    bool connected = false;
  };
  auto state = std::make_shared<AcceptState>();

  pass_.SetOnConnectCallback(
      [this, state](std::shared_ptr<AsyncDataChannel> const& sock,
                    AsyncDataChannelServer*) {
        std::unique_lock<std::mutex> guard(state->m);
        state->connections.push_back(sock);
        state->connected = true;
        // Non-fatal on purpose: a fatal assertion would return from the
        // callback before the waiter below is notified.
        EXPECT_TRUE(pass_.StartListening());
        state->cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  for (int i = 0; i < kConnectionCount; i++) {
    auto socket = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
    ASSERT_TRUE(socket->Connected());
    std::unique_lock<std::mutex> lk(state->m);
    ASSERT_TRUE(
        state->cv.wait_for(lk, 250ms, [&] { return state->connected; }));
    // Re-arm the flag for the next round while still holding the lock.
    state->connected = false;
  }

  std::unique_lock<std::mutex> lk(state->m);
  ASSERT_EQ(kConnectionCount, static_cast<int>(state->connections.size()));
}
266
// Without a fresh StartListening() call after the first accepted connection,
// a second connection attempt must never reach the connect callback.
TEST_F(PosixSocketTest, noConnectWhenNotCallingStart) {
  int server_port = pass_.port();
  std::mutex mtx;
  std::condition_variable accepted_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_connection = false;

  // Predicate shared by both waits below.
  const auto has_connection = [&] { return got_connection; };

  pass_.SetOnConnectCallback(
      [&](std::shared_ptr<AsyncDataChannel> channel, AsyncDataChannelServer*) {
        std::unique_lock<std::mutex> hold(mtx);
        accepted.push_back(channel);
        got_connection = true;
        accepted_cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  // First attempt: the server is listening, so the callback must fire.
  {
    got_connection = false;
    auto first = pasc_.ConnectToRemoteServer("localhost", server_port, 250ms);
    ASSERT_TRUE(first->Connected());
    std::unique_lock<std::mutex> wait_lock(mtx);
    ASSERT_TRUE(accepted_cv.wait_for(wait_lock, 250ms, has_connection));
  }

  // Second attempt: nothing called StartListening() after the first accept,
  // hence no new socket should be accepted.
  {
    got_connection = false;
    auto second = pasc_.ConnectToRemoteServer("localhost", server_port, 250ms);

    // The connection is only partially established at this point, so the
    // connecting side cannot yet tell that it will not be serviced.
    ASSERT_TRUE(second->Connected());
    std::unique_lock<std::mutex> wait_lock(mtx);

    // Expected to time out: the accepting callback is never invoked.
    ASSERT_FALSE(accepted_cv.wait_for(wait_lock, 250ms, has_connection));
  }

  ASSERT_EQ(1, static_cast<int>(accepted.size()));
}
308 } // namespace net
309 } // namespace android
310