1
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
19
20 #include <grpc/support/port_platform.h>
21
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_CFSTREAM_CLIENT
25
26 #include <CoreFoundation/CoreFoundation.h>
27 #include <grpc/event_engine/endpoint_config.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/sync.h>
30 #include <netinet/in.h>
31 #include <string.h>
32
33 #include "absl/log/log.h"
34 #include "src/core/lib/address_utils/sockaddr_utils.h"
35 #include "src/core/lib/event_engine/shim.h"
36 #include "src/core/lib/iomgr/cfstream_handle.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/endpoint_cfstream.h"
39 #include "src/core/lib/iomgr/error.h"
40 #include "src/core/lib/iomgr/error_cfstream.h"
41 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
42 #include "src/core/lib/iomgr/tcp_client.h"
43 #include "src/core/lib/iomgr/timer.h"
44 #include "src/core/util/crash.h"
45 #include "src/core/util/host_port.h"
46
47 struct CFStreamConnect {
48 gpr_mu mu;
49 gpr_refcount refcount;
50
51 CFReadStreamRef read_stream;
52 CFWriteStreamRef write_stream;
53 CFStreamHandle* stream_handle;
54
55 grpc_timer alarm;
56 grpc_closure on_alarm;
57 grpc_closure on_open;
58
59 bool read_stream_open;
60 bool write_stream_open;
61 bool failed;
62
63 grpc_closure* closure;
64 grpc_endpoint** endpoint;
65 int refs;
66 std::string addr_name;
67 };
68
CFStreamConnectCleanup(CFStreamConnect * connect)69 static void CFStreamConnectCleanup(CFStreamConnect* connect) {
70 CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up");
71 CFRelease(connect->read_stream);
72 CFRelease(connect->write_stream);
73 gpr_mu_destroy(&connect->mu);
74 delete connect;
75 }
76
OnAlarm(void * arg,grpc_error_handle error)77 static void OnAlarm(void* arg, grpc_error_handle error) {
78 CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
79 GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnAlarm, error:"
80 << grpc_core::StatusToString(error);
81 gpr_mu_lock(&connect->mu);
82 grpc_closure* closure = connect->closure;
83 connect->closure = nil;
84 const bool done = (--connect->refs == 0);
85 gpr_mu_unlock(&connect->mu);
86 // Only schedule a callback once, by either OnAlarm or OnOpen. The
87 // first one issues callback while the second one does cleanup.
88 if (done) {
89 CFStreamConnectCleanup(connect);
90 } else {
91 grpc_error_handle error = GRPC_ERROR_CREATE("connect() timed out");
92 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
93 }
94 }
95
OnOpen(void * arg,grpc_error_handle error)96 static void OnOpen(void* arg, grpc_error_handle error) {
97 CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
98 GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnOpen, error:"
99 << grpc_core::StatusToString(error);
100 gpr_mu_lock(&connect->mu);
101 grpc_timer_cancel(&connect->alarm);
102 grpc_closure* closure = connect->closure;
103 connect->closure = nil;
104
105 bool done = (--connect->refs == 0);
106 grpc_endpoint** endpoint = connect->endpoint;
107
108 // Only schedule a callback once, by either OnAlarm or OnOpen. The
109 // first one issues callback while the second one does cleanup.
110 if (done) {
111 gpr_mu_unlock(&connect->mu);
112 CFStreamConnectCleanup(connect);
113 } else {
114 if (error.ok()) {
115 CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
116 if (stream_error == NULL) {
117 stream_error = CFWriteStreamCopyError(connect->write_stream);
118 }
119 if (stream_error) {
120 error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
121 CFRelease(stream_error);
122 }
123 if (error.ok()) {
124 *endpoint = grpc_cfstream_endpoint_create(
125 connect->read_stream, connect->write_stream,
126 connect->addr_name.c_str(), connect->stream_handle);
127 }
128 }
129 gpr_mu_unlock(&connect->mu);
130 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
131 }
132 }
133
ParseResolvedAddress(const grpc_resolved_address * addr,CFStringRef * host,int * port)134 static void ParseResolvedAddress(const grpc_resolved_address* addr,
135 CFStringRef* host, int* port) {
136 std::string host_port = grpc_sockaddr_to_string(addr, true).value();
137 std::string host_string;
138 std::string port_string;
139 grpc_core::SplitHostPort(host_port, &host_string, &port_string);
140 *host = CFStringCreateWithCString(NULL, host_string.c_str(),
141 kCFStringEncodingUTF8);
142 *port = grpc_sockaddr_get_port(addr);
143 }
144
CFStreamClientConnect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set *,const grpc_event_engine::experimental::EndpointConfig & config,const grpc_resolved_address * resolved_addr,grpc_core::Timestamp deadline)145 static int64_t CFStreamClientConnect(
146 grpc_closure* closure, grpc_endpoint** ep,
147 grpc_pollset_set* /*interested_parties*/,
148 const grpc_event_engine::experimental::EndpointConfig& config,
149 const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
150 if (grpc_event_engine::experimental::UseEventEngineClient()) {
151 return grpc_event_engine::experimental::event_engine_tcp_client_connect(
152 closure, ep, config, resolved_addr, deadline);
153 }
154
155 auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
156 if (!addr_uri.ok()) {
157 grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
158 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
159 return 0;
160 }
161
162 CFStreamConnect* connect = new CFStreamConnect();
163 connect->closure = closure;
164 connect->endpoint = ep;
165 connect->addr_name = addr_uri.value();
166 connect->refs = 2; // One for the connect operation, one for the timer.
167 gpr_ref_init(&connect->refcount, 1);
168 gpr_mu_init(&connect->mu);
169
170 GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT: " << connect << ", "
171 << connect->addr_name
172 << ": asynchronously connecting";
173
174 CFReadStreamRef read_stream;
175 CFWriteStreamRef write_stream;
176
177 CFStringRef host;
178 int port;
179 ParseResolvedAddress(resolved_addr, &host, &port);
180 CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
181 &write_stream);
182 CFRelease(host);
183 connect->read_stream = read_stream;
184 connect->write_stream = write_stream;
185 connect->stream_handle =
186 CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
187 GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
188 grpc_schedule_on_exec_ctx);
189 connect->stream_handle->NotifyOnOpen(&connect->on_open);
190 GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
191 grpc_schedule_on_exec_ctx);
192 gpr_mu_lock(&connect->mu);
193 CFReadStreamOpen(read_stream);
194 CFWriteStreamOpen(write_stream);
195 grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
196 gpr_mu_unlock(&connect->mu);
197 return 0;
198 }
199
CFStreamClientCancelConnect(int64_t connection_handle)200 static bool CFStreamClientCancelConnect(int64_t connection_handle) {
201 if (grpc_event_engine::experimental::UseEventEngineClient()) {
202 return grpc_event_engine::experimental::
203 event_engine_tcp_client_cancel_connect(connection_handle);
204 }
205 return false;
206 }
207
208 grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
209 CFStreamClientConnect, CFStreamClientCancelConnect};
210
211 #endif // GRPC_CFSTREAM_CLIENT
212