1/* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import <XCTest/XCTest.h> 20 21#include "src/core/lib/iomgr/port.h" 22 23#ifdef GRPC_CFSTREAM 24 25#include <limits.h> 26 27#include <netinet/in.h> 28 29#include <grpc/grpc.h> 30#include <grpc/support/sync.h> 31 32#include "src/core/lib/address_utils/parse_address.h" 33#include "src/core/lib/address_utils/sockaddr_utils.h" 34#include "src/core/lib/event_engine/channel_args_endpoint_config.h" 35#include "src/core/lib/iomgr/endpoint.h" 36#include "src/core/lib/iomgr/resolve_address.h" 37#include "src/core/lib/iomgr/tcp_client.h" 38#include "src/core/lib/resource_quota/api.h" 39#include "test/core/util/test_config.h" 40 41#include <chrono> 42#include <future> 43 44static const int kConnectTimeout = 5; 45static const int kWriteTimeout = 5; 46static const int kReadTimeout = 5; 47 48static const int kBufferSize = 10000; 49 50static const int kRunLoopTimeout = 1; 51 52static void set_error_handle_promise(void *arg, grpc_error_handle error) { 53 std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg); 54 p->set_value(error); 55} 56 57static void init_event_closure(grpc_closure *closure, 58 std::promise<grpc_error_handle> *error_handle) { 59 GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle), 60 grpc_schedule_on_exec_ctx); 61} 62 63static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer, 64 size_t buffer_len) { 65 if (slices->length != buffer_len) { 66 return false; 67 } 68 69 for (int i = 0; i < slices->count; i++) { 70 grpc_slice slice = slices->slices[i]; 71 if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) { 72 return false; 73 } 74 buffer += GRPC_SLICE_LENGTH(slice); 75 } 76 77 return true; 78} 79 80@interface CFStreamEndpointTests : XCTestCase 81 82@end 83 84@implementation CFStreamEndpointTests { 85 grpc_endpoint *ep_; 86 int svr_fd_; 87} 88 89- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout { 90 grpc_core::ExecCtx::Get()->Flush(); 91 return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout; 92} 93 94+ (void)setUp { 95 grpc_init(); 96} 97 98+ (void)tearDown { 99 grpc_shutdown(); 100} 101 102- (void)setUp { 103 self.continueAfterFailure = NO; 104 105 // Set up CFStream connection before testing the endpoint 106 107 grpc_core::ExecCtx exec_ctx; 108 109 int svr_fd; 110 int r; 111 std::promise<grpc_error_handle> connected_promise; 112 grpc_closure done; 113 114 gpr_log(GPR_DEBUG, "test_succeeds"); 115 116 auto resolved_addr = grpc_core::StringToSockaddr("127.0.0.1:0"); 117 struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr->addr); 118 119 /* create a phony server */ 120 svr_fd = socket(AF_INET, SOCK_STREAM, 0); 121 XCTAssertGreaterThanOrEqual(svr_fd, 0); 122 XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr->len), 0); 123 XCTAssertEqual(listen(svr_fd, 1), 0); 124 125 /* connect to it */ 126 XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0); 127 init_event_closure(&done, &connected_promise); 128 auto args = 129 grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs( 130 nullptr); 131 grpc_tcp_client_connect(&done, &ep_, nullptr, 132 grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), 133 &*resolved_addr, grpc_core::Timestamp::InfFuture()); 134 135 /* await the connection */ 136 do { 137 resolved_addr->len = sizeof(addr); 138 r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr), 139 reinterpret_cast<socklen_t *>(&resolved_addr->len)); 140 } while (r == -1 && errno == EINTR); 141 XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r), 142 @(errno)); 143 svr_fd_ = r; 144 145 /* wait for the connection callback to finish */ 146 std::future<grpc_error_handle> connected_future = connected_promise.get_future(); 147 XCTAssertEqual([self waitForEvent:&connected_future timeout:kConnectTimeout], YES); 148 XCTAssertEqual(connected_future.get(), absl::OkStatus()); 149} 150 151- (void)tearDown { 152 grpc_core::ExecCtx exec_ctx; 153 close(svr_fd_); 154 grpc_endpoint_destroy(ep_); 155} 156 157- (void)testReadWrite { 158 grpc_core::ExecCtx exec_ctx; 159 160 grpc_closure read_done; 161 grpc_slice_buffer read_slices; 162 grpc_slice_buffer read_one_slice; 163 std::promise<grpc_error_handle> write_promise; 164 grpc_closure write_done; 165 grpc_slice_buffer write_slices; 166 167 grpc_slice slice; 168 char write_buffer[kBufferSize]; 169 char read_buffer[kBufferSize]; 170 size_t recv_size = 0; 171 172 grpc_slice_buffer_init(&write_slices); 173 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); 174 grpc_slice_buffer_add(&write_slices, slice); 175 init_event_closure(&write_done, &write_promise); 176 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX); 177 178 std::future<grpc_error_handle> write_future = write_promise.get_future(); 179 XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); 180 XCTAssertEqual(write_future.get(), absl::OkStatus()); 181 182 while (recv_size < kBufferSize) { 183 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); 184 XCTAssertGreaterThanOrEqual(size, 0); 185 recv_size += size; 186 } 187 188 XCTAssertEqual(recv_size, kBufferSize); 189 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); 190 ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0); 191 XCTAssertGreaterThanOrEqual(send_size, 0); 192 193 grpc_slice_buffer_init(&read_slices); 194 grpc_slice_buffer_init(&read_one_slice); 195 while (read_slices.length < kBufferSize) { 196 std::promise<grpc_error_handle> read_promise; 197 init_event_closure(&read_done, &read_promise); 198 grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false, 199 /*min_progress_size=*/1); 200 std::future<grpc_error_handle> read_future = read_promise.get_future(); 201 XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); 202 XCTAssertEqual(read_future.get(), absl::OkStatus()); 203 grpc_slice_buffer_move_into(&read_one_slice, &read_slices); 204 XCTAssertLessThanOrEqual(read_slices.length, kBufferSize); 205 } 206 XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize)); 207 208 grpc_endpoint_shutdown(ep_, absl::OkStatus()); 209 grpc_slice_buffer_reset_and_unref(&read_slices); 210 grpc_slice_buffer_reset_and_unref(&write_slices); 211 grpc_slice_buffer_reset_and_unref(&read_one_slice); 212} 213 214- (void)testShutdownBeforeRead { 215 grpc_core::ExecCtx exec_ctx; 216 217 std::promise<grpc_error_handle> read_promise; 218 grpc_closure read_done; 219 grpc_slice_buffer read_slices; 220 std::promise<grpc_error_handle> write_promise; 221 grpc_closure write_done; 222 grpc_slice_buffer write_slices; 223 224 grpc_slice slice; 225 char write_buffer[kBufferSize]; 226 char read_buffer[kBufferSize]; 227 size_t recv_size = 0; 228 229 grpc_slice_buffer_init(&read_slices); 230 init_event_closure(&read_done, &read_promise); 231 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false, 232 /*min_progress_size=*/1); 233 234 grpc_slice_buffer_init(&write_slices); 235 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); 236 grpc_slice_buffer_add(&write_slices, slice); 237 init_event_closure(&write_done, &write_promise); 238 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX); 239 240 std::future<grpc_error_handle> write_future = write_promise.get_future(); 241 XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); 242 XCTAssertEqual(write_future.get(), absl::OkStatus()); 243 244 while (recv_size < kBufferSize) { 245 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); 246 XCTAssertGreaterThanOrEqual(size, 0); 247 recv_size += size; 248 } 249 250 XCTAssertEqual(recv_size, kBufferSize); 251 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); 252 253 std::future<grpc_error_handle> read_future = read_promise.get_future(); 254 XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO); 255 256 grpc_endpoint_shutdown(ep_, absl::OkStatus()); 257 258 grpc_core::ExecCtx::Get()->Flush(); 259 XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); 260 XCTAssertNotEqual(read_future.get(), absl::OkStatus()); 261 262 grpc_slice_buffer_reset_and_unref(&read_slices); 263 grpc_slice_buffer_reset_and_unref(&write_slices); 264} 265 266- (void)testRemoteClosed { 267 grpc_core::ExecCtx exec_ctx; 268 269 std::promise<grpc_error_handle> read_promise; 270 grpc_closure read_done; 271 grpc_slice_buffer read_slices; 272 std::promise<grpc_error_handle> write_promise; 273 grpc_closure write_done; 274 grpc_slice_buffer write_slices; 275 276 grpc_slice slice; 277 char write_buffer[kBufferSize]; 278 char read_buffer[kBufferSize]; 279 size_t recv_size = 0; 280 281 init_event_closure(&read_done, &read_promise); 282 grpc_slice_buffer_init(&read_slices); 283 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false, 284 /*min_progress_size=*/1); 285 286 grpc_slice_buffer_init(&write_slices); 287 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize); 288 grpc_slice_buffer_add(&write_slices, slice); 289 290 init_event_closure(&write_done, &write_promise); 291 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX); 292 293 std::future<grpc_error_handle> write_future = write_promise.get_future(); 294 XCTAssertEqual([self waitForEvent:&write_future timeout:kWriteTimeout], YES); 295 XCTAssertEqual(write_future.get(), absl::OkStatus()); 296 297 while (recv_size < kBufferSize) { 298 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0); 299 XCTAssertGreaterThanOrEqual(size, 0); 300 recv_size += size; 301 } 302 303 XCTAssertEqual(recv_size, kBufferSize); 304 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0); 305 306 close(svr_fd_); 307 308 std::future<grpc_error_handle> read_future = read_promise.get_future(); 309 XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); 310 XCTAssertNotEqual(read_future.get(), absl::OkStatus()); 311 312 grpc_endpoint_shutdown(ep_, absl::OkStatus()); 313 grpc_slice_buffer_reset_and_unref(&read_slices); 314 grpc_slice_buffer_reset_and_unref(&write_slices); 315} 316 317- (void)testRemoteReset { 318 grpc_core::ExecCtx exec_ctx; 319 320 std::promise<grpc_error_handle> read_promise; 321 grpc_closure read_done; 322 grpc_slice_buffer read_slices; 323 324 init_event_closure(&read_done, &read_promise); 325 grpc_slice_buffer_init(&read_slices); 326 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false, 327 /*min_progress_size=*/1); 328 329 struct linger so_linger; 330 so_linger.l_onoff = 1; 331 so_linger.l_linger = 0; 332 setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); 333 334 close(svr_fd_); 335 336 std::future<grpc_error_handle> read_future = read_promise.get_future(); 337 XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES); 338 XCTAssertNotEqual(read_future.get(), absl::OkStatus()); 339 340 grpc_endpoint_shutdown(ep_, absl::OkStatus()); 341 grpc_slice_buffer_reset_and_unref(&read_slices); 342} 343 344@end 345 346#else // GRPC_CFSTREAM 347 348// Phony test suite 349@interface CFStreamEndpointTests : XCTestCase 350@end 351 352@implementation CFStreamEndpointTests 353- (void)setUp { 354 [super setUp]; 355} 356 357- (void)tearDown { 358 [super tearDown]; 359} 360 361@end 362 363#endif // GRPC_CFSTREAM 364