1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_UV
24 #include <limits.h>
25 #include <string.h>
26
27 #include <grpc/slice_buffer.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/error.h"
35 #include "src/core/lib/iomgr/iomgr_custom.h"
36 #include "src/core/lib/iomgr/network_status_tracker.h"
37 #include "src/core/lib/iomgr/resolve_address_custom.h"
38 #include "src/core/lib/iomgr/resource_quota.h"
39 #include "src/core/lib/iomgr/tcp_custom.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42
43 #include <uv.h>
44
/* Strips constness from an address pointer and yields a mutable
 * grpc_sockaddr*. The value is routed through uintptr_t rather than cast
 * directly from the const-qualified pointer type. */
#define IGNORE_CONST(ptr) ((grpc_sockaddr*)(uintptr_t)(ptr))
46
47 typedef struct uv_socket_t {
48 uv_connect_t connect_req;
49 uv_write_t write_req;
50 uv_shutdown_t shutdown_req;
51 uv_tcp_t* handle;
52 uv_buf_t* write_buffers;
53
54 char* read_buf;
55 size_t read_len;
56
57 bool pending_connection;
58 grpc_custom_socket* accept_socket;
59 grpc_error* accept_error;
60
61 grpc_custom_connect_callback connect_cb;
62 grpc_custom_write_callback write_cb;
63 grpc_custom_read_callback read_cb;
64 grpc_custom_accept_callback accept_cb;
65 grpc_custom_close_callback close_cb;
66
67 } uv_socket_t;
68
tcp_error_create(const char * desc,int status)69 static grpc_error* tcp_error_create(const char* desc, int status) {
70 if (status == 0) {
71 return GRPC_ERROR_NONE;
72 }
73 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc);
74 /* All tcp errors are marked with UNAVAILABLE so that application may
75 * choose to retry. */
76 error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
77 GRPC_STATUS_UNAVAILABLE);
78 return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
79 grpc_slice_from_static_string(uv_strerror(status)));
80 }
81
uv_socket_destroy(grpc_custom_socket * socket)82 static void uv_socket_destroy(grpc_custom_socket* socket) {
83 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
84 gpr_free(uv_socket->handle);
85 gpr_free(uv_socket);
86 }
87
alloc_uv_buf(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)88 static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
89 uv_buf_t* buf) {
90 uv_socket_t* uv_socket =
91 (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl;
92 (void)suggested_size;
93 buf->base = uv_socket->read_buf;
94 buf->len = uv_socket->read_len;
95 }
96
uv_read_callback(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)97 static void uv_read_callback(uv_stream_t* stream, ssize_t nread,
98 const uv_buf_t* buf) {
99 grpc_error* error = GRPC_ERROR_NONE;
100 if (nread == 0) {
101 // Nothing happened. Wait for the next callback
102 return;
103 }
104 // TODO(murgatroid99): figure out what the return value here means
105 uv_read_stop(stream);
106 if (nread == UV_EOF) {
107 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
108 } else if (nread < 0) {
109 error = tcp_error_create("TCP Read failed", nread);
110 }
111 grpc_custom_socket* socket = (grpc_custom_socket*)stream->data;
112 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
113 uv_socket->read_cb(socket, (size_t)nread, error);
114 }
115
uv_close_callback(uv_handle_t * handle)116 static void uv_close_callback(uv_handle_t* handle) {
117 grpc_custom_socket* socket = (grpc_custom_socket*)handle->data;
118 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
119 if (uv_socket->accept_socket) {
120 uv_socket->accept_cb(socket, uv_socket->accept_socket,
121 GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed"));
122 }
123 uv_socket->close_cb(socket);
124 }
125
uv_socket_read(grpc_custom_socket * socket,char * buffer,size_t length,grpc_custom_read_callback read_cb)126 static void uv_socket_read(grpc_custom_socket* socket, char* buffer,
127 size_t length, grpc_custom_read_callback read_cb) {
128 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
129 int status;
130 grpc_error* error;
131 uv_socket->read_cb = read_cb;
132 uv_socket->read_buf = buffer;
133 uv_socket->read_len = length;
134 // TODO(murgatroid99): figure out what the return value here means
135 status =
136 uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf,
137 (uv_read_cb)uv_read_callback);
138 if (status != 0) {
139 error = tcp_error_create("TCP Read failed at start", status);
140 uv_socket->read_cb(socket, 0, error);
141 }
142 }
143
uv_write_callback(uv_write_t * req,int status)144 static void uv_write_callback(uv_write_t* req, int status) {
145 grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
146 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
147 gpr_free(uv_socket->write_buffers);
148 uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status));
149 }
150
uv_socket_write(grpc_custom_socket * socket,grpc_slice_buffer * write_slices,grpc_custom_write_callback write_cb)151 void uv_socket_write(grpc_custom_socket* socket,
152 grpc_slice_buffer* write_slices,
153 grpc_custom_write_callback write_cb) {
154 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
155 uv_socket->write_cb = write_cb;
156 uv_buf_t* uv_buffers;
157 uv_write_t* write_req;
158
159 uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count);
160 for (size_t i = 0; i < write_slices->count; i++) {
161 uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]);
162 uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]);
163 }
164
165 uv_socket->write_buffers = uv_buffers;
166 write_req = &uv_socket->write_req;
167 write_req->data = socket;
168 // TODO(murgatroid99): figure out what the return value here means
169 uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers,
170 write_slices->count, uv_write_callback);
171 }
172
shutdown_callback(uv_shutdown_t * req,int status)173 static void shutdown_callback(uv_shutdown_t* req, int status) {}
174
uv_socket_shutdown(grpc_custom_socket * socket)175 static void uv_socket_shutdown(grpc_custom_socket* socket) {
176 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
177 uv_shutdown_t* req = &uv_socket->shutdown_req;
178 uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback);
179 }
180
uv_socket_close(grpc_custom_socket * socket,grpc_custom_close_callback close_cb)181 static void uv_socket_close(grpc_custom_socket* socket,
182 grpc_custom_close_callback close_cb) {
183 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
184 uv_socket->close_cb = close_cb;
185 uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback);
186 }
187
uv_socket_init_helper(uv_socket_t * uv_socket,int domain)188 static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) {
189 uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
190 uv_socket->handle = tcp;
191 int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain);
192 if (status != 0) {
193 return tcp_error_create("Failed to initialize UV tcp handle", status);
194 }
195 #if defined(GPR_LINUX) && defined(SO_REUSEPORT)
196 if (domain == AF_INET || domain == AF_INET6) {
197 int enable = 1;
198 int fd;
199 uv_fileno((uv_handle_t*)tcp, &fd);
200 // TODO Handle error here.
201 setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
202 }
203 #endif
204 uv_socket->write_buffers = nullptr;
205 uv_socket->read_len = 0;
206 uv_tcp_nodelay(uv_socket->handle, 1);
207 // Node uses a garbage collector to call destructors, so we don't
208 // want to hold the uv loop open with active gRPC objects.
209 uv_unref((uv_handle_t*)uv_socket->handle);
210 uv_socket->pending_connection = false;
211 uv_socket->accept_socket = nullptr;
212 uv_socket->accept_error = GRPC_ERROR_NONE;
213 return GRPC_ERROR_NONE;
214 }
215
uv_socket_init(grpc_custom_socket * socket,int domain)216 static grpc_error* uv_socket_init(grpc_custom_socket* socket, int domain) {
217 uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
218 grpc_error* error = uv_socket_init_helper(uv_socket, domain);
219 if (error != GRPC_ERROR_NONE) {
220 return error;
221 }
222 uv_socket->handle->data = socket;
223 socket->impl = uv_socket;
224 return GRPC_ERROR_NONE;
225 }
226
uv_socket_getpeername(grpc_custom_socket * socket,const grpc_sockaddr * addr,int * addr_len)227 static grpc_error* uv_socket_getpeername(grpc_custom_socket* socket,
228 const grpc_sockaddr* addr,
229 int* addr_len) {
230 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
231 int err = uv_tcp_getpeername(uv_socket->handle,
232 (struct sockaddr*)IGNORE_CONST(addr), addr_len);
233 return tcp_error_create("getpeername failed", err);
234 }
235
uv_socket_getsockname(grpc_custom_socket * socket,const grpc_sockaddr * addr,int * addr_len)236 static grpc_error* uv_socket_getsockname(grpc_custom_socket* socket,
237 const grpc_sockaddr* addr,
238 int* addr_len) {
239 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
240 int err = uv_tcp_getsockname(uv_socket->handle,
241 (struct sockaddr*)IGNORE_CONST(addr), addr_len);
242 return tcp_error_create("getsockname failed", err);
243 }
244
accept_new_connection(grpc_custom_socket * socket)245 static void accept_new_connection(grpc_custom_socket* socket) {
246 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
247 if (!uv_socket->pending_connection || !uv_socket->accept_socket) {
248 return;
249 }
250 grpc_custom_socket* new_socket = uv_socket->accept_socket;
251 grpc_error* error = uv_socket->accept_error;
252 uv_socket->accept_socket = nullptr;
253 uv_socket->accept_error = GRPC_ERROR_NONE;
254 uv_socket->pending_connection = false;
255 if (uv_socket->accept_error != GRPC_ERROR_NONE) {
256 uv_stream_t dummy_handle;
257 uv_accept((uv_stream_t*)uv_socket->handle, &dummy_handle);
258 uv_socket->accept_cb(socket, new_socket, error);
259 } else {
260 uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
261 uv_socket_init_helper(uv_new_socket, AF_UNSPEC);
262 // UV documentation says this is guaranteed to succeed
263 GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle,
264 (uv_stream_t*)uv_new_socket->handle) == 0);
265 new_socket->impl = uv_new_socket;
266 uv_new_socket->handle->data = new_socket;
267 uv_socket->accept_cb(socket, new_socket, error);
268 }
269 }
270
uv_on_connect(uv_stream_t * server,int status)271 static void uv_on_connect(uv_stream_t* server, int status) {
272 grpc_custom_socket* socket = (grpc_custom_socket*)server->data;
273 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
274 GPR_ASSERT(!uv_socket->pending_connection);
275 uv_socket->pending_connection = true;
276 if (status < 0) {
277 switch (status) {
278 case UV_EINTR:
279 case UV_EAGAIN:
280 return;
281 default:
282 uv_socket->accept_error = tcp_error_create("accept failed", status);
283 }
284 }
285 accept_new_connection(socket);
286 }
287
uv_socket_accept(grpc_custom_socket * socket,grpc_custom_socket * new_socket,grpc_custom_accept_callback accept_cb)288 void uv_socket_accept(grpc_custom_socket* socket,
289 grpc_custom_socket* new_socket,
290 grpc_custom_accept_callback accept_cb) {
291 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
292 uv_socket->accept_cb = accept_cb;
293 GPR_ASSERT(uv_socket->accept_socket == nullptr);
294 uv_socket->accept_socket = new_socket;
295 accept_new_connection(socket);
296 }
297
uv_socket_bind(grpc_custom_socket * socket,const grpc_sockaddr * addr,size_t len,int flags)298 static grpc_error* uv_socket_bind(grpc_custom_socket* socket,
299 const grpc_sockaddr* addr, size_t len,
300 int flags) {
301 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
302 int status =
303 uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0);
304 return tcp_error_create("Failed to bind to port", status);
305 }
306
uv_socket_listen(grpc_custom_socket * socket)307 static grpc_error* uv_socket_listen(grpc_custom_socket* socket) {
308 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
309 int status =
310 uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect);
311 return tcp_error_create("Failed to listen to port", status);
312 }
313
uv_tc_on_connect(uv_connect_t * req,int status)314 static void uv_tc_on_connect(uv_connect_t* req, int status) {
315 grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
316 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
317 grpc_error* error;
318 if (status == UV_ECANCELED) {
319 // This should only happen if the handle is already closed
320 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred");
321 } else {
322 error = tcp_error_create("Failed to connect to remote host", status);
323 }
324 uv_socket->connect_cb(socket, error);
325 }
326
uv_socket_connect(grpc_custom_socket * socket,const grpc_sockaddr * addr,size_t len,grpc_custom_connect_callback connect_cb)327 static void uv_socket_connect(grpc_custom_socket* socket,
328 const grpc_sockaddr* addr, size_t len,
329 grpc_custom_connect_callback connect_cb) {
330 uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
331 uv_socket->connect_cb = connect_cb;
332 uv_socket->connect_req.data = socket;
333 int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle,
334 (struct sockaddr*)addr, uv_tc_on_connect);
335 if (status != 0) {
336 // The callback will not be called
337 uv_socket->connect_cb(socket, tcp_error_create("connect failed", status));
338 }
339 }
340
handle_addrinfo_result(struct addrinfo * result)341 static grpc_resolved_addresses* handle_addrinfo_result(
342 struct addrinfo* result) {
343 struct addrinfo* resp;
344 size_t i;
345 grpc_resolved_addresses* addresses =
346 (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
347 addresses->naddrs = 0;
348 for (resp = result; resp != nullptr; resp = resp->ai_next) {
349 addresses->naddrs++;
350 }
351 addresses->addrs = (grpc_resolved_address*)gpr_malloc(
352 sizeof(grpc_resolved_address) * addresses->naddrs);
353 for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) {
354 memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
355 addresses->addrs[i].len = resp->ai_addrlen;
356 }
357 // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo)
358 // and not by gpr_malloc
359 uv_freeaddrinfo(result);
360 return addresses;
361 }
362
/* Completion callback for the asynchronous uv_getaddrinfo started by
 * uv_resolve_async. Frees the heap-allocated request, converts the result
 * list on success, and forwards the addresses (nullptr on failure) and the
 * status-derived error to grpc_custom_resolve_callback. */
