/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#import <XCTest/XCTest.h>
20
21#include "src/core/lib/iomgr/port.h"
22
23#ifdef GRPC_CFSTREAM
24
25#include <limits.h>
26
27#include <netinet/in.h>
28
29#include <grpc/grpc.h>
30#include <grpc/support/sync.h>
31
32#include "src/core/lib/address_utils/parse_address.h"
33#include "src/core/lib/address_utils/sockaddr_utils.h"
34#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
35#include "src/core/lib/iomgr/endpoint.h"
36#include "src/core/lib/iomgr/resolve_address.h"
37#include "src/core/lib/iomgr/tcp_client.h"
38#include "src/core/lib/resource_quota/api.h"
39#include "test/core/util/test_config.h"
40
41#include <chrono>
42#include <future>
43
// Timeouts, in seconds, handed to -waitForEvent:timeout: for each kind of endpoint event.
static constexpr int kConnectTimeout = 5;
static constexpr int kWriteTimeout = 5;
static constexpr int kReadTimeout = 5;

// Size, in bytes, of the payload buffers exchanged between the endpoint and the peer socket.
static constexpr int kBufferSize = 10000;

// NOTE(review): not referenced by the code visible in this file; presumably a run-loop timeout in
// seconds -- confirm before relying on it.
static constexpr int kRunLoopTimeout = 1;
51
// Closure callback: `arg` is the std::promise<grpc_error_handle> registered with the closure; it
// is fulfilled with the `error` the closure was run with.
static void set_error_handle_promise(void *arg, grpc_error_handle error) {
  auto *promise = static_cast<std::promise<grpc_error_handle> *>(arg);
  promise->set_value(error);
}
56
// Initializes `closure` so that, when it runs on the exec ctx, it fulfils `error_handle` with the
// status it was invoked with (see set_error_handle_promise).
static void init_event_closure(grpc_closure *closure,
                               std::promise<grpc_error_handle> *error_handle) {
  void *callback_arg = error_handle;
  GRPC_CLOSURE_INIT(closure, set_error_handle_promise, callback_arg, grpc_schedule_on_exec_ctx);
}
62
// Returns true when the concatenated contents of `slices` are byte-for-byte equal to the first
// `buffer_len` bytes of `buffer`.
//
// The total length is checked first, so the per-slice comparisons below never read past
// `buffer + buffer_len`.
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
                                             size_t buffer_len) {
  if (slices->length != buffer_len) {
    return false;
  }

  const char *cursor = buffer;
  // size_t index: avoids comparing a signed int against the unsigned slice count.
  for (size_t i = 0; i < slices->count; i++) {
    const grpc_slice &slice = slices->slices[i];
    const size_t slice_len = GRPC_SLICE_LENGTH(slice);
    if (memcmp(cursor, GRPC_SLICE_START_PTR(slice), slice_len) != 0) {
      return false;
    }
    cursor += slice_len;
  }

  return true;
}
79
// Fills `buffer` with a deterministic, position-dependent byte pattern so that lost, reordered or
// misplaced bytes show up when payloads are compared (an uninitialised buffer would not).
static void fill_test_payload(char *buffer, size_t length) {
  for (size_t i = 0; i < length; i++) {
    buffer[i] = static_cast<char>(i % 251);
  }
}

/**
 * Tests for the gRPC endpoint created by grpc_tcp_client_connect when GRPC_CFSTREAM is defined.
 *
 * Every test runs against a fresh loopback TCP connection: -setUp connects the endpoint to a
 * local listening socket and keeps the accepted peer socket, which the tests drive directly with
 * recv()/send()/close().
 */
@interface CFStreamEndpointTests : XCTestCase

@end

@implementation CFStreamEndpointTests {
  grpc_endpoint *ep_;  // Client-side endpoint under test; nullptr until -setUp connects it.
  int svr_fd_;         // Accepted server-side socket; -1 when not open.
}

