• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 //
3 //
4 // Copyright 2018 gRPC authors.
5 //
6 // Licensed under the Apache License, Version 2.0 (the "License");
7 // you may not use this file except in compliance with the License.
8 // You may obtain a copy of the License at
9 //
10 //     http://www.apache.org/licenses/LICENSE-2.0
11 //
12 // Unless required by applicable law or agreed to in writing, software
13 // distributed under the License is distributed on an "AS IS" BASIS,
14 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 // See the License for the specific language governing permissions and
16 // limitations under the License.
17 //
18 //
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_CFSTREAM_CLIENT
25 
26 #include <CoreFoundation/CoreFoundation.h>
27 #include <grpc/event_engine/endpoint_config.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/sync.h>
30 #include <netinet/in.h>
31 #include <string.h>
32 
33 #include "absl/log/log.h"
34 #include "src/core/lib/address_utils/sockaddr_utils.h"
35 #include "src/core/lib/event_engine/shim.h"
36 #include "src/core/lib/iomgr/cfstream_handle.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/endpoint_cfstream.h"
39 #include "src/core/lib/iomgr/error.h"
40 #include "src/core/lib/iomgr/error_cfstream.h"
41 #include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
42 #include "src/core/lib/iomgr/tcp_client.h"
43 #include "src/core/lib/iomgr/timer.h"
44 #include "src/core/util/crash.h"
45 #include "src/core/util/host_port.h"
46 
47 struct CFStreamConnect {
48   gpr_mu mu;
49   gpr_refcount refcount;
50 
51   CFReadStreamRef read_stream;
52   CFWriteStreamRef write_stream;
53   CFStreamHandle* stream_handle;
54 
55   grpc_timer alarm;
56   grpc_closure on_alarm;
57   grpc_closure on_open;
58 
59   bool read_stream_open;
60   bool write_stream_open;
61   bool failed;
62 
63   grpc_closure* closure;
64   grpc_endpoint** endpoint;
65   int refs;
66   std::string addr_name;
67 };
68 
CFStreamConnectCleanup(CFStreamConnect * connect)69 static void CFStreamConnectCleanup(CFStreamConnect* connect) {
70   CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up");
71   CFRelease(connect->read_stream);
72   CFRelease(connect->write_stream);
73   gpr_mu_destroy(&connect->mu);
74   delete connect;
75 }
76 
OnAlarm(void * arg,grpc_error_handle error)77 static void OnAlarm(void* arg, grpc_error_handle error) {
78   CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
79   GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnAlarm, error:"
80                           << grpc_core::StatusToString(error);
81   gpr_mu_lock(&connect->mu);
82   grpc_closure* closure = connect->closure;
83   connect->closure = nil;
84   const bool done = (--connect->refs == 0);
85   gpr_mu_unlock(&connect->mu);
86   // Only schedule a callback once, by either OnAlarm or OnOpen. The
87   // first one issues callback while the second one does cleanup.
88   if (done) {
89     CFStreamConnectCleanup(connect);
90   } else {
91     grpc_error_handle error = GRPC_ERROR_CREATE("connect() timed out");
92     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
93   }
94 }
95 
OnOpen(void * arg,grpc_error_handle error)96 static void OnOpen(void* arg, grpc_error_handle error) {
97   CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
98   GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnOpen, error:"
99                           << grpc_core::StatusToString(error);
100   gpr_mu_lock(&connect->mu);
101   grpc_timer_cancel(&connect->alarm);
102   grpc_closure* closure = connect->closure;
103   connect->closure = nil;
104 
105   bool done = (--connect->refs == 0);
106   grpc_endpoint** endpoint = connect->endpoint;
107 
108   // Only schedule a callback once, by either OnAlarm or OnOpen. The
109   // first one issues callback while the second one does cleanup.
110   if (done) {
111     gpr_mu_unlock(&connect->mu);
112     CFStreamConnectCleanup(connect);
113   } else {
114     if (error.ok()) {
115       CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
116       if (stream_error == NULL) {
117         stream_error = CFWriteStreamCopyError(connect->write_stream);
118       }
119       if (stream_error) {
120         error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
121         CFRelease(stream_error);
122       }
123       if (error.ok()) {
124         *endpoint = grpc_cfstream_endpoint_create(
125             connect->read_stream, connect->write_stream,
126             connect->addr_name.c_str(), connect->stream_handle);
127       }
128     }
129     gpr_mu_unlock(&connect->mu);
130     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
131   }
132 }
133 
ParseResolvedAddress(const grpc_resolved_address * addr,CFStringRef * host,int * port)134 static void ParseResolvedAddress(const grpc_resolved_address* addr,
135                                  CFStringRef* host, int* port) {
136   std::string host_port = grpc_sockaddr_to_string(addr, true).value();
137   std::string host_string;
138   std::string port_string;
139   grpc_core::SplitHostPort(host_port, &host_string, &port_string);
140   *host = CFStringCreateWithCString(NULL, host_string.c_str(),
141                                     kCFStringEncodingUTF8);
142   *port = grpc_sockaddr_get_port(addr);
143 }
144 
CFStreamClientConnect(grpc_closure * closure,grpc_endpoint ** ep,grpc_pollset_set *,const grpc_event_engine::experimental::EndpointConfig & config,const grpc_resolved_address * resolved_addr,grpc_core::Timestamp deadline)145 static int64_t CFStreamClientConnect(
146     grpc_closure* closure, grpc_endpoint** ep,
147     grpc_pollset_set* /*interested_parties*/,
148     const grpc_event_engine::experimental::EndpointConfig& config,
149     const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
150   if (grpc_event_engine::experimental::UseEventEngineClient()) {
151     return grpc_event_engine::experimental::event_engine_tcp_client_connect(
152         closure, ep, config, resolved_addr, deadline);
153   }
154 
155   auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
156   if (!addr_uri.ok()) {
157     grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
158     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
159     return 0;
160   }
161 
162   CFStreamConnect* connect = new CFStreamConnect();
163   connect->closure = closure;
164   connect->endpoint = ep;
165   connect->addr_name = addr_uri.value();
166   connect->refs = 2;  // One for the connect operation, one for the timer.
167   gpr_ref_init(&connect->refcount, 1);
168   gpr_mu_init(&connect->mu);
169 
170   GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT: " << connect << ", "
171                           << connect->addr_name
172                           << ": asynchronously connecting";
173 
174   CFReadStreamRef read_stream;
175   CFWriteStreamRef write_stream;
176 
177   CFStringRef host;
178   int port;
179   ParseResolvedAddress(resolved_addr, &host, &port);
180   CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
181                                      &write_stream);
182   CFRelease(host);
183   connect->read_stream = read_stream;
184   connect->write_stream = write_stream;
185   connect->stream_handle =
186       CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
187   GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
188                     grpc_schedule_on_exec_ctx);
189   connect->stream_handle->NotifyOnOpen(&connect->on_open);
190   GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
191                     grpc_schedule_on_exec_ctx);
192   gpr_mu_lock(&connect->mu);
193   CFReadStreamOpen(read_stream);
194   CFWriteStreamOpen(write_stream);
195   grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
196   gpr_mu_unlock(&connect->mu);
197   return 0;
198 }
199 
CFStreamClientCancelConnect(int64_t connection_handle)200 static bool CFStreamClientCancelConnect(int64_t connection_handle) {
201   if (grpc_event_engine::experimental::UseEventEngineClient()) {
202     return grpc_event_engine::experimental::
203         event_engine_tcp_client_cancel_connect(connection_handle);
204   }
205   return false;
206 }
207 
208 grpc_tcp_client_vtable grpc_cfstream_client_vtable = {
209     CFStreamClientConnect, CFStreamClientCancelConnect};
210 
211 #endif  // GRPC_CFSTREAM_CLIENT
212