static void uv_resolve_callback(uv_getaddrinfo_t* req, int status,
                                struct addrinfo* res) {
  grpc_custom_resolver* resolver = (grpc_custom_resolver*)req->data;
  gpr_free(req);
  grpc_resolved_addresses* addresses =
      (status == 0) ? handle_addrinfo_result(res) : nullptr;
  grpc_error* outcome = tcp_error_create("getaddrinfo failed", status);
  grpc_custom_resolve_callback(resolver, addresses, outcome);
}
374
uv_resolve(char * host,char * port,grpc_resolved_addresses ** result)375 static grpc_error* uv_resolve(char* host, char* port,
376 grpc_resolved_addresses** result) {
377 int status;
378 uv_getaddrinfo_t req;
379 struct addrinfo hints;
380 memset(&hints, 0, sizeof(struct addrinfo));
381 hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
382 hints.ai_socktype = SOCK_STREAM; /* stream socket */
383 hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
384 status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
385 if (status != 0) {
386 *result = nullptr;
387 } else {
388 *result = handle_addrinfo_result(req.addrinfo);
389 }
390 return tcp_error_create("getaddrinfo failed", status);
391 }
392
uv_resolve_async(grpc_custom_resolver * r,char * host,char * port)393 static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) {
394 int status;
395 uv_getaddrinfo_t* req =
396 (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
397 req->data = r;
398 struct addrinfo hints;
399 memset(&hints, 0, sizeof(struct addrinfo));
400 hints.ai_family = GRPC_AF_UNSPEC; /* ipv4 or ipv6 */
401 hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */
402 hints.ai_flags = GRPC_AI_PASSIVE; /* for wildcard IP address */
403 status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host,
404 port, &hints);
405 if (status != 0) {
406 gpr_free(req);
407 grpc_error* error = tcp_error_create("getaddrinfo failed", status);
408 grpc_custom_resolve_callback(r, NULL, error);
409 }
410 }
411
412 grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
413
414 grpc_socket_vtable grpc_uv_socket_vtable = {
415 uv_socket_init, uv_socket_connect, uv_socket_destroy,
416 uv_socket_shutdown, uv_socket_close, uv_socket_write,
417 uv_socket_read, uv_socket_getpeername, uv_socket_getsockname,
418 uv_socket_bind, uv_socket_listen, uv_socket_accept};
419
420 #endif
421