/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import <XCTest/XCTest.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_CFSTREAM

#include <netinet/in.h>

#include <grpc/impl/codegen/sync.h>
#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "test/core/util/test_config.h"

// Per-operation timeouts, in seconds, handed to -waitForEvent:timeout:.
static const int kConnectTimeout = 5;
static const int kWriteTimeout = 5;
static const int kReadTimeout = 5;

// Number of bytes exchanged in each direction by the tests.
static const size_t kBufferSize = 10000;

// Length, in seconds, of a single run-loop pass inside -waitForEvent:timeout:.
static const int kRunLoopTimeout = 1;

// Closure callback: records |error| in the gpr_atm pointed to by |arg|, but
// only if the atm still holds the "not fired yet" sentinel (-1).
static void set_atm(void *arg, grpc_error *error) {
  gpr_atm *p = static_cast<gpr_atm *>(arg);
  gpr_atm_full_cas(p, -1, reinterpret_cast<gpr_atm>(error));
}

// Resets |atm| to the -1 sentinel and initializes |closure| so that running it
// stores the resulting error in |atm| (see set_atm).
static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
  *atm = -1;
  GRPC_CLOSURE_INIT(closure, set_atm, static_cast<void *>(atm), grpc_schedule_on_exec_ctx);
}

// Returns true iff the concatenation of all slices in |slices| is exactly the
// |buffer_len| bytes starting at |buffer|.
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
                                             size_t buffer_len) {
  if (slices->length != buffer_len) {
    return false;
  }

  for (size_t i = 0; i < slices->count; i++) {
    grpc_slice slice = slices->slices[i];
    if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
      return false;
    }
    buffer += GRPC_SLICE_LENGTH(slice);
  }

  return true;
}

// Fills |buffer| with a deterministic, position-dependent byte pattern so that
// the payload comparisons in the tests do not operate on uninitialized memory.
static void fill_test_pattern(char *buffer, size_t len) {
  for (size_t i = 0; i < len; i++) {
    buffer[i] = static_cast<char>(i % 251);
  }
}

// Reads exactly |len| bytes from |fd| into |buffer|, appending after any bytes
// already received. Returns false on a socket error or if the peer closes the
// connection before |len| bytes have arrived.
static bool recv_all(int fd, char *buffer, size_t len) {
  size_t received = 0;
  while (received < len) {
    ssize_t n = recv(fd, buffer + received, len - received, 0);
    if (n < 0 && errno == EINTR) {
      continue;
    }
    if (n <= 0) {
      // n == 0 means the peer closed the connection: no more data will arrive.
      return false;
    }
    received += static_cast<size_t>(n);
  }
  return true;
}

// Writes exactly |len| bytes from |buffer| to |fd|, retrying on short writes.
// Returns false on a socket error.
static bool send_all(int fd, const char *buffer, size_t len) {
  size_t sent = 0;
  while (sent < len) {
    ssize_t n = send(fd, buffer + sent, len - sent, 0);
    if (n < 0 && errno == EINTR) {
      continue;
    }
    if (n < 0) {
      return false;
    }
    sent += static_cast<size_t>(n);
  }
  return true;
}

@interface CFStreamEndpointTests : XCTestCase

@end

@implementation CFStreamEndpointTests {
  // Client endpoint under test, produced by grpc_tcp_client_connect in -setUp.
  grpc_endpoint *ep_;
  // Server side of the connection (the accepted socket); -1 once closed.
  int svr_fd_;
}

// Flushes the current ExecCtx, then spins the main run loop in passes of
// kRunLoopTimeout seconds until |event| leaves the -1 sentinel or |timeout|
// seconds have elapsed. Returns YES iff the event fired.
- (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
  grpc_core::ExecCtx::Get()->Flush();

  NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:timeout];
  while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
    NSDate *passDeadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
    [[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:passDeadline];
  }

  return (gpr_atm_acq_load(event) != -1);
}

+ (void)setUp {
  grpc_init();
}

+ (void)tearDown {
  grpc_shutdown();
}

