• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_UV
24 #include <limits.h>
25 #include <string.h>
26 
27 #include <grpc/slice_buffer.h>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/iomgr/error.h"
35 #include "src/core/lib/iomgr/iomgr_custom.h"
36 #include "src/core/lib/iomgr/resolve_address_custom.h"
37 #include "src/core/lib/iomgr/resource_quota.h"
38 #include "src/core/lib/iomgr/tcp_custom.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/slice/slice_string_helpers.h"
41 
42 #include <uv.h>
43 
// Strips the const qualifier from an address pointer by round-tripping it
// through uintptr_t, yielding a mutable grpc_sockaddr*.
#define IGNORE_CONST(addr) ((grpc_sockaddr*)(uintptr_t)(addr))
45 
// Per-socket libuv state, stored in grpc_custom_socket::impl.
typedef struct uv_socket_t {
  // Request objects handed to uv_tcp_connect, uv_write and uv_shutdown.
  uv_connect_t connect_req;
  uv_write_t write_req;
  uv_shutdown_t shutdown_req;
  // Heap-allocated TCP handle; released in uv_socket_destroy.
  uv_tcp_t* handle;
  // Buffer array for the write in flight; released in uv_write_callback.
  uv_buf_t* write_buffers;

  // Destination buffer and capacity given to libuv by alloc_uv_buf.
  char* read_buf;
  size_t read_len;

  // Number of connections signalled by uv_on_connect but not yet accepted.
  int pending_connections;
  // Socket supplied by uv_socket_accept that is waiting for a connection.
  grpc_custom_socket* accept_socket;
  // Error recorded by uv_on_connect for the next accept completion.
  grpc_error_handle accept_error;

  // Completion callbacks registered by the corresponding socket operations.
  grpc_custom_connect_callback connect_cb;
  grpc_custom_write_callback write_cb;
  grpc_custom_read_callback read_cb;
  grpc_custom_accept_callback accept_cb;
  grpc_custom_close_callback close_cb;

} uv_socket_t;
67 
tcp_error_create(const char * desc,int status)68 static grpc_error_handle tcp_error_create(const char* desc, int status) {
69   if (status == 0) {
70     return GRPC_ERROR_NONE;
71   }
72   grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc);
73   /* All tcp errors are marked with UNAVAILABLE so that application may
74    * choose to retry. */
75   error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
76                              GRPC_STATUS_UNAVAILABLE);
77   return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
78                             grpc_slice_from_static_string(uv_strerror(status)));
79 }
80 
uv_socket_destroy(grpc_custom_socket * socket)81 static void uv_socket_destroy(grpc_custom_socket* socket) {
82   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
83   gpr_free(uv_socket->handle);
84   gpr_free(uv_socket);
85 }
86 
alloc_uv_buf(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)87 static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
88                          uv_buf_t* buf) {
89   uv_socket_t* uv_socket =
90       (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl;
91   (void)suggested_size;
92   buf->base = uv_socket->read_buf;
93   buf->len = uv_socket->read_len;
94 }
95 
uv_read_callback(uv_stream_t * stream,ssize_t nread,const uv_buf_t * buf)96 static void uv_read_callback(uv_stream_t* stream, ssize_t nread,
97                              const uv_buf_t* buf) {
98   grpc_error_handle error = GRPC_ERROR_NONE;
99   if (nread == 0) {
100     // Nothing happened. Wait for the next callback
101     return;
102   }
103   // TODO(murgatroid99): figure out what the return value here means
104   uv_read_stop(stream);
105   if (nread == UV_EOF) {
106     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
107   } else if (nread < 0) {
108     error = tcp_error_create("TCP Read failed", nread);
109   }
110   grpc_custom_socket* socket = (grpc_custom_socket*)stream->data;
111   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
112   uv_socket->read_cb(socket, (size_t)nread, error);
113 }
114 
uv_close_callback(uv_handle_t * handle)115 static void uv_close_callback(uv_handle_t* handle) {
116   grpc_custom_socket* socket = (grpc_custom_socket*)handle->data;
117   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
118   if (uv_socket->accept_socket) {
119     uv_socket->accept_cb(socket, uv_socket->accept_socket,
120                          GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed"));
121   }
122   uv_socket->close_cb(socket);
123 }
124 
uv_socket_read(grpc_custom_socket * socket,char * buffer,size_t length,grpc_custom_read_callback read_cb)125 static void uv_socket_read(grpc_custom_socket* socket, char* buffer,
126                            size_t length, grpc_custom_read_callback read_cb) {
127   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
128   int status;
129   grpc_error_handle error;
130   uv_socket->read_cb = read_cb;
131   uv_socket->read_buf = buffer;
132   uv_socket->read_len = length;
133   // TODO(murgatroid99): figure out what the return value here means
134   status =
135       uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf,
136                     (uv_read_cb)uv_read_callback);
137   if (status != 0) {
138     error = tcp_error_create("TCP Read failed at start", status);
139     uv_socket->read_cb(socket, 0, error);
140   }
141 }
142 
uv_write_callback(uv_write_t * req,int status)143 static void uv_write_callback(uv_write_t* req, int status) {
144   grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
145   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
146   gpr_free(uv_socket->write_buffers);
147   uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status));
148 }
149 
uv_socket_write(grpc_custom_socket * socket,grpc_slice_buffer * write_slices,grpc_custom_write_callback write_cb)150 void uv_socket_write(grpc_custom_socket* socket,
151                      grpc_slice_buffer* write_slices,
152                      grpc_custom_write_callback write_cb) {
153   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
154   uv_socket->write_cb = write_cb;
155   uv_buf_t* uv_buffers;
156   uv_write_t* write_req;
157 
158   uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count);
159   for (size_t i = 0; i < write_slices->count; i++) {
160     uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]);
161     uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]);
162   }
163 
164   uv_socket->write_buffers = uv_buffers;
165   write_req = &uv_socket->write_req;
166   write_req->data = socket;
167   // TODO(murgatroid99): figure out what the return value here means
168   uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers,
169            write_slices->count, uv_write_callback);
170 }
171 
shutdown_callback(uv_shutdown_t * req,int status)172 static void shutdown_callback(uv_shutdown_t* req, int status) {}
173 
uv_socket_shutdown(grpc_custom_socket * socket)174 static void uv_socket_shutdown(grpc_custom_socket* socket) {
175   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
176   uv_shutdown_t* req = &uv_socket->shutdown_req;
177   uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback);
178 }
179 
uv_socket_close(grpc_custom_socket * socket,grpc_custom_close_callback close_cb)180 static void uv_socket_close(grpc_custom_socket* socket,
181                             grpc_custom_close_callback close_cb) {
182   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
183   uv_socket->close_cb = close_cb;
184   uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback);
185 }
186 
uv_socket_init_helper(uv_socket_t * uv_socket,int domain)187 static grpc_error_handle uv_socket_init_helper(uv_socket_t* uv_socket,
188                                                int domain) {
189   uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
190   uv_socket->handle = tcp;
191   int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain);
192   if (status != 0) {
193     return tcp_error_create("Failed to initialize UV tcp handle", status);
194   }
195 #if defined(GPR_LINUX) && defined(SO_REUSEPORT)
196   if (domain == AF_INET || domain == AF_INET6) {
197     int enable = 1;
198     int fd;
199     uv_fileno((uv_handle_t*)tcp, &fd);
200     // TODO Handle error here.
201     setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
202   }
203 #endif
204   uv_socket->write_buffers = nullptr;
205   uv_socket->read_len = 0;
206   uv_tcp_nodelay(uv_socket->handle, 1);
207   // Node uses a garbage collector to call destructors, so we don't
208   // want to hold the uv loop open with active gRPC objects.
209   uv_unref((uv_handle_t*)uv_socket->handle);
210   uv_socket->pending_connections = 0;
211   uv_socket->accept_socket = nullptr;
212   uv_socket->accept_error = GRPC_ERROR_NONE;
213   return GRPC_ERROR_NONE;
214 }
215 
uv_socket_init(grpc_custom_socket * socket,int domain)216 static grpc_error_handle uv_socket_init(grpc_custom_socket* socket,
217                                         int domain) {
218   uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
219   grpc_error_handle error = uv_socket_init_helper(uv_socket, domain);
220   if (error != GRPC_ERROR_NONE) {
221     return error;
222   }
223   uv_socket->handle->data = socket;
224   socket->impl = uv_socket;
225   return GRPC_ERROR_NONE;
226 }
227 
uv_socket_getpeername(grpc_custom_socket * socket,const grpc_sockaddr * addr,int * addr_len)228 static grpc_error_handle uv_socket_getpeername(grpc_custom_socket* socket,
229                                                const grpc_sockaddr* addr,
230                                                int* addr_len) {
231   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
232   int err = uv_tcp_getpeername(uv_socket->handle,
233                                (struct sockaddr*)IGNORE_CONST(addr), addr_len);
234   return tcp_error_create("getpeername failed", err);
235 }
236 
uv_socket_getsockname(grpc_custom_socket * socket,const grpc_sockaddr * addr,int * addr_len)237 static grpc_error_handle uv_socket_getsockname(grpc_custom_socket* socket,
238                                                const grpc_sockaddr* addr,
239                                                int* addr_len) {
240   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
241   int err = uv_tcp_getsockname(uv_socket->handle,
242                                (struct sockaddr*)IGNORE_CONST(addr), addr_len);
243   return tcp_error_create("getsockname failed", err);
244 }
245 
accept_new_connection(grpc_custom_socket * socket)246 static void accept_new_connection(grpc_custom_socket* socket) {
247   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
248   if (uv_socket->pending_connections == 0 || !uv_socket->accept_socket) {
249     return;
250   }
251   grpc_custom_socket* new_socket = uv_socket->accept_socket;
252   grpc_error_handle error = uv_socket->accept_error;
253   uv_socket->accept_socket = nullptr;
254   uv_socket->accept_error = GRPC_ERROR_NONE;
255   uv_socket->pending_connections -= 1;
256   if (uv_socket->accept_error != GRPC_ERROR_NONE) {
257     uv_stream_t phony_handle;
258     uv_accept((uv_stream_t*)uv_socket->handle, &phony_handle);
259     uv_socket->accept_cb(socket, new_socket, error);
260   } else {
261     uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
262     uv_socket_init_helper(uv_new_socket, AF_UNSPEC);
263     // UV documentation says this is guaranteed to succeed
264     GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle,
265                          (uv_stream_t*)uv_new_socket->handle) == 0);
266     new_socket->impl = uv_new_socket;
267     uv_new_socket->handle->data = new_socket;
268     uv_socket->accept_cb(socket, new_socket, error);
269   }
270 }
271 
uv_on_connect(uv_stream_t * server,int status)272 static void uv_on_connect(uv_stream_t* server, int status) {
273   grpc_custom_socket* socket = (grpc_custom_socket*)server->data;
274   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
275   if (status < 0) {
276     switch (status) {
277       case UV_EINTR:
278       case UV_EAGAIN:
279         return;
280       default:
281         uv_socket->accept_error = tcp_error_create("accept failed", status);
282     }
283   }
284   uv_socket->pending_connections += 1;
285   accept_new_connection(socket);
286 }
287 
uv_socket_accept(grpc_custom_socket * socket,grpc_custom_socket * new_socket,grpc_custom_accept_callback accept_cb)288 void uv_socket_accept(grpc_custom_socket* socket,
289                       grpc_custom_socket* new_socket,
290                       grpc_custom_accept_callback accept_cb) {
291   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
292   uv_socket->accept_cb = accept_cb;
293   GPR_ASSERT(uv_socket->accept_socket == nullptr);
294   uv_socket->accept_socket = new_socket;
295   accept_new_connection(socket);
296 }
297 
uv_socket_bind(grpc_custom_socket * socket,const grpc_sockaddr * addr,size_t len,int flags)298 static grpc_error_handle uv_socket_bind(grpc_custom_socket* socket,
299                                         const grpc_sockaddr* addr, size_t len,
300                                         int flags) {
301   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
302   int status =
303       uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0);
304   return tcp_error_create("Failed to bind to port", status);
305 }
306 
uv_socket_listen(grpc_custom_socket * socket)307 static grpc_error_handle uv_socket_listen(grpc_custom_socket* socket) {
308   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
309   int status =
310       uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect);
311   return tcp_error_create("Failed to listen to port", status);
312 }
313 
uv_tc_on_connect(uv_connect_t * req,int status)314 static void uv_tc_on_connect(uv_connect_t* req, int status) {
315   grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
316   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
317   grpc_error_handle error;
318   if (status == UV_ECANCELED) {
319     // This should only happen if the handle is already closed
320     error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred");
321   } else {
322     error = tcp_error_create("Failed to connect to remote host", status);
323   }
324   uv_socket->connect_cb(socket, error);
325 }
326 
uv_socket_connect(grpc_custom_socket * socket,const grpc_sockaddr * addr,size_t len,grpc_custom_connect_callback connect_cb)327 static void uv_socket_connect(grpc_custom_socket* socket,
328                               const grpc_sockaddr* addr, size_t len,
329                               grpc_custom_connect_callback connect_cb) {
330   uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
331   uv_socket->connect_cb = connect_cb;
332   uv_socket->connect_req.data = socket;
333   int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle,
334                               (struct sockaddr*)addr, uv_tc_on_connect);
335   if (status != 0) {
336     // The callback will not be called
337     uv_socket->connect_cb(socket, tcp_error_create("connect failed", status));
338   }
339 }
340 
handle_addrinfo_result(struct addrinfo * result)341 static grpc_resolved_addresses* handle_addrinfo_result(
342     struct addrinfo* result) {
343   struct addrinfo* resp;
344   size_t i;
345   grpc_resolved_addresses* addresses =
346       (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
347   addresses->naddrs = 0;
348   for (resp = result; resp != nullptr; resp = resp->ai_next) {
349     addresses->naddrs++;
350   }
351   addresses->addrs = (grpc_resolved_address*)gpr_malloc(
352       sizeof(grpc_resolved_address) * addresses->naddrs);
353   for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) {
354     memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
355     addresses->addrs[i].len = resp->ai_addrlen;
356   }
357   // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo)
358   // and not by gpr_malloc
359   uv_freeaddrinfo(result);
360   return addresses;
361 }
362 
// libuv completion callback for the asynchronous getaddrinfo request started
// by uv_resolve_async. Frees the request, converts the result on success, and
// reports to grpc_custom_resolve_callback.
static void uv_resolve_callback(uv_getaddrinfo_t* req, int status,
                                struct addrinfo* res) {
  // Grab the resolver before the request that carries it is released.
  grpc_custom_resolver* resolver = (grpc_custom_resolver*)req->data;
  gpr_free(req);
  grpc_resolved_addresses* addresses =
      (status == 0) ? handle_addrinfo_result(res) : nullptr;
  grpc_error_handle outcome = tcp_error_create("getaddrinfo failed", status);
  grpc_custom_resolve_callback(resolver, addresses, outcome);
}
374 
uv_resolve(const char * host,const char * port,grpc_resolved_addresses ** result)375 static grpc_error_handle uv_resolve(const char* host, const char* port,
376                                     grpc_resolved_addresses** result) {
377   int status;
378   uv_getaddrinfo_t req;
379   struct addrinfo hints;
380   memset(&hints, 0, sizeof(struct addrinfo));
381   hints.ai_family = AF_UNSPEC;     /* ipv4 or ipv6 */
382   hints.ai_socktype = SOCK_STREAM; /* stream socket */
383   hints.ai_flags = AI_PASSIVE;     /* for wildcard IP address */
384   status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
385   if (status != 0) {
386     *result = nullptr;
387   } else {
388     *result = handle_addrinfo_result(req.addrinfo);
389   }
390   return tcp_error_create("getaddrinfo failed", status);
391 }
392 
uv_resolve_async(grpc_custom_resolver * r,const char * host,const char * port)393 static void uv_resolve_async(grpc_custom_resolver* r, const char* host,
394                              const char* port) {
395   int status;
396   uv_getaddrinfo_t* req =
397       (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
398   req->data = r;
399   struct addrinfo hints;
400   memset(&hints, 0, sizeof(struct addrinfo));
401   hints.ai_family = GRPC_AF_UNSPEC;     /* ipv4 or ipv6 */
402   hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */
403   hints.ai_flags = GRPC_AI_PASSIVE;     /* for wildcard IP address */
404   status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host,
405                           port, &hints);
406   if (status != 0) {
407     gpr_free(req);
408     grpc_error_handle error = tcp_error_create("getaddrinfo failed", status);
409     grpc_custom_resolve_callback(r, NULL, error);
410   }
411 }
412 
// Resolver vtable backed by libuv: blocking resolve, then asynchronous
// resolve.
grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
414 
// Socket vtable backed by libuv. Entries are positional, so their order must
// match the member order of grpc_socket_vtable.
grpc_socket_vtable grpc_uv_socket_vtable = {
    uv_socket_init,     uv_socket_connect,     uv_socket_destroy,
    uv_socket_shutdown, uv_socket_close,       uv_socket_write,
    uv_socket_read,     uv_socket_getpeername, uv_socket_getsockname,
    uv_socket_bind,     uv_socket_listen,      uv_socket_accept};
420 
421 #endif
422