1
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
#include <errno.h>
#include <gtest/gtest.h>
#include <netdb.h>
#include <netinet/in.h>
#include <signal.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <tuple>
#include <vector>

#include "log.h"  // for LOG_INFO
#include "model/setup/async_manager.h"
#include "net/posix/posix_async_socket_connector.h"
#include "net/posix/posix_async_socket_server.h"
38
39 namespace android {
40 namespace net {
41
42 using clock = std::chrono::system_clock;
43
// Scoped SIGPIPE trap for tests.
//
// While an instance is alive, SIGPIPE is routed to a handler that only
// records the signal number. The disposition that was active before
// construction is put back by the destructor. signaled() returns -1 until a
// signal has been recorded, and the signal number afterwards.
//
// The recorded value is a single static, so only one instance should be alive
// at a time.
class SigPipeSignalHandler {
 public:
  SigPipeSignalHandler() {
    sSignal = -1;
    struct sigaction act = {};
    act.sa_handler = myHandler;
    // Do not block any additional signals while the handler runs.
    sigemptyset(&act.sa_mask);
    ::sigaction(SIGPIPE, &act, &mOldAction);
  }

  ~SigPipeSignalHandler() { ::sigaction(SIGPIPE, &mOldAction, nullptr); }

  // Copying would make two objects restore the same saved disposition.
  SigPipeSignalHandler(const SigPipeSignalHandler&) = delete;
  SigPipeSignalHandler& operator=(const SigPipeSignalHandler&) = delete;

  // Returns the last signal number seen by the handler, or -1 if none.
  int signaled() const { return sSignal; }

 private:
  // Disposition that was installed before this object took over SIGPIPE.
  struct sigaction mOldAction = {};

  // Written from the signal handler, so it has to be a type that is safe to
  // store to asynchronously: volatile sig_atomic_t.
  static volatile sig_atomic_t sSignal;

  static void myHandler(int sig) { sSignal = sig; }
};

// static
volatile sig_atomic_t SigPipeSignalHandler::sSignal = 0;
67
68 using SocketCon = std::shared_ptr<AsyncDataChannel>;
69
70 class PosixSocketTest : public testing::Test {
71 public:
PosixSocketTest()72 PosixSocketTest() : pasc_(&async_manager_), pass_(0, &async_manager_) {}
73
~PosixSocketTest()74 ~PosixSocketTest() { pass_.Close(); }
75
connectPair(std::chrono::milliseconds timeout=500ms)76 std::tuple<SocketCon, SocketCon> connectPair(
77 std::chrono::milliseconds timeout = 500ms) {
78 std::mutex m;
79 std::condition_variable cv;
80
81 std::shared_ptr<AsyncDataChannel> sock1;
82 std::shared_ptr<AsyncDataChannel> sock2;
83
84 pass_.SetOnConnectCallback(
85 [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
86 std::unique_lock<std::mutex> guard(m);
87 sock1 = std::move(sock);
88 cv.notify_all();
89 });
90 EXPECT_TRUE(pass_.StartListening());
91
92 sock2 = pasc_.ConnectToRemoteServer("localhost", pass_.port(), 1000ms);
93 EXPECT_TRUE(sock2.get() != nullptr);
94 EXPECT_TRUE(sock2->Connected());
95
96 std::unique_lock<std::mutex> lk(m);
97 EXPECT_TRUE(
98 cv.wait_for(lk, timeout, [&] { return sock1.get() != nullptr; }));
99 EXPECT_TRUE(sock1);
100 EXPECT_TRUE(sock1->Connected());
101
102 return {sock1, sock2};
103 }
104
105 protected:
106 AsyncManager async_manager_;
107 PosixAsyncSocketConnector pasc_;
108 PosixAsyncSocketServer pass_;
109 };
110
// A freshly connected pair reports both ends as connected, and both ends
// report disconnected once each has been closed.
TEST_F(PosixSocketTest, canConnect) {
  const auto [accepted, connector] = connectPair();

  for (const auto& endpoint : {accepted, connector}) {
    ASSERT_TRUE(endpoint->Connected());
  }

  for (const auto& endpoint : {accepted, connector}) {
    endpoint->Close();
  }

  for (const auto& endpoint : {accepted, connector}) {
    ASSERT_FALSE(endpoint->Connected());
  }
}
122
// Writing into a connection whose peer has been closed must show up as an
// error return from Send(), never as a SIGPIPE delivered to the process.
TEST_F(PosixSocketTest, socketSendDoesNotGenerateSigPipe) {
  SigPipeSignalHandler sigpipe_trap;
  ASSERT_EQ(-1, sigpipe_trap.signaled());
  auto [closed_end, open_end] = connectPair();

  // Both ends are connected at this point. Drop one side right away; every
  // write below goes through the surviving side.
  closed_end->Close();
  ASSERT_FALSE(closed_end->Connected());

  // TCP buffering in the kernel means the first writes may still succeed, so
  // keep writing until one fails or the attempt budget runs out.
  errno = 0;
  const int kMaxSendCount = 1000;
  int successful_sends = 0;
  for (; successful_sends < kMaxSendCount; ++successful_sends) {
    const int ret = open_end->Send((uint8_t*)"xxxx", 4);
    if (ret >= 0) {
      continue;
    }
#ifdef __APPLE__
    // OS X sometimes reports EPROTOTYPE rather than EPIPE in this situation.
    ASSERT_TRUE(errno == EPIPE || errno == EPROTOTYPE) << strerror(errno);
#else
    ASSERT_EQ(EPIPE, errno) << strerror(errno);
#endif
    break;
  }

  // A write must have failed before the budget was used up (on MacOS this
  // usually takes fewer than 30 writes).
  ASSERT_LT(successful_sends, kMaxSendCount);

  // The trap never saw a signal.
  ASSERT_EQ(-1, sigpipe_trap.signaled());
}
161
// Sends a short message through one end of a connected pair and polls the
// other end until the whole message has arrived or the deadline passes.
TEST_F(PosixSocketTest, can_send_data_around_poll) {
  auto [sock1, sock2] = connectPair();
  // connectPair only reports problems as non-fatal failures; stop here rather
  // than dereference a missing channel.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  std::string word = "Hello World";
  // Receive buffer: exactly as long as the payload and filled with blanks, so
  // it differs from the payload until the data has actually been received.
  std::string input(word.size(), ' ');

  ASSERT_EQ(word.size(), input.size());
  ASSERT_NE(word, input);

  ssize_t snd =
      sock1->Send(reinterpret_cast<uint8_t*>(word.data()), word.size());
  ASSERT_EQ(static_cast<ssize_t>(word.size()), snd);

  uint8_t* cursor = reinterpret_cast<uint8_t*>(input.data());
  size_t remaining = input.size();

  // Poll for at most 250ms.
  const clock::time_point deadline = clock::now() + 250ms;
  do {
    const ssize_t received = sock2->Recv(cursor, remaining);
    if (received > 0) {
      // Partial reads are fine; continue filling the rest of the buffer.
      cursor += received;
      remaining -= static_cast<size_t>(received);
    }
  } while (remaining > 0 && clock::now() < deadline);

  ASSERT_EQ(word, input);
}
188
// Data sent on one end of a connected pair must trigger the read callback
// registered on the other end, after which at least one byte can be read.
TEST_F(PosixSocketTest, data_results_in_read_event) {
  auto [sock1, sock2] = connectPair();
  // connectPair only reports problems as non-fatal failures; stop here rather
  // than dereference a missing channel.
  ASSERT_TRUE(sock1 != nullptr);
  ASSERT_TRUE(sock2 != nullptr);

  std::mutex m;
  std::condition_variable cv;
  std::string word = "Hello World";
  // Receive buffer with room for the complete payload.
  std::string input(word.size(), ' ');

  bool received = false;

  // Register a callback that unregisters itself the first time it runs.
  sock2->WatchForNonBlockingRead([&](auto sock) {
    std::unique_lock<std::mutex> guard(m);
    received = true;
    // Unregister, to prevent surprises..
    sock->StopWatching();
    cv.notify_all();
  });

  ssize_t snd =
      sock1->Send(reinterpret_cast<uint8_t*>(word.data()), word.size());
  ASSERT_EQ(static_cast<ssize_t>(word.size()), snd);

  {
    std::unique_lock<std::mutex> lk(m);

    // The callback will be called within 250ms.
    ASSERT_TRUE(cv.wait_for(lk, 250ms, [&] { return received; }));
  }

  // The lock is released before reading: it only guards `received`, and the
  // read callback takes the same mutex.
  uint8_t* buffer = reinterpret_cast<uint8_t*>(input.data());

  // At least 1 byte is coming in. (We might get just a few bytes rather than
  // the whole thing, as you never know what happens in the ip stack.)
  ASSERT_GT(sock2->Recv(buffer, input.size()), 0);
}
225
// Connecting to a port whose server has been closed must yield a channel that
// reports itself as not connected.
TEST_F(PosixSocketTest, connectFails) {
  // Remember where the server was, then shut it down.
  const int closed_port = pass_.port();
  pass_.Close();
  ASSERT_FALSE(pass_.Connected());

  // Give the attempt at most 250ms; nothing is there to answer it.
  const auto attempt =
      pasc_.ConnectToRemoteServer("localhost", closed_port, 250ms);
  ASSERT_FALSE(attempt->Connected());
}
237
// The server keeps accepting clients as long as the connect callback calls
// StartListening() again after each accepted connection.
TEST_F(PosixSocketTest, canConnectMultiple) {
  int port = pass_.port();
  int CONNECTION_COUNT = 10;
  std::mutex accept_mutex;
  std::condition_variable accept_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_connection = false;

  // Record every accepted channel, re-arm the listener, then wake the test.
  pass_.SetOnConnectCallback(
      [&](std::shared_ptr<AsyncDataChannel> const& sock,
          AsyncDataChannelServer*) {
        std::unique_lock<std::mutex> guard(accept_mutex);
        accepted.push_back(sock);
        got_connection = true;
        ASSERT_TRUE(pass_.StartListening());
        accept_cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  int remaining = CONNECTION_COUNT;
  while (remaining-- > 0) {
    got_connection = false;
    auto client = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
    ASSERT_TRUE(client->Connected());

    // Wait for the server side to report this particular connection.
    std::unique_lock<std::mutex> lk(accept_mutex);
    ASSERT_TRUE(accept_cv.wait_for(lk, 250ms, [&] { return got_connection; }));
    got_connection = false;
  }

  ASSERT_EQ(CONNECTION_COUNT, static_cast<int>(accepted.size()));
}
267
// Without another StartListening() call after the first accepted connection,
// the connect callback must not run for a second client.
TEST_F(PosixSocketTest, noConnectWhenNotCallingStart) {
  int port = pass_.port();
  std::mutex accept_mutex;
  std::condition_variable accept_cv;
  std::vector<std::shared_ptr<AsyncDataChannel>> accepted;
  bool got_connection = false;

  // Record the accepted channel and wake the test; the listener is
  // deliberately not re-armed here.
  pass_.SetOnConnectCallback(
      [&](std::shared_ptr<AsyncDataChannel> sock, AsyncDataChannelServer*) {
        std::unique_lock<std::mutex> guard(accept_mutex);
        accepted.push_back(sock);
        got_connection = true;
        accept_cv.notify_all();
      });
  ASSERT_TRUE(pass_.StartListening());

  // First client: the callback is expected to fire.
  {
    got_connection = false;
    auto first_client = pasc_.ConnectToRemoteServer("localhost", port, 250ms);
    ASSERT_TRUE(first_client->Connected());
    std::unique_lock<std::mutex> lk(accept_mutex);
    const bool notified =
        accept_cv.wait_for(lk, 250ms, [&] { return got_connection; });
    ASSERT_TRUE(notified);
  }

  // Second client: StartListening() was not called again after the first
  // connection, so no new socket should be accepted.
  {
    got_connection = false;
    auto second_client = pasc_.ConnectToRemoteServer("localhost", port, 250ms);

    // The connection is only partial, so from the client side it is not yet
    // known that it is not working.
    ASSERT_TRUE(second_client->Connected());
    std::unique_lock<std::mutex> lk(accept_mutex);

    // The wait has to time out because the accepting callback never runs.
    const bool notified =
        accept_cv.wait_for(lk, 250ms, [&] { return got_connection; });
    ASSERT_FALSE(notified);
  }

  ASSERT_EQ(1, static_cast<int>(accepted.size()));
}
309 } // namespace net
310 } // namespace android
311