#pragma mark - Helpers

/**
 * Flushes the current ExecCtx and blocks until `event` is ready or `timeout` seconds elapse.
 *
 * @return YES if the future became ready, NO if the wait timed out.
 */
- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
  grpc_core::ExecCtx::Get()->Flush();
  return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
}

/**
 * Closes the server-side socket if it is still open and marks it closed, so that a test that
 * closes the socket itself does not cause -tearDown to close the same descriptor twice.
 */
- (void)closeServerSocket {
  if (svr_fd_ >= 0) {
    close(svr_fd_);
    svr_fd_ = -1;
  }
}

/**
 * Reads exactly `length` bytes from the server-side socket into `buffer`.
 *
 * Each recv() appends after the bytes already received, so data arriving in several chunks is
 * reassembled in order. Fails the test if recv() reports an error or the peer closes early.
 */
- (void)receiveFromServerInto:(char *)buffer length:(size_t)length {
  size_t received = 0;
  while (received < length) {
    ssize_t size = recv(svr_fd_, buffer + received, length - received, 0);
    if (size == -1 && errno == EINTR) {
      continue;
    }
    XCTAssertGreaterThan(size, 0, @"recv returned %@ with errno %@", @(size), @(errno));
    if (size <= 0) {
      // Never spin on a closed or failed socket, even if the assertion above did not unwind.
      return;
    }
    received += static_cast<size_t>(size);
  }
  XCTAssertEqual(received, length);
}

#pragma mark - Suite lifecycle

// Initializes the gRPC library for the suite.
+ (void)setUp {
  grpc_init();
}

// Balances the grpc_init() performed in +setUp.
+ (void)tearDown {
  grpc_shutdown();
}

#pragma mark - Per-test lifecycle

/**
 * Establishes a loopback connection: ep_ becomes the connected client endpoint and svr_fd_ the
 * accepted server-side socket.
 */
- (void)setUp {
  [super setUp];
  self.continueAfterFailure = NO;

  // Known "nothing to release" state in case an assertion below aborts set-up early.
  ep_ = nullptr;
  svr_fd_ = -1;

  // Set up CFStream connection before testing the endpoint

  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> connected_promise;
  grpc_closure done;

  gpr_log(GPR_DEBUG, "test_succeeds");

  auto resolved_addr = grpc_core::StringToSockaddr("127.0.0.1:0");
  XCTAssertTrue(resolved_addr.ok());
  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr->addr);

  /* create a phony server */
  const int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  XCTAssertGreaterThanOrEqual(listen_fd, 0);
  XCTAssertEqual(bind(listen_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr->len), 0);
  XCTAssertEqual(listen(listen_fd, 1), 0);

  /* connect to it */
  // Port 0 was requested above; read back the port the kernel actually assigned.
  XCTAssertEqual(
      getsockname(listen_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0);
  init_event_closure(&done, &connected_promise);
  auto args =
      grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
          nullptr);
  grpc_tcp_client_connect(&done, &ep_, nullptr,
                          grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
                          &*resolved_addr, grpc_core::Timestamp::InfFuture());

  /* await the connection */
  // The peer address is not needed, so none is requested; this also leaves the address that was
  // handed to grpc_tcp_client_connect untouched.
  int accepted_fd;
  do {
    accepted_fd = accept(listen_fd, nullptr, nullptr);
  } while (accepted_fd == -1 && errno == EINTR);
  const int accept_errno = errno;  // Capture before close() can overwrite errno.
  // Only one connection is ever accepted; release the listening socket right away.
  close(listen_fd);
  XCTAssertGreaterThanOrEqual(accepted_fd, 0,
                              @"connection failed with return code %@ and errno %@",
                              @(accepted_fd), @(accept_errno));
  svr_fd_ = accepted_fd;

  /* wait for the connection callback to finish */
  std::future<grpc_error_handle> connected_future = connected_promise.get_future();
  XCTAssertTrue([self waitForEvent:&connected_future timeout:kConnectTimeout]);
  XCTAssertEqual(connected_future.get(), absl::OkStatus());
}