// Creates a local listening socket, connects a client endpoint to it, and
// stores the accepted server-side socket in svr_fd_ and the endpoint in ep_.
- (void)setUp {
  [super setUp];
  self.continueAfterFailure = NO;

  // Known "nothing to release" state, so that -tearDown is safe even when an
  // assertion below aborts this method early.
  ep_ = nullptr;
  svr_fd_ = -1;

  // Set up CFStream connection before testing the endpoint

  grpc_core::ExecCtx exec_ctx;

  grpc_resolved_address resolved_addr;
  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
  int listen_fd;
  int r;
  gpr_atm connected = -1;
  grpc_closure done;

  gpr_log(GPR_DEBUG, "test_succeeds");

  memset(&resolved_addr, 0, sizeof(resolved_addr));
  resolved_addr.len = sizeof(struct sockaddr_in);
  addr->sin_family = AF_INET;

  /* create a dummy server */
  listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  XCTAssertGreaterThanOrEqual(listen_fd, 0);
  XCTAssertEqual(bind(listen_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
  XCTAssertEqual(listen(listen_fd, 1), 0);

  /* connect to it */
  // Use a genuine socklen_t for the in/out length instead of casting the
  // address of resolved_addr.len, whose type may differ from socklen_t.
  socklen_t addr_len = static_cast<socklen_t>(resolved_addr.len);
  XCTAssertEqual(getsockname(listen_fd, (struct sockaddr *)addr, &addr_len), 0);
  resolved_addr.len = addr_len;
  init_event_closure(&done, &connected);
  grpc_tcp_client_connect(&done, &ep_, nullptr, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);

  /* await the connection */
  // The peer address is not needed, so no address buffer is passed to accept.
  do {
    r = accept(listen_fd, nullptr, nullptr);
  } while (r == -1 && errno == EINTR);
  // Only one connection is ever accepted; release the listening socket now so
  // it is not leaked on every test.
  close(listen_fd);
  XCTAssertGreaterThanOrEqual(r, 0);
  svr_fd_ = r;

  /* wait for the connection callback to finish */
  XCTAssertTrue([self waitForEvent:&connected timeout:kConnectTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(connected), GRPC_ERROR_NONE);
}

// Releases whatever -setUp created and the test has not already released.
- (void)tearDown {
  grpc_core::ExecCtx exec_ctx;
  if (svr_fd_ >= 0) {
    close(svr_fd_);
    svr_fd_ = -1;
  }
  if (ep_ != nullptr) {
    grpc_endpoint_destroy(ep_);
    ep_ = nullptr;
  }
  [super tearDown];
}

// Writes kBufferSize bytes through the endpoint, checks that the server socket
// receives them, echoes them back and checks that endpoint reads return the
// same bytes.
- (void)testReadWrite {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  grpc_slice_buffer read_one_slice;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  // The payload may arrive in several pieces; recv_all accumulates them.
  XCTAssertTrue(recv_all(svr_fd_, read_buffer, kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // Echo the payload back; send_all retries on short writes so the read loop
  // below is guaranteed to eventually see kBufferSize bytes.
  XCTAssertTrue(send_all(svr_fd_, read_buffer, kBufferSize));

  grpc_slice_buffer_init(&read_slices);
  grpc_slice_buffer_init(&read_one_slice);
  while (read_slices.length < kBufferSize) {
    init_event_closure(&read_done, &read);
    grpc_endpoint_read(ep_, &read_one_slice, &read_done);
    XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
    XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  }
  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
  grpc_slice_buffer_reset_and_unref(&read_one_slice);
}

// Starts a read that never receives data, verifies it stays pending while a
// write completes, then shuts the endpoint down and expects the pending read
// to finish with an error.
- (void)testShutdownBeforeRead {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&read_slices);
  init_event_closure(&read_done, &read);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  XCTAssertTrue(recv_all(svr_fd_, read_buffer, kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // The server never sends anything, so the read must still be pending.
  XCTAssertFalse([self waitForEvent:&read timeout:kReadTimeout]);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);

  grpc_core::ExecCtx::Get()->Flush();
  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

// Starts a read, completes a write, then closes the server socket and expects
// the pending read to finish with an error.
- (void)testRemoteClosed {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  gpr_atm write;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_test_pattern(write_buffer, kBufferSize);

  init_event_closure(&read_done, &read);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write);
  grpc_endpoint_write(ep_, &write_slices, &write_done);

  XCTAssertTrue([self waitForEvent:&write timeout:kWriteTimeout]);
  XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);

  XCTAssertTrue(recv_all(svr_fd_, read_buffer, kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  close(svr_fd_);
  // Mark the descriptor as released so -tearDown does not close it again.
  svr_fd_ = -1;

  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

// Starts a read, then closes the server socket with SO_LINGER enabled and a
// zero linger interval, and expects the pending read to finish with an error.
- (void)testRemoteReset {
  grpc_core::ExecCtx exec_ctx;

  gpr_atm read;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;

  init_event_closure(&read_done, &read);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done);

  struct linger so_linger;
  so_linger.l_onoff = 1;
  so_linger.l_linger = 0;
  // If this fails the close below would not exercise the linger configuration
  // this test is about, so fail loudly instead of silently degrading.
  XCTAssertEqual(setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)), 0);

  close(svr_fd_);
  // Mark the descriptor as released so -tearDown does not close it again.
  svr_fd_ = -1;

  XCTAssertTrue([self waitForEvent:&read timeout:kReadTimeout]);
  XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);

  grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
  grpc_slice_buffer_reset_and_unref(&read_slices);
}

@end

#else  // GRPC_CFSTREAM

// Dummy test suite
@interface CFStreamEndpointTests : XCTestCase
@end

@implementation CFStreamEndpointTests
- (void)setUp {
  [super setUp];
}

- (void)tearDown {
  [super tearDown];
}

@end

#endif  // GRPC_CFSTREAM