• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import <XCTest/XCTest.h>
20
21#include "src/core/lib/iomgr/port.h"
22
23#ifdef GRPC_CFSTREAM
24
25#include <netinet/in.h>
26
27#include <grpc/impl/codegen/sync.h>
28#include <grpc/support/sync.h>
29
30#include "src/core/lib/iomgr/endpoint.h"
31#include "src/core/lib/iomgr/resolve_address.h"
32#include "src/core/lib/iomgr/tcp_client.h"
33#include "test/core/util/test_config.h"
34
// Upper bounds, in seconds, passed as the `timeout:` argument of
// -waitForEvent:timeout: for each kind of asynchronous operation.
static constexpr int kConnectTimeout = 5;
static constexpr int kWriteTimeout = 5;
static constexpr int kReadTimeout = 5;

// Number of bytes exchanged in each direction by the read/write tests.
static constexpr int kBufferSize = 10000;

// Length, in seconds, of a single run-loop turn while polling for an event.
static constexpr int kRunLoopTimeout = 1;
42
// Closure callback that publishes the result of an asynchronous operation.
//
// `arg` is the gpr_atm slot registered through init_event_closure(); `error`
// is the status handed to the closure. The slot is only overwritten while it
// still holds the -1 "not fired yet" sentinel.
static void set_atm(void *arg, grpc_error *error) {
  gpr_atm *slot = static_cast<gpr_atm *>(arg);
  const gpr_atm outcome = reinterpret_cast<gpr_atm>(error);
  gpr_atm_full_cas(slot, -1, outcome);
}
47
// Arms `closure` so that, when it runs, set_atm() stores its error into `atm`.
//
// `atm` is reset to the -1 sentinel first, which is the value
// -waitForEvent:timeout: polls against to detect "not fired yet".
static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
  void *callback_arg = static_cast<void *>(atm);
  *atm = -1;
  GRPC_CLOSURE_INIT(closure, set_atm, callback_arg, grpc_schedule_on_exec_ctx);
}
52
// Returns true when the concatenation of all slices in `slices` is byte-for-byte
// equal to the first `buffer_len` bytes of `buffer`.
//
// The total length is checked up front, so walking `buffer` slice by slice
// below never reads past `buffer_len`.
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
                                             size_t buffer_len) {
  if (slices->length != buffer_len) {
    return false;
  }

  // size_t index: `count` is unsigned, so avoid a signed/unsigned comparison.
  for (size_t i = 0; i < slices->count; i++) {
    grpc_slice slice = slices->slices[i];
    const size_t slice_len = GRPC_SLICE_LENGTH(slice);
    if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), slice_len)) {
      return false;
    }
    buffer += slice_len;
  }

  return true;
}
69
// Fills `buffer` with a position-dependent byte pattern so that data compared
// after a round trip is deterministic and misplaced chunks are detectable.
static void fill_test_pattern(char *buffer, size_t length) {
  for (size_t i = 0; i < length; i++) {
    buffer[i] = static_cast<char>(i & 0xff);
  }
}

/**
 * Tests for the CFStream-based gRPC endpoint.
 *
 * Each test runs against a freshly connected pair: a client grpc_endpoint
 * created with grpc_tcp_client_connect() and the raw server-side socket
 * accepted from a local listening socket.
 */
@interface CFStreamEndpointTests : XCTestCase

@end

@implementation CFStreamEndpointTests {
  grpc_endpoint *ep_;  // Client endpoint under test; nullptr when not created.
  int svr_fd_;         // Accepted server-side socket; -1 when closed.
}

#pragma mark - Helpers

/**
 * Spins the main run loop until `event` leaves its -1 sentinel or `timeout`
 * seconds have elapsed.
 *
 * @param event   Slot armed with init_event_closure().
 * @param timeout Maximum time to wait, in seconds.
 * @return YES if the event fired (slot != -1), NO if the wait timed out.
 */
- (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
  grpc_core::ExecCtx::Get()->Flush();

  // Honour the caller-supplied timeout (it used to be ignored in favour of
  // kConnectTimeout).
  NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:timeout];
  while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
    NSDate *turnDeadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
    [[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:turnDeadline];
  }

  return (gpr_atm_acq_load(event) != -1);
}

/** Closes the server-side socket exactly once; safe to call repeatedly. */
- (void)closeServerSocket {
  if (svr_fd_ >= 0) {
    close(svr_fd_);
    svr_fd_ = -1;
  }
}

/**
 * Reads exactly `length` bytes from the server-side socket into `buffer`,
 * appending each partial read after the previous one. Fails the test on a
 * socket error or if the peer closes the connection early.
 */
- (void)receiveOnServerSocketInto:(char *)buffer length:(size_t)length {
  size_t received = 0;
  while (received < length) {
    ssize_t size = recv(svr_fd_, buffer + received, length - received, 0);
    if (size < 0 && errno == EINTR) {
      continue;
    }
    XCTAssertGreaterThan(size, 0);
    if (size <= 0) {
      return;  // Do not spin forever on EOF or error.
    }
    received += static_cast<size_t>(size);
  }
  XCTAssertEqual(received, length);
}

/**
 * Writes exactly `length` bytes from `buffer` to the server-side socket,
 * retrying after partial sends. Fails the test on a socket error.
 */
- (void)sendOnServerSocketFrom:(const char *)buffer length:(size_t)length {
  size_t sent = 0;
  while (sent < length) {
    ssize_t size = send(svr_fd_, buffer + sent, length - sent, 0);
    if (size < 0 && errno == EINTR) {
      continue;
    }
    XCTAssertGreaterThan(size, 0);
    if (size <= 0) {
      return;
    }
    sent += static_cast<size_t>(size);
  }
  XCTAssertEqual(sent, length);
}