/** Releases whatever -setUp managed to create; safe after a partially failed set-up. */
- (void)tearDown {
  grpc_core::ExecCtx exec_ctx;
  [self closeServerSocket];
  if (ep_ != nullptr) {
    grpc_endpoint_destroy(ep_);
    ep_ = nullptr;
  }
  [super tearDown];
}

#pragma mark - Tests

/**
 * Writes a payload through the endpoint, checks the peer receives it intact, echoes it back and
 * checks the endpoint reads the same bytes.
 */
- (void)testReadWrite {
  grpc_core::ExecCtx exec_ctx;

  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  grpc_slice_buffer read_one_slice;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  fill_test_payload(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  [self receiveFromServerInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Echo the payload back; the read loop below waits for all of it, so a short send is a failure.
  ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
  XCTAssertEqual(send_size, static_cast<ssize_t>(kBufferSize));

  grpc_slice_buffer_init(&read_slices);
  grpc_slice_buffer_init(&read_one_slice);
  // A single endpoint read may deliver only part of the payload; accumulate until complete.
  while (read_slices.length < kBufferSize) {
    std::promise<grpc_error_handle> read_promise;
    init_event_closure(&read_done, &read_promise);
    grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false,
                       /*min_progress_size=*/1);
    std::future<grpc_error_handle> read_future = read_promise.get_future();
    XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
    XCTAssertEqual(read_future.get(), absl::OkStatus());
    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  }
  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
  grpc_slice_buffer_reset_and_unref(&read_one_slice);
}

/**
 * Starts a read that the peer never satisfies, then shuts the endpoint down and checks the
 * pending read completes with a non-OK status.
 */
- (void)testShutdownBeforeRead {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  fill_test_payload(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&read_slices);
  init_event_closure(&read_done, &read_promise);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  [self receiveFromServerInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // The peer sends nothing in this test, so the read must still be pending after the timeout.
  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertFalse([self waitForEvent:&read_future timeout:kReadTimeout]);

  grpc_endpoint_shutdown(ep_, absl::OkStatus());

  // -waitForEvent:timeout: flushes the ExecCtx before waiting, which runs the shutdown callbacks.
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

/**
 * Closes the peer socket after a successful write and checks the endpoint's pending read
 * completes with a non-OK status.
 */
- (void)testRemoteClosed {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  fill_test_payload(write_buffer, kBufferSize);

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);

  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  [self receiveFromServerInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Close through the helper so -tearDown does not close the descriptor a second time.
  [self closeServerSocket];

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

/**
 * Closes the peer socket with SO_LINGER enabled and a zero linger time, and checks the
 * endpoint's pending read completes with a non-OK status.
 */
- (void)testRemoteReset {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  // Linger enabled with a zero timeout: close() aborts the connection rather than closing it
  // gracefully, which is the "reset" this test is named after.
  struct linger so_linger;
  so_linger.l_onoff = 1;
  so_linger.l_linger = 0;
  XCTAssertEqual(setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)), 0);

  // Close through the helper so -tearDown does not close the descriptor a second time.
  [self closeServerSocket];

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
}

@end
345
346#else  // GRPC_CFSTREAM
347
// Phony test suite: compiled when GRPC_CFSTREAM is not defined, so that a CFStreamEndpointTests
// class still exists in the build. It declares no test methods.
@interface CFStreamEndpointTests : XCTestCase
@end

@implementation CFStreamEndpointTests

// Nothing to prepare; defers to XCTestCase.
- (void)setUp {
  [super setUp];
}

// Nothing to release; defers to XCTestCase.
- (void)tearDown {
  [super tearDown];
}

@end
362
363#endif  // GRPC_CFSTREAM
364