• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import <XCTest/XCTest.h>
20
21#include "src/core/lib/iomgr/port.h"
22
23#ifdef GRPC_CFSTREAM
24
25#include <limits.h>
26
27#include <netinet/in.h>
28
29#include <grpc/grpc.h>
30#include <grpc/impl/codegen/sync.h>
31#include <grpc/support/sync.h>
32
33#include "src/core/lib/address_utils/parse_address.h"
34#include "src/core/lib/address_utils/sockaddr_utils.h"
35#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
36#include "src/core/lib/iomgr/endpoint.h"
37#include "src/core/lib/iomgr/resolve_address.h"
38#include "src/core/lib/iomgr/tcp_client.h"
39#include "src/core/lib/resource_quota/api.h"
40#include "test/core/test_util/test_config.h"
41
42#include <chrono>
43#include <future>
44
45static const int kConnectTimeout = 5;
46static const int kWriteTimeout = 5;
47static const int kReadTimeout = 5;
48
49static const int kBufferSize = 10000;
50
51static const int kRunLoopTimeout = 1;
52
53static void set_error_handle_promise(void *arg, grpc_error_handle error) {
54  std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg);
55  p->set_value(error);
56}
57
58static void init_event_closure(grpc_closure *closure,
59                               std::promise<grpc_error_handle> *error_handle) {
60  GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle),
61                    grpc_schedule_on_exec_ctx);
62}
63
64static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
65                                             size_t buffer_len) {
66  if (slices->length != buffer_len) {
67    return false;
68  }
69
70  for (int i = 0; i < slices->count; i++) {
71    grpc_slice slice = slices->slices[i];
72    if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
73      return false;
74    }
75    buffer += GRPC_SLICE_LENGTH(slice);
76  }
77
78  return true;
79}
80
81@interface CFStreamEndpointTests : XCTestCase
82
83@end
84
85@implementation CFStreamEndpointTests {
86  grpc_endpoint *ep_;
87  int svr_fd_;
88}
89
90- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
91  grpc_core::ExecCtx::Get()->Flush();
92  return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
93}
94
95+ (void)setUp {
96  grpc_init();
97}
98
99+ (void)tearDown {
100  grpc_shutdown();
101}
102
103- (void)setUp {
104  self.continueAfterFailure = NO;
105
106  // Set up CFStream connection before testing the endpoint
107
108  grpc_core::ExecCtx exec_ctx;
109
110  int svr_fd;
111  int r;
112  std::promise<grpc_error_handle> connected_promise;
113  grpc_closure done;
114
115  VLOG(2) << "test_succeeds";
116
117  auto resolved_addr = grpc_core::StringToSockaddr("127.0.0.1:0");
118  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr->addr);
119
120  /* create a phony server */
121  svr_fd = socket(AF_INET, SOCK_STREAM, 0);
122  XCTAssertGreaterThanOrEqual(svr_fd, 0);
123  XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr->len), 0);
124  XCTAssertEqual(listen(svr_fd, 1), 0);
125
126  /* connect to it */
127  XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0);
128  init_event_closure(&done, &connected_promise);
129  auto args =
130      grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
131          nullptr);
132  grpc_tcp_client_connect(&done, &ep_, nullptr,
133                          grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
134                          &*resolved_addr, grpc_core::Timestamp::InfFuture());
135
136  /* await the connection */
137  do {
138    resolved_addr->len = sizeof(addr);
139    r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
140               reinterpret_cast<socklen_t *>(&resolved_addr->len));
141  } while (r == -1 && errno == EINTR);
142  XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r),
143                              @(errno));
144  svr_fd_ = r;
145
146  /* wait for the connection callback to finish */
147  std::future<grpc_error_handle> connected_future = connected_promise.get_future();
148  XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES);
149  XCTAssertEqual(connected_future.get(), absl::OkStatus());
150}
151
152- (void)tearDown {
153  grpc_core::ExecCtx exec_ctx;
154  close(svr_fd_);
155  if (ep_ != nullptr) grpc_endpoint_destroy(ep_);
156}
157
158- (void)testReadWrite {
159  grpc_core::ExecCtx exec_ctx;
160
161  grpc_closure read_done;
162  grpc_slice_buffer read_slices;
163  grpc_slice_buffer read_one_slice;
164  std::promise<grpc_error_handle> write_promise;
165  grpc_closure write_done;
166  grpc_slice_buffer write_slices;
167
168  grpc_slice slice;
169  char write_buffer[kBufferSize];
170  char read_buffer[kBufferSize];
171  size_t recv_size = 0;
172
173  grpc_slice_buffer_init(&write_slices);
174  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
175  grpc_slice_buffer_add(&write_slices, slice);
176  init_event_closure(&write_done, &write_promise);
177  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
178
179  std::future<grpc_error_handle> write_future = write_promise.get_future();
180  XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
181  XCTAssertEqual(write_future.get(), absl::OkStatus());
182
183  while (recv_size < kBufferSize) {
184    ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
185    XCTAssertGreaterThanOrEqual(size, 0);
186    recv_size += size;
187  }
188
189  XCTAssertEqual(recv_size, kBufferSize);
190  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
191  ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
192  XCTAssertGreaterThanOrEqual(send_size, 0);
193
194  grpc_slice_buffer_init(&read_slices);
195  grpc_slice_buffer_init(&read_one_slice);
196  while (read_slices.length < kBufferSize) {
197    std::promise<grpc_error_handle> read_promise;
198    init_event_closure(&read_done, &read_promise);
199    grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false,
200                       /*min_progress_size=*/1);
201    std::future<grpc_error_handle> read_future = read_promise.get_future();
202    XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
203    XCTAssertEqual(read_future.get(), absl::OkStatus());
204    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
205    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
206  }
207  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
208
209  grpc_slice_buffer_reset_and_unref(&read_slices);
210  grpc_slice_buffer_reset_and_unref(&write_slices);
211  grpc_slice_buffer_reset_and_unref(&read_one_slice);
212}
213
214- (void)testShutdownBeforeRead {
215  grpc_core::ExecCtx exec_ctx;
216
217  std::promise<grpc_error_handle> read_promise;
218  grpc_closure read_done;
219  grpc_slice_buffer read_slices;
220  std::promise<grpc_error_handle> write_promise;
221  grpc_closure write_done;
222  grpc_slice_buffer write_slices;
223
224  grpc_slice slice;
225  char write_buffer[kBufferSize];
226  char read_buffer[kBufferSize];
227  size_t recv_size = 0;
228
229  grpc_slice_buffer_init(&read_slices);
230  init_event_closure(&read_done, &read_promise);
231  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
232                     /*min_progress_size=*/1);
233
234  grpc_slice_buffer_init(&write_slices);
235  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
236  grpc_slice_buffer_add(&write_slices, slice);
237  init_event_closure(&write_done, &write_promise);
238  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
239
240  std::future<grpc_error_handle> write_future = write_promise.get_future();
241  XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
242  XCTAssertEqual(write_future.get(), absl::OkStatus());
243
244  while (recv_size < kBufferSize) {
245    ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
246    XCTAssertGreaterThanOrEqual(size, 0);
247    recv_size += size;
248  }
249
250  XCTAssertEqual(recv_size, kBufferSize);
251  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
252
253  std::future<grpc_error_handle> read_future = read_promise.get_future();
254  XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
255
256  grpc_endpoint_destroy(ep_);
257  ep_ = nullptr;
258
259  grpc_core::ExecCtx::Get()->Flush();
260  XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
261  XCTAssertNotEqual(read_future.get(), absl::OkStatus());
262
263  grpc_slice_buffer_reset_and_unref(&read_slices);
264  grpc_slice_buffer_reset_and_unref(&write_slices);
265}
266
267- (void)testRemoteClosed {
268  grpc_core::ExecCtx exec_ctx;
269
270  std::promise<grpc_error_handle> read_promise;
271  grpc_closure read_done;
272  grpc_slice_buffer read_slices;
273  std::promise<grpc_error_handle> write_promise;
274  grpc_closure write_done;
275  grpc_slice_buffer write_slices;
276
277  grpc_slice slice;
278  char write_buffer[kBufferSize];
279  char read_buffer[kBufferSize];
280  size_t recv_size = 0;
281
282  init_event_closure(&read_done, &read_promise);
283  grpc_slice_buffer_init(&read_slices);
284  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
285                     /*min_progress_size=*/1);
286
287  grpc_slice_buffer_init(&write_slices);
288  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
289  grpc_slice_buffer_add(&write_slices, slice);
290
291  init_event_closure(&write_done, &write_promise);
292  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);
293
294  std::future<grpc_error_handle> write_future = write_promise.get_future();
295  XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES);
296  XCTAssertEqual(write_future.get(), absl::OkStatus());
297
298  while (recv_size < kBufferSize) {
299    ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
300    XCTAssertGreaterThanOrEqual(size, 0);
301    recv_size += size;
302  }
303
304  XCTAssertEqual(recv_size, kBufferSize);
305  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
306
307  close(svr_fd_);
308
309  std::future<grpc_error_handle> read_future = read_promise.get_future();
310  XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
311  XCTAssertNotEqual(read_future.get(), absl::OkStatus());
312
313  grpc_slice_buffer_reset_and_unref(&read_slices);
314  grpc_slice_buffer_reset_and_unref(&write_slices);
315}
316
317- (void)testRemoteReset {
318  grpc_core::ExecCtx exec_ctx;
319
320  std::promise<grpc_error_handle> read_promise;
321  grpc_closure read_done;
322  grpc_slice_buffer read_slices;
323
324  init_event_closure(&read_done, &read_promise);
325  grpc_slice_buffer_init(&read_slices);
326  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
327                     /*min_progress_size=*/1);
328
329  struct linger so_linger;
330  so_linger.l_onoff = 1;
331  so_linger.l_linger = 0;
332  setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
333
334  close(svr_fd_);
335
336  std::future<grpc_error_handle> read_future = read_promise.get_future();
337  XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
338  XCTAssertNotEqual(read_future.get(), absl::OkStatus());
339
340  grpc_slice_buffer_reset_and_unref(&read_slices);
341}
342
343@end
344
345#else  // GRPC_CFSTREAM
346
347// Phony test suite
348@interface CFStreamEndpointTests : XCTestCase
349@end
350
351@implementation CFStreamEndpointTests
352- (void)setUp {
353  [super setUp];
354}
355
356- (void)tearDown {
357  [super tearDown];
358}
359
360@end
361
362#endif  // GRPC_CFSTREAM
363