1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# distutils: language=c++ 15 16cimport cpython 17from libc cimport string 18from libc.stdlib cimport malloc, free 19import errno 20gevent_g = None 21gevent_socket = None 22gevent_hub = None 23gevent_event = None 24g_event = None 25g_pool = None 26 27cdef grpc_error* grpc_error_none(): 28 return <grpc_error*>0 29 30cdef grpc_error* socket_error(str syscall, str err): 31 error_str = "{} failed: {}".format(syscall, err) 32 error_bytes = str_to_bytes(error_str) 33 return grpc_socket_error(error_bytes) 34 35cdef resolved_addr_to_tuple(grpc_resolved_address* address): 36 cdef char* res_str 37 port = grpc_sockaddr_get_port(address) 38 str_len = grpc_sockaddr_to_string(&res_str, address, 0) 39 byte_str = _decode(<bytes>res_str[:str_len]) 40 if byte_str.endswith(':' + str(port)): 41 byte_str = byte_str[:(0 - len(str(port)) - 1)] 42 byte_str = byte_str.lstrip('[') 43 byte_str = byte_str.rstrip(']') 44 byte_str = '{}'.format(byte_str) 45 return byte_str, port 46 47cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): 48 cdef grpc_resolved_address c_addr 49 string.memcpy(<void*>c_addr.addr, <void*> address, length) 50 c_addr.len = length 51 return resolved_addr_to_tuple(&c_addr) 52 53cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): 54 cdef grpc_resolved_address c_addr 55 string.memcpy(<void*>c_addr.addr, <void*> address, length) 56 c_addr.len = length 57 return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' 58 59cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): 60 cdef grpc_resolved_addresses* addresses 61 tups_set = set((tup[4][0], tup[4][1]) for tup in tups) 62 addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses)) 63 addresses.naddrs = len(tups_set) 64 addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set)) 65 i = 0 66 for tup in set(tups_set): 67 hostname = str_to_bytes(tup[0]) 68 grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) 69 i += 1 70 return addresses 71 72def _spawn_greenlet(*args): 73 greenlet = g_pool.spawn(*args) 74 75############################### 76### socket implementation ### 77############################### 78 79cdef class SocketWrapper: 80 def __cinit__(self): 81 self.sockopts = [] 82 self.socket = None 83 self.c_socket = NULL 84 self.c_buffer = NULL 85 self.len = 0 86 87cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil: 88 sw = SocketWrapper() 89 sw.c_socket = socket 90 sw.sockopts = [] 91 cpython.Py_INCREF(sw) 92 # Python doesn't support AF_UNSPEC sockets, so we defer creation until 93 # bind/connect when we know what type of socket we need 94 sw.socket = None 95 sw.closed = False 96 sw.accepting_socket = NULL 97 socket.impl = <void*>sw 98 return grpc_error_none() 99 100cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple): 101 try: 102 socket_wrapper.socket.connect(addr_tuple) 103 socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 104 grpc_error_none()) 105 except IOError as io_error: 106 socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 107 socket_error("connect", str(io_error))) 108 g_event.set() 109 110def socket_connect_async(socket_wrapper, addr_tuple): 111 socket_connect_async_cython(socket_wrapper, addr_tuple) 112 113cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr, 114 size_t addr_len, 115 grpc_custom_connect_callback cb) with gil: 116 py_socket = None 117 socket_wrapper = <SocketWrapper>socket.impl 118 socket_wrapper.connect_cb = cb 119 addr_tuple = sockaddr_to_tuple(addr, addr_len) 120 if sockaddr_is_ipv4(addr, addr_len): 121 py_socket = gevent_socket.socket(gevent_socket.AF_INET) 122 else: 123 py_socket = gevent_socket.socket(gevent_socket.AF_INET6) 124 applysockopts(py_socket) 125 socket_wrapper.socket = py_socket 126 _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple) 127 128cdef void socket_destroy(grpc_custom_socket* socket) with gil: 129 cpython.Py_DECREF(<SocketWrapper>socket.impl) 130 131cdef void socket_shutdown(grpc_custom_socket* socket) with gil: 132 try: 133 (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR) 134 except IOError as io_error: 135 if io_error.errno != errno.ENOTCONN: 136 raise io_error 137 138cdef void socket_close(grpc_custom_socket* socket, 139 grpc_custom_close_callback cb) with gil: 140 socket_wrapper = (<SocketWrapper>socket.impl) 141 if socket_wrapper.socket is not None: 142 socket_wrapper.socket.close() 143 socket_wrapper.closed = True 144 socket_wrapper.close_cb = cb 145 # Delay the close callback until the accept() call has picked it up 146 if socket_wrapper.accepting_socket != NULL: 147 return 148 socket_wrapper.close_cb(socket) 149 150def socket_sendmsg(socket, write_bytes): 151 try: 152 return socket.sendmsg(write_bytes) 153 except AttributeError: 154 # sendmsg not available on all Pythons/Platforms 155 return socket.send(b''.join(write_bytes)) 156 157cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes): 158 try: 159 while write_bytes: 160 sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes) 161 while sent_byte_count > 0: 162 if sent_byte_count < len(write_bytes[0]): 163 write_bytes[0] = write_bytes[0][sent_byte_count:] 164 sent_byte_count = 0 165 else: 166 sent_byte_count -= len(write_bytes[0]) 167 write_bytes = write_bytes[1:] 168 socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 169 grpc_error_none()) 170 except IOError as io_error: 171 socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 172 socket_error("send", str(io_error))) 173 g_event.set() 174 175def socket_write_async(socket_wrapper, write_bytes): 176 socket_write_async_cython(socket_wrapper, write_bytes) 177 178cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer, 179 grpc_custom_write_callback cb) with gil: 180 cdef char* start 181 sw = <SocketWrapper>socket.impl 182 sw.write_cb = cb 183 write_bytes = [] 184 for i in range(buffer.count): 185 start = grpc_slice_buffer_start(buffer, i) 186 length = grpc_slice_buffer_length(buffer, i) 187 write_bytes.append(<bytes>start[:length]) 188 _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes) 189 190cdef socket_read_async_cython(SocketWrapper socket_wrapper): 191 cdef char* buff_char_arr 192 try: 193 buff_str = socket_wrapper.socket.recv(socket_wrapper.len) 194 buff_char_arr = buff_str 195 string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str)) 196 socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 197 len(buff_str), grpc_error_none()) 198 except IOError as io_error: 199 socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, 200 -1, socket_error("recv", str(io_error))) 201 g_event.set() 202 203def socket_read_async(socket_wrapper): 204 socket_read_async_cython(socket_wrapper) 205 206cdef void socket_read(grpc_custom_socket* socket, char* buffer, 207 size_t length, grpc_custom_read_callback cb) with gil: 208 sw = <SocketWrapper>socket.impl 209 sw.read_cb = cb 210 sw.c_buffer = buffer 211 sw.len = length 212 _spawn_greenlet(socket_read_async, sw) 213 214cdef grpc_error* socket_getpeername(grpc_custom_socket* socket, 215 const grpc_sockaddr* addr, 216 int* length) with gil: 217 cdef char* src_buf 218 peer = (<SocketWrapper>socket.impl).socket.getpeername() 219 220 cdef grpc_resolved_address c_addr 221 hostname = str_to_bytes(peer[0]) 222 grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) 223 string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) 224 length[0] = c_addr.len 225 return grpc_error_none() 226 227cdef grpc_error* socket_getsockname(grpc_custom_socket* socket, 228 const grpc_sockaddr* addr, 229 int* length) with gil: 230 cdef char* src_buf 231 cdef grpc_resolved_address c_addr 232 if (<SocketWrapper>socket.impl).socket is None: 233 peer = ('0.0.0.0', 0) 234 else: 235 peer = (<SocketWrapper>socket.impl).socket.getsockname() 236 hostname = str_to_bytes(peer[0]) 237 grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) 238 string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) 239 length[0] = c_addr.len 240 return grpc_error_none() 241 242def applysockopts(s): 243 s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) 244 s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) 245 246cdef grpc_error* socket_bind(grpc_custom_socket* socket, 247 const grpc_sockaddr* addr, 248 size_t len, int flags) with gil: 249 addr_tuple = sockaddr_to_tuple(addr, len) 250 try: 251 try: 252 py_socket = gevent_socket.socket(gevent_socket.AF_INET) 253 applysockopts(py_socket) 254 py_socket.bind(addr_tuple) 255 except gevent_socket.gaierror as e: 256 py_socket = gevent_socket.socket(gevent_socket.AF_INET6) 257 applysockopts(py_socket) 258 py_socket.bind(addr_tuple) 259 (<SocketWrapper>socket.impl).socket = py_socket 260 except IOError as io_error: 261 return socket_error("bind", str(io_error)) 262 else: 263 return grpc_error_none() 264 265cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil: 266 (<SocketWrapper>socket.impl).socket.listen(50) 267 return grpc_error_none() 268 269cdef void accept_callback_cython(SocketWrapper s): 270 try: 271 conn, address = s.socket.accept() 272 sw = SocketWrapper() 273 sw.closed = False 274 sw.c_socket = s.accepting_socket 275 sw.sockopts = [] 276 sw.socket = conn 277 sw.c_socket.impl = <void*>sw 278 sw.accepting_socket = NULL 279 cpython.Py_INCREF(sw) 280 s.accepting_socket = NULL 281 s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none()) 282 except IOError as io_error: 283 #TODO actual error 284 s.accepting_socket = NULL 285 s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket, 286 socket_error("accept", str(io_error))) 287 if s.closed: 288 s.close_cb(<grpc_custom_socket*>s.c_socket) 289 g_event.set() 290 291def socket_accept_async(s): 292 accept_callback_cython(s) 293 294cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, 295 grpc_custom_accept_callback cb) with gil: 296 sw = <SocketWrapper>socket.impl 297 sw.accepting_socket = client 298 sw.accept_cb = cb 299 _spawn_greenlet(socket_accept_async, sw) 300 301##################################### 302######Resolver implementation ####### 303##################################### 304 305cdef class ResolveWrapper: 306 def __cinit__(self): 307 self.c_resolver = NULL 308 self.c_host = NULL 309 self.c_port = NULL 310 311cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): 312 try: 313 res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) 314 grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, 315 tuples_to_resolvaddr(res), grpc_error_none()) 316 except IOError as io_error: 317 grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, 318 <grpc_resolved_addresses*>0, 319 socket_error("getaddrinfo", str(io_error))) 320 g_event.set() 321 322def socket_resolve_async_python(resolve_wrapper): 323 socket_resolve_async_cython(resolve_wrapper) 324 325cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil: 326 rw = ResolveWrapper() 327 rw.c_resolver = r 328 rw.c_host = host 329 rw.c_port = port 330 _spawn_greenlet(socket_resolve_async_python, rw) 331 332cdef grpc_error* socket_resolve(char* host, char* port, 333 grpc_resolved_addresses** res) with gil: 334 try: 335 result = gevent_socket.getaddrinfo(host, port) 336 res[0] = tuples_to_resolvaddr(result) 337 return grpc_error_none() 338 except IOError as io_error: 339 return socket_error("getaddrinfo", str(io_error)) 340 341############################### 342### timer implementation ###### 343############################### 344 345cdef class TimerWrapper: 346 def __cinit__(self, deadline): 347 self.timer = gevent_hub.get_hub().loop.timer(deadline) 348 self.event = None 349 350 def start(self): 351 self.event = gevent_event.Event() 352 self.timer.start(self.on_finish) 353 354 def on_finish(self): 355 grpc_custom_timer_callback(self.c_timer, grpc_error_none()) 356 self.timer.stop() 357 g_event.set() 358 359 def stop(self): 360 self.event.set() 361 self.timer.stop() 362 363cdef void timer_start(grpc_custom_timer* t) with gil: 364 timer = TimerWrapper(t.timeout_ms / 1000.0) 365 timer.c_timer = t 366 t.timer = <void*>timer 367 timer.start() 368 369cdef void timer_stop(grpc_custom_timer* t) with gil: 370 time_wrapper = <object>t.timer 371 time_wrapper.stop() 372 373############################### 374### pollset implementation ### 375############################### 376 377cdef void init_loop() with gil: 378 pass 379 380cdef void destroy_loop() with gil: 381 g_pool.join() 382 383cdef void kick_loop() with gil: 384 g_event.set() 385 386cdef void run_loop(size_t timeout_ms) with gil: 387 timeout = timeout_ms / 1000.0 388 if timeout_ms > 0: 389 g_event.wait(timeout) 390 g_event.clear() 391 392############################### 393### Initializer ############### 394############################### 395 396cdef grpc_socket_vtable gevent_socket_vtable 397cdef grpc_custom_resolver_vtable gevent_resolver_vtable 398cdef grpc_custom_timer_vtable gevent_timer_vtable 399cdef grpc_custom_poller_vtable gevent_pollset_vtable 400 401def init_grpc_gevent(): 402 # Lazily import gevent 403 global gevent_socket 404 global gevent_g 405 global gevent_hub 406 global gevent_event 407 global g_event 408 global g_pool 409 import gevent 410 gevent_g = gevent 411 import gevent.socket 412 gevent_socket = gevent.socket 413 import gevent.hub 414 gevent_hub = gevent.hub 415 import gevent.event 416 gevent_event = gevent.event 417 import gevent.pool 418 419 g_event = gevent.event.Event() 420 g_pool = gevent.pool.Group() 421 422 def cb_func(cb, args): 423 _spawn_greenlet(cb, *args) 424 set_async_callback_func(cb_func) 425 426 gevent_resolver_vtable.resolve = socket_resolve 427 gevent_resolver_vtable.resolve_async = socket_resolve_async 428 429 gevent_socket_vtable.init = socket_init 430 gevent_socket_vtable.connect = socket_connect 431 gevent_socket_vtable.destroy = socket_destroy 432 gevent_socket_vtable.shutdown = socket_shutdown 433 gevent_socket_vtable.close = socket_close 434 gevent_socket_vtable.write = socket_write 435 gevent_socket_vtable.read = socket_read 436 gevent_socket_vtable.getpeername = socket_getpeername 437 gevent_socket_vtable.getsockname = socket_getsockname 438 gevent_socket_vtable.bind = socket_bind 439 gevent_socket_vtable.listen = socket_listen 440 gevent_socket_vtable.accept = socket_accept 441 442 gevent_timer_vtable.start = timer_start 443 gevent_timer_vtable.stop = timer_stop 444 445 gevent_pollset_vtable.init = init_loop 446 gevent_pollset_vtable.poll = run_loop 447 gevent_pollset_vtable.kick = kick_loop 448 gevent_pollset_vtable.shutdown = destroy_loop 449 450 grpc_custom_iomgr_init(&gevent_socket_vtable, 451 &gevent_resolver_vtable, 452 &gevent_timer_vtable, 453 &gevent_pollset_vtable) 454