#pragma mark - Fixture

+ (void)setUp {
  grpc_init();
}

+ (void)tearDown {
  grpc_shutdown();
}

/**
 * Creates a local listening socket, connects the endpoint under test to it,
 * accepts the connection and waits for the connect closure to report success.
 */
- (void)setUp {
  [super setUp];
  self.continueAfterFailure = NO;

  // Known "nothing to clean up" state in case an assertion below fails.
  ep_ = nullptr;
  svr_fd_ = -1;

  // Set up CFStream connection before testing the endpoint

  grpc_core::ExecCtx exec_ctx;

  grpc_resolved_address resolved_addr;
  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
  // Separate socklen_t for the socket calls, so no pointer cast of
  // resolved_addr.len is needed whatever its declared type is.
  socklen_t addr_len = sizeof(struct sockaddr_in);
  int listen_fd;
  int r;
  gpr_atm connected = -1;
  grpc_closure done;

  gpr_log(GPR_DEBUG, "test_succeeds");

  memset(&resolved_addr, 0, sizeof(resolved_addr));
  resolved_addr.len = sizeof(struct sockaddr_in);
  addr->sin_family = AF_INET;

  /* create a dummy server */
  listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  XCTAssertGreaterThanOrEqual(listen_fd, 0);
  XCTAssertEqual(bind(listen_fd, reinterpret_cast<struct sockaddr *>(addr), addr_len), 0);
  XCTAssertEqual(listen(listen_fd, 1), 0);

  /* connect to it */
  XCTAssertEqual(getsockname(listen_fd, reinterpret_cast<struct sockaddr *>(addr), &addr_len), 0);
  resolved_addr.len = addr_len;
  init_event_closure(&done, &connected);
  grpc_tcp_client_connect(&done, &ep_, nullptr, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);

  /* await the connection; the peer address is not needed */
  do {
    r = accept(listen_fd, nullptr, nullptr);
  } while (r == -1 && errno == EINTR);
  // Only one connection is ever accepted, so release the listener right away.
  close(listen_fd);
  XCTAssertGreaterThanOrEqual(r, 0);
  svr_fd_ = r;

  /* wait for the connection callback to finish */
  XCTAssertTrue([self waitForEvent:&connected timeout:kConnectTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(connected), GRPC_ERROR_NONE);
}

/** Releases the server socket and the endpoint, if they still exist. */
- (void)tearDown {
  {
    grpc_core::ExecCtx exec_ctx;
    [self closeServerSocket];
    if (ep_ != nullptr) {
      grpc_endpoint_destroy(ep_);
      ep_ = nullptr;
    }
  }
  [super tearDown];
}

#pragma mark - Tests

/**
 * Writes kBufferSize bytes through the endpoint, checks they arrive on the
 * server socket, echoes them back and checks the endpoint reads them intact.
 */
- (void)testReadWrite {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  grpc_slice_buffer read_one_slice;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  [self receiveOnServerSocketInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Echo the payload back to the endpoint.
  [self sendOnServerSocketFrom:read_buffer length:kBufferSize];

  grpc_slice_buffer_init(&read_slices);
  grpc_slice_buffer_init(&read_one_slice);
  // A single endpoint read may return only part of the payload, so accumulate.
  while (read_slices.length < kBufferSize) {
    init_event_closure(&read_done, &read);
    grpc_endpoint_read(ep_, &read_one_slice, &read_done);
    XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
    XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  }
  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
  grpc_slice_buffer_reset_and_unref(&read_one_slice);
}

/**
 * Starts a read that the peer never satisfies, verifies it stays pending, then
 * shuts the endpoint down and verifies the read completes with an error.
 */
- (void)testShutdownBeforeRead {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&read_slices);
  init_event_closure(&read_done, &read);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  [self receiveOnServerSocketInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Nothing was sent to the endpoint, so the read must still be pending.
  XCTAssertFalse([self waitForEvent:&read timeout:kReadTimeout]);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);

  grpc_core::ExecCtx::Get()->Flush();
  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

/**
 * Verifies that a pending read completes with an error once the peer closes
 * its socket after a successful write.
 */
- (void)testRemoteClosed {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  init_event_closure(&read_done, &read);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  [self receiveOnServerSocketInto:read_buffer length:kBufferSize];
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Marks the descriptor closed so -tearDown does not close it a second time.
  [self closeServerSocket];

  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

/**
 * Verifies that a pending read completes with an error when the peer closes
 * its socket with SO_LINGER enabled and a zero linger time.
 */
- (void)testRemoteReset {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;

  init_event_closure(&read_done, &read);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  struct linger so_linger;
  so_linger.l_onoff = 1;
  so_linger.l_linger = 0;
  // If the option is not applied, the test would not exercise the intended close.
  XCTAssertEqual(setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)), 0);

  // Marks the descriptor closed so -tearDown does not close it a second time.
  [self closeServerSocket];

  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
}

@end
326
327#else  // GRPC_CFSTREAM
328
// Placeholder suite compiled when GRPC_CFSTREAM is not defined. It declares
// the same class name as the real suite but contains no test methods.
@interface CFStreamEndpointTests : XCTestCase
@end

@implementation CFStreamEndpointTests

// Nothing to prepare; defers entirely to XCTestCase.
- (void)setUp {
  [super setUp];
}

// Nothing to release; defers entirely to XCTestCase.
- (void)tearDown {
  [super tearDown];
}

@end
343
344#endif  // GRPC_CFSTREAM
345