1# nghttp2 - HTTP/2 C Library 2 3# Copyright (c) 2013 Tatsuhiro Tsujikawa 4 5# Permission is hereby granted, free of charge, to any person obtaining 6# a copy of this software and associated documentation files (the 7# "Software"), to deal in the Software without restriction, including 8# without limitation the rights to use, copy, modify, merge, publish, 9# distribute, sublicense, and/or sell copies of the Software, and to 10# permit persons to whom the Software is furnished to do so, subject to 11# the following conditions: 12 13# The above copyright notice and this permission notice shall be 14# included in all copies or substantial portions of the Software. 15 16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 20# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 21# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 22# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 23cimport cnghttp2 24 25from libc.stdlib cimport malloc, free 26from libc.string cimport memcpy, memset 27from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t 28import logging 29 30 31DEFAULT_HEADER_TABLE_SIZE = cnghttp2.NGHTTP2_DEFAULT_HEADER_TABLE_SIZE 32DEFLATE_MAX_HEADER_TABLE_SIZE = 4096 33 34HD_ENTRY_OVERHEAD = 32 35 36class HDTableEntry: 37 38 def __init__(self, name, namelen, value, valuelen): 39 self.name = name 40 self.namelen = namelen 41 self.value = value 42 self.valuelen = valuelen 43 44 def space(self): 45 return self.namelen + self.valuelen + HD_ENTRY_OVERHEAD 46 47cdef _get_pybytes(uint8_t *b, uint16_t blen): 48 return b[:blen] 49 50cdef class HDDeflater: 51 '''Performs header compression. The constructor takes 52 |hd_table_bufsize_max| parameter, which limits the usage of header 53 table in the given amount of bytes. This is necessary because the 54 header compressor and decompressor share the same amount of 55 header table and the decompressor decides that number. The 56 compressor may not want to use all header table size because of 57 limited memory availability. In that case, the 58 |hd_table_bufsize_max| can be used to cap the upper limit of table 59 size whatever the header table size is chosen by the decompressor. 60 The default value of |hd_table_bufsize_max| is 4096 bytes. 61 62 The following example shows how to compress request header sets: 63 64 import binascii, nghttp2 65 66 deflater = nghttp2.HDDeflater() 67 res = deflater.deflate([(b'foo', b'bar'), 68 (b'baz', b'buz')]) 69 print(binascii.b2a_hex(res)) 70 71 ''' 72 73 cdef cnghttp2.nghttp2_hd_deflater *_deflater 74 75 def __cinit__(self, hd_table_bufsize_max = DEFLATE_MAX_HEADER_TABLE_SIZE): 76 rv = cnghttp2.nghttp2_hd_deflate_new(&self._deflater, 77 hd_table_bufsize_max) 78 if rv != 0: 79 raise Exception(_strerror(rv)) 80 81 def __dealloc__(self): 82 cnghttp2.nghttp2_hd_deflate_del(self._deflater) 83 84 def deflate(self, headers): 85 '''Compresses the |headers|. The |headers| must be sequence of tuple 86 of name/value pair, which are sequence of bytes (not unicode 87 string). 88 89 This function returns the encoded header block in byte string. 90 An exception will be raised on error. 91 92 ''' 93 cdef cnghttp2.nghttp2_nv *nva = <cnghttp2.nghttp2_nv*>\ 94 malloc(sizeof(cnghttp2.nghttp2_nv)*\ 95 len(headers)) 96 cdef cnghttp2.nghttp2_nv *nvap = nva 97 98 for k, v in headers: 99 nvap[0].name = k 100 nvap[0].namelen = len(k) 101 nvap[0].value = v 102 nvap[0].valuelen = len(v) 103 nvap[0].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE 104 nvap += 1 105 106 cdef size_t outcap = 0 107 cdef ssize_t rv 108 cdef uint8_t *out 109 cdef size_t outlen 110 111 outlen = cnghttp2.nghttp2_hd_deflate_bound(self._deflater, 112 nva, len(headers)) 113 114 out = <uint8_t*>malloc(outlen) 115 116 rv = cnghttp2.nghttp2_hd_deflate_hd(self._deflater, out, outlen, 117 nva, len(headers)) 118 free(nva) 119 120 if rv < 0: 121 free(out) 122 123 raise Exception(_strerror(rv)) 124 125 cdef bytes res 126 127 try: 128 res = out[:rv] 129 finally: 130 free(out) 131 132 return res 133 134 def change_table_size(self, hd_table_bufsize_max): 135 '''Changes header table size to |hd_table_bufsize_max| byte. 136 137 An exception will be raised on error. 138 139 ''' 140 cdef int rv 141 rv = cnghttp2.nghttp2_hd_deflate_change_table_size(self._deflater, 142 hd_table_bufsize_max) 143 if rv != 0: 144 raise Exception(_strerror(rv)) 145 146 def get_hd_table(self): 147 '''Returns copy of current dynamic header table.''' 148 cdef size_t length = cnghttp2.nghttp2_hd_deflate_get_num_table_entries( 149 self._deflater) 150 cdef const cnghttp2.nghttp2_nv *nv 151 res = [] 152 for i in range(62, length + 1): 153 nv = cnghttp2.nghttp2_hd_deflate_get_table_entry(self._deflater, i) 154 k = _get_pybytes(nv.name, nv.namelen) 155 v = _get_pybytes(nv.value, nv.valuelen) 156 res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen)) 157 return res 158 159cdef class HDInflater: 160 '''Performs header decompression. 161 162 The following example shows how to compress request header sets: 163 164 data = b'0082c5ad82bd0f000362617a0362757a' 165 inflater = nghttp2.HDInflater() 166 hdrs = inflater.inflate(data) 167 print(hdrs) 168 169 ''' 170 171 cdef cnghttp2.nghttp2_hd_inflater *_inflater 172 173 def __cinit__(self): 174 rv = cnghttp2.nghttp2_hd_inflate_new(&self._inflater) 175 if rv != 0: 176 raise Exception(_strerror(rv)) 177 178 def __dealloc__(self): 179 cnghttp2.nghttp2_hd_inflate_del(self._inflater) 180 181 def inflate(self, data): 182 '''Decompresses the compressed header block |data|. The |data| must be 183 byte string (not unicode string). 184 185 ''' 186 cdef cnghttp2.nghttp2_nv nv 187 cdef int inflate_flags 188 cdef ssize_t rv 189 cdef uint8_t *buf = data 190 cdef size_t buflen = len(data) 191 res = [] 192 while True: 193 inflate_flags = 0 194 rv = cnghttp2.nghttp2_hd_inflate_hd2(self._inflater, &nv, 195 &inflate_flags, 196 buf, buflen, 1) 197 if rv < 0: 198 raise Exception(_strerror(rv)) 199 buf += rv 200 buflen -= rv 201 if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_EMIT: 202 # may throw 203 res.append((nv.name[:nv.namelen], nv.value[:nv.valuelen])) 204 if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_FINAL: 205 break 206 207 cnghttp2.nghttp2_hd_inflate_end_headers(self._inflater) 208 return res 209 210 def change_table_size(self, hd_table_bufsize_max): 211 '''Changes header table size to |hd_table_bufsize_max| byte. 212 213 An exception will be raised on error. 214 215 ''' 216 cdef int rv 217 rv = cnghttp2.nghttp2_hd_inflate_change_table_size(self._inflater, 218 hd_table_bufsize_max) 219 if rv != 0: 220 raise Exception(_strerror(rv)) 221 222 def get_hd_table(self): 223 '''Returns copy of current dynamic header table.''' 224 cdef size_t length = cnghttp2.nghttp2_hd_inflate_get_num_table_entries( 225 self._inflater) 226 cdef const cnghttp2.nghttp2_nv *nv 227 res = [] 228 for i in range(62, length + 1): 229 nv = cnghttp2.nghttp2_hd_inflate_get_table_entry(self._inflater, i) 230 k = _get_pybytes(nv.name, nv.namelen) 231 v = _get_pybytes(nv.value, nv.valuelen) 232 res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen)) 233 return res 234 235cdef _strerror(int liberror_code): 236 return cnghttp2.nghttp2_strerror(liberror_code).decode('utf-8') 237 238def print_hd_table(hdtable): 239 '''Convenient function to print |hdtable| to the standard output. This 240 function does not work if header name/value cannot be decoded using 241 UTF-8 encoding. 242 243 s=N means the entry occupies N bytes in header table. 244 245 ''' 246 idx = 0 247 for entry in hdtable: 248 idx += 1 249 print('[{}] (s={}) {}: {}'\ 250 .format(idx, entry.space(), 251 entry.name.decode('utf-8'), 252 entry.value.decode('utf-8'))) 253 254try: 255 import socket 256 import io 257 import asyncio 258 import traceback 259 import sys 260 import email.utils 261 import datetime 262 import time 263 import ssl as tls 264 from urllib.parse import urlparse 265except ImportError: 266 asyncio = None 267 268# body generator flags 269DATA_OK = 0 270DATA_EOF = 1 271DATA_DEFERRED = 2 272 273class _ByteIOWrapper: 274 275 def __init__(self, b): 276 self.b = b 277 278 def generate(self, n): 279 data = self.b.read1(n) 280 if not data: 281 return None, DATA_EOF 282 return data, DATA_OK 283 284def wrap_body(body): 285 if body is None: 286 return body 287 elif isinstance(body, str): 288 return _ByteIOWrapper(io.BytesIO(body.encode('utf-8'))).generate 289 elif isinstance(body, bytes): 290 return _ByteIOWrapper(io.BytesIO(body)).generate 291 elif isinstance(body, io.IOBase): 292 return _ByteIOWrapper(body).generate 293 else: 294 # assume that callable in the form f(n) returning tuple byte 295 # string and flag. 296 return body 297 298def negotiated_protocol(ssl_obj): 299 protocol = ssl_obj.selected_alpn_protocol() 300 if protocol: 301 logging.info('alpn, protocol:%s', protocol) 302 return protocol 303 304 protocol = ssl_obj.selected_npn_protocol() 305 if protocol: 306 logging.info('npn, protocol:%s', protocol) 307 return protocol 308 309 return None 310 311def set_application_protocol(ssl_ctx): 312 app_protos = [cnghttp2.NGHTTP2_PROTO_VERSION_ID.decode('utf-8')] 313 ssl_ctx.set_npn_protocols(app_protos) 314 if tls.HAS_ALPN: 315 ssl_ctx.set_alpn_protocols(app_protos) 316 317cdef _get_stream_user_data(cnghttp2.nghttp2_session *session, 318 int32_t stream_id): 319 cdef void *stream_user_data 320 321 stream_user_data = cnghttp2.nghttp2_session_get_stream_user_data\ 322 (session, stream_id) 323 if stream_user_data == NULL: 324 return None 325 326 return <object>stream_user_data 327 328cdef size_t _make_nva(cnghttp2.nghttp2_nv **nva_ptr, headers): 329 cdef cnghttp2.nghttp2_nv *nva 330 cdef size_t nvlen 331 332 nvlen = len(headers) 333 nva = <cnghttp2.nghttp2_nv*>malloc(sizeof(cnghttp2.nghttp2_nv) * nvlen) 334 for i, (k, v) in enumerate(headers): 335 nva[i].name = k 336 nva[i].namelen = len(k) 337 nva[i].value = v 338 nva[i].valuelen = len(v) 339 nva[i].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE 340 341 nva_ptr[0] = nva 342 343 return nvlen 344 345cdef int server_on_header(cnghttp2.nghttp2_session *session, 346 const cnghttp2.nghttp2_frame *frame, 347 const uint8_t *name, size_t namelen, 348 const uint8_t *value, size_t valuelen, 349 uint8_t flags, 350 void *user_data): 351 cdef http2 = <_HTTP2SessionCoreBase>user_data 352 logging.debug('server_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 353 354 handler = _get_stream_user_data(session, frame.hd.stream_id) 355 return on_header(name, namelen, value, valuelen, flags, handler) 356 357cdef int client_on_header(cnghttp2.nghttp2_session *session, 358 const cnghttp2.nghttp2_frame *frame, 359 const uint8_t *name, size_t namelen, 360 const uint8_t *value, size_t valuelen, 361 uint8_t flags, 362 void *user_data): 363 cdef http2 = <_HTTP2SessionCoreBase>user_data 364 logging.debug('client_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 365 366 if frame.hd.type == cnghttp2.NGHTTP2_HEADERS: 367 handler = _get_stream_user_data(session, frame.hd.stream_id) 368 elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: 369 handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id) 370 371 return on_header(name, namelen, value, valuelen, flags, handler) 372 373 374cdef int on_header(const uint8_t *name, size_t namelen, 375 const uint8_t *value, size_t valuelen, 376 uint8_t flags, 377 object handler): 378 if not handler: 379 return 0 380 381 key = name[:namelen] 382 values = value[:valuelen].split(b'\x00') 383 if key == b':scheme': 384 handler.scheme = values[0] 385 elif key == b':method': 386 handler.method = values[0] 387 elif key == b':authority' or key == b'host': 388 handler.host = values[0] 389 elif key == b':path': 390 handler.path = values[0] 391 elif key == b':status': 392 handler.status = values[0] 393 394 if key == b'cookie': 395 handler.cookies.extend(values) 396 else: 397 for v in values: 398 handler.headers.append((key, v)) 399 400 return 0 401 402cdef int server_on_begin_request_headers(cnghttp2.nghttp2_session *session, 403 const cnghttp2.nghttp2_frame *frame, 404 void *user_data): 405 cdef http2 = <_HTTP2SessionCore>user_data 406 407 handler = http2._make_handler(frame.hd.stream_id) 408 cnghttp2.nghttp2_session_set_stream_user_data(session, frame.hd.stream_id, 409 <void*>handler) 410 411 return 0 412 413cdef int server_on_begin_headers(cnghttp2.nghttp2_session *session, 414 const cnghttp2.nghttp2_frame *frame, 415 void *user_data): 416 if frame.hd.type == cnghttp2.NGHTTP2_HEADERS: 417 if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST: 418 return server_on_begin_request_headers(session, frame, user_data) 419 420 return 0 421 422cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session, 423 const cnghttp2.nghttp2_frame *frame, 424 void *user_data): 425 cdef http2 = <_HTTP2SessionCore>user_data 426 logging.debug('server_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 427 428 if frame.hd.type == cnghttp2.NGHTTP2_DATA: 429 if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: 430 handler = _get_stream_user_data(session, frame.hd.stream_id) 431 if not handler: 432 return 0 433 try: 434 handler.on_request_done() 435 except: 436 sys.stderr.write(traceback.format_exc()) 437 return http2._rst_stream(frame.hd.stream_id) 438 elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS: 439 if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST: 440 handler = _get_stream_user_data(session, frame.hd.stream_id) 441 if not handler: 442 return 0 443 if handler.cookies: 444 handler.headers.append((b'cookie', 445 b'; '.join(handler.cookies))) 446 handler.cookies = None 447 try: 448 handler.on_headers() 449 if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: 450 handler.on_request_done() 451 except: 452 sys.stderr.write(traceback.format_exc()) 453 return http2._rst_stream(frame.hd.stream_id) 454 elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: 455 if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK): 456 http2._stop_settings_timer() 457 458 return 0 459 460cdef int on_data_chunk_recv(cnghttp2.nghttp2_session *session, 461 uint8_t flags, 462 int32_t stream_id, const uint8_t *data, 463 size_t length, void *user_data): 464 cdef http2 = <_HTTP2SessionCoreBase>user_data 465 466 handler = _get_stream_user_data(session, stream_id) 467 if not handler: 468 return 0 469 470 try: 471 handler.on_data(data[:length]) 472 except: 473 sys.stderr.write(traceback.format_exc()) 474 return http2._rst_stream(stream_id) 475 476 return 0 477 478cdef int server_on_frame_send(cnghttp2.nghttp2_session *session, 479 const cnghttp2.nghttp2_frame *frame, 480 void *user_data): 481 cdef http2 = <_HTTP2SessionCore>user_data 482 logging.debug('server_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 483 484 if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: 485 # For PUSH_PROMISE, send push response immediately 486 handler = _get_stream_user_data\ 487 (session, frame.push_promise.promised_stream_id) 488 if not handler: 489 return 0 490 491 http2.send_response(handler) 492 elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: 493 if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) != 0: 494 return 0 495 http2._start_settings_timer() 496 elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS: 497 if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM) and \ 498 cnghttp2.nghttp2_session_check_server_session(session): 499 # Send RST_STREAM if remote is not closed yet 500 if cnghttp2.nghttp2_session_get_stream_remote_close( 501 session, frame.hd.stream_id) == 0: 502 http2._rst_stream(frame.hd.stream_id, cnghttp2.NGHTTP2_NO_ERROR) 503 504cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session, 505 const cnghttp2.nghttp2_frame *frame, 506 int lib_error_code, 507 void *user_data): 508 cdef http2 = <_HTTP2SessionCore>user_data 509 logging.debug('server_on_frame_not_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 510 511 if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: 512 # We have to remove handler here. Without this, it is not 513 # removed until session is terminated. 514 handler = _get_stream_user_data\ 515 (session, frame.push_promise.promised_stream_id) 516 if not handler: 517 return 0 518 http2._remove_handler(handler) 519 520cdef int on_stream_close(cnghttp2.nghttp2_session *session, 521 int32_t stream_id, 522 uint32_t error_code, 523 void *user_data): 524 cdef http2 = <_HTTP2SessionCoreBase>user_data 525 logging.debug('on_stream_close, stream_id:%s', stream_id) 526 527 handler = _get_stream_user_data(session, stream_id) 528 if not handler: 529 return 0 530 531 try: 532 handler.on_close(error_code) 533 except: 534 sys.stderr.write(traceback.format_exc()) 535 536 http2._remove_handler(handler) 537 538 return 0 539 540cdef ssize_t data_source_read(cnghttp2.nghttp2_session *session, 541 int32_t stream_id, 542 uint8_t *buf, size_t length, 543 uint32_t *data_flags, 544 cnghttp2.nghttp2_data_source *source, 545 void *user_data): 546 cdef http2 = <_HTTP2SessionCoreBase>user_data 547 generator = <object>source.ptr 548 549 http2.enter_callback() 550 try: 551 data, flag = generator(length) 552 except: 553 sys.stderr.write(traceback.format_exc()) 554 return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; 555 finally: 556 http2.leave_callback() 557 558 if flag == DATA_DEFERRED: 559 return cnghttp2.NGHTTP2_ERR_DEFERRED 560 561 if data: 562 nread = len(data) 563 memcpy(buf, <uint8_t*>data, nread) 564 else: 565 nread = 0 566 567 if flag == DATA_EOF: 568 data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF 569 if cnghttp2.nghttp2_session_check_server_session(session): 570 # Send RST_STREAM if remote is not closed yet 571 if cnghttp2.nghttp2_session_get_stream_remote_close( 572 session, stream_id) == 0: 573 http2._rst_stream(stream_id, cnghttp2.NGHTTP2_NO_ERROR) 574 elif flag != DATA_OK: 575 return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE 576 577 return nread 578 579cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session, 580 const cnghttp2.nghttp2_frame *frame, 581 void *user_data): 582 cdef http2 = <_HTTP2ClientSessionCore>user_data 583 584 if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: 585 # Generate a temporary handler until the headers are all received 586 push_handler = BaseResponseHandler() 587 http2._add_handler(push_handler, frame.push_promise.promised_stream_id) 588 cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id, 589 <void*>push_handler) 590 591 return 0 592 593cdef int client_on_frame_recv(cnghttp2.nghttp2_session *session, 594 const cnghttp2.nghttp2_frame *frame, 595 void *user_data): 596 cdef http2 = <_HTTP2ClientSessionCore>user_data 597 logging.debug('client_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 598 599 if frame.hd.type == cnghttp2.NGHTTP2_DATA: 600 if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: 601 handler = _get_stream_user_data(session, frame.hd.stream_id) 602 if not handler: 603 return 0 604 try: 605 handler.on_response_done() 606 except: 607 sys.stderr.write(traceback.format_exc()) 608 return http2._rst_stream(frame.hd.stream_id) 609 elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS: 610 if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_RESPONSE or frame.headers.cat == cnghttp2.NGHTTP2_HCAT_PUSH_RESPONSE: 611 handler = _get_stream_user_data(session, frame.hd.stream_id) 612 613 if not handler: 614 return 0 615 # TODO handle 1xx non-final response 616 if handler.cookies: 617 handler.headers.append((b'cookie', 618 b'; '.join(handler.cookies))) 619 handler.cookies = None 620 try: 621 handler.on_headers() 622 if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM: 623 handler.on_response_done() 624 except: 625 sys.stderr.write(traceback.format_exc()) 626 return http2._rst_stream(frame.hd.stream_id) 627 elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: 628 if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK): 629 http2._stop_settings_timer() 630 elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE: 631 handler = _get_stream_user_data(session, frame.hd.stream_id) 632 if not handler: 633 return 0 634 # Get the temporary push_handler which now should have all of the header data 635 push_handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id) 636 if not push_handler: 637 return 0 638 # Remove the temporary handler 639 http2._remove_handler(push_handler) 640 cnghttp2.nghttp2_session_set_stream_user_data(session, frame.push_promise.promised_stream_id, 641 <void*>NULL) 642 643 try: 644 handler.on_push_promise(push_handler) 645 except: 646 sys.stderr.write(traceback.format_exc()) 647 return http2._rst_stream(frame.hd.stream_id) 648 649 return 0 650 651cdef int client_on_frame_send(cnghttp2.nghttp2_session *session, 652 const cnghttp2.nghttp2_frame *frame, 653 void *user_data): 654 cdef http2 = <_HTTP2ClientSessionCore>user_data 655 logging.debug('client_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id) 656 657 if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS: 658 if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) != 0: 659 return 0 660 http2._start_settings_timer() 661 662cdef class _HTTP2SessionCoreBase: 663 cdef cnghttp2.nghttp2_session *session 664 cdef transport 665 cdef handler_class 666 cdef handlers 667 cdef settings_timer 668 cdef inside_callback 669 670 def __cinit__(self, transport, handler_class=None): 671 self.session = NULL 672 self.transport = transport 673 self.handler_class = handler_class 674 self.handlers = set() 675 self.settings_timer = None 676 self.inside_callback = False 677 678 def __dealloc__(self): 679 cnghttp2.nghttp2_session_del(self.session) 680 681 def data_received(self, data): 682 cdef ssize_t rv 683 684 rv = cnghttp2.nghttp2_session_mem_recv(self.session, data, len(data)) 685 if rv < 0: 686 raise Exception('nghttp2_session_mem_recv failed: {}'.format\ 687 (_strerror(rv))) 688 self.send_data() 689 690 OUTBUF_MAX = 65535 691 SETTINGS_TIMEOUT = 5.0 692 693 def send_data(self): 694 cdef ssize_t outbuflen 695 cdef const uint8_t *outbuf 696 697 while True: 698 if self.transport.get_write_buffer_size() > self.OUTBUF_MAX: 699 break 700 outbuflen = cnghttp2.nghttp2_session_mem_send(self.session, &outbuf) 701 if outbuflen == 0: 702 break 703 if outbuflen < 0: 704 raise Exception('nghttp2_session_mem_send faild: {}'.format\ 705 (_strerror(outbuflen))) 706 self.transport.write(outbuf[:outbuflen]) 707 708 if self.transport.get_write_buffer_size() == 0 and \ 709 cnghttp2.nghttp2_session_want_read(self.session) == 0 and \ 710 cnghttp2.nghttp2_session_want_write(self.session) == 0: 711 self.transport.close() 712 713 def resume(self, stream_id): 714 cnghttp2.nghttp2_session_resume_data(self.session, stream_id) 715 if not self.inside_callback: 716 self.send_data() 717 718 def enter_callback(self): 719 self.inside_callback = True 720 721 def leave_callback(self): 722 self.inside_callback = False 723 724 def _make_handler(self, stream_id): 725 logging.debug('_make_handler, stream_id:%s', stream_id) 726 handler = self.handler_class(self, stream_id) 727 self.handlers.add(handler) 728 return handler 729 730 def _remove_handler(self, handler): 731 logging.debug('_remove_handler, stream_id:%s', handler.stream_id) 732 self.handlers.remove(handler) 733 734 def _add_handler(self, handler, stream_id): 735 logging.debug('_add_handler, stream_id:%s', stream_id) 736 handler.stream_id = stream_id 737 handler.http2 = self 738 handler.remote_address = self._get_remote_address() 739 handler.client_certificate = self._get_client_certificate() 740 self.handlers.add(handler) 741 742 def _rst_stream(self, stream_id, 743 error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR): 744 cdef int rv 745 746 rv = cnghttp2.nghttp2_submit_rst_stream\ 747 (self.session, cnghttp2.NGHTTP2_FLAG_NONE, 748 stream_id, error_code) 749 750 return rv 751 752 def _get_remote_address(self): 753 return self.transport.get_extra_info('peername') 754 755 def _get_client_certificate(self): 756 sock = self.transport.get_extra_info('socket') 757 try: 758 return sock.getpeercert() 759 except AttributeError: 760 return None 761 762 def _start_settings_timer(self): 763 loop = asyncio.get_event_loop() 764 self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT, 765 self._settings_timeout) 766 767 def _stop_settings_timer(self): 768 if self.settings_timer: 769 self.settings_timer.cancel() 770 self.settings_timer = None 771 772 def _settings_timeout(self): 773 cdef int rv 774 775 logging.debug('_settings_timeout') 776 777 self.settings_timer = None 778 779 rv = cnghttp2.nghttp2_session_terminate_session\ 780 (self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT) 781 try: 782 self.send_data() 783 except Exception as err: 784 sys.stderr.write(traceback.format_exc()) 785 self.transport.close() 786 return 787 788 def _log_request(self, handler): 789 now = datetime.datetime.now() 790 tv = time.mktime(now.timetuple()) 791 datestr = email.utils.formatdate(timeval=tv, localtime=False, 792 usegmt=True) 793 try: 794 method = handler.method.decode('utf-8') 795 except: 796 method = handler.method 797 try: 798 path = handler.path.decode('utf-8') 799 except: 800 path = handler.path 801 logging.info('%s - - [%s] "%s %s HTTP/2" %s - %s', handler.remote_address[0], 802 datestr, method, path, handler.status, 803 'P' if handler.pushed else '-') 804 805 def close(self): 806 rv = cnghttp2.nghttp2_session_terminate_session\ 807 (self.session, cnghttp2.NGHTTP2_NO_ERROR) 808 try: 809 self.send_data() 810 except Exception as err: 811 sys.stderr.write(traceback.format_exc()) 812 self.transport.close() 813 return 814 815cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase): 816 def __cinit__(self, *args, **kwargs): 817 cdef cnghttp2.nghttp2_session_callbacks *callbacks 818 cdef cnghttp2.nghttp2_settings_entry iv[2] 819 cdef int rv 820 821 super(_HTTP2SessionCore, self).__init__(*args, **kwargs) 822 823 rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks) 824 825 if rv != 0: 826 raise Exception('nghttp2_session_callbacks_new failed: {}'.format\ 827 (_strerror(rv))) 828 829 cnghttp2.nghttp2_session_callbacks_set_on_header_callback( 830 callbacks, server_on_header) 831 cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback( 832 callbacks, server_on_begin_headers) 833 cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback( 834 callbacks, server_on_frame_recv) 835 cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback( 836 callbacks, on_stream_close) 837 cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback( 838 callbacks, server_on_frame_send) 839 cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback( 840 callbacks, server_on_frame_not_send) 841 cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback( 842 callbacks, on_data_chunk_recv) 843 844 rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks, 845 <void*>self) 846 847 cnghttp2.nghttp2_session_callbacks_del(callbacks) 848 849 if rv != 0: 850 raise Exception('nghttp2_session_server_new failed: {}'.format\ 851 (_strerror(rv))) 852 853 iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS 854 iv[0].value = 100 855 iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE 856 iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE 857 858 rv = cnghttp2.nghttp2_submit_settings(self.session, 859 cnghttp2.NGHTTP2_FLAG_NONE, 860 iv, sizeof(iv) // sizeof(iv[0])) 861 862 if rv != 0: 863 raise Exception('nghttp2_submit_settings failed: {}'.format\ 864 (_strerror(rv))) 865 866 def send_response(self, handler): 867 cdef cnghttp2.nghttp2_data_provider prd 868 cdef cnghttp2.nghttp2_data_provider *prd_ptr 869 cdef cnghttp2.nghttp2_nv *nva 870 cdef size_t nvlen 871 cdef int rv 872 873 logging.debug('send_response, stream_id:%s', handler.stream_id) 874 875 nva = NULL 876 nvlen = _make_nva(&nva, handler.response_headers) 877 878 if handler.response_body: 879 prd.source.ptr = <void*>handler.response_body 880 prd.read_callback = data_source_read 881 prd_ptr = &prd 882 else: 883 prd_ptr = NULL 884 885 rv = cnghttp2.nghttp2_submit_response(self.session, handler.stream_id, 886 nva, nvlen, prd_ptr) 887 888 free(nva) 889 890 if rv != 0: 891 # TODO Ignore return value 892 self._rst_stream(handler.stream_id) 893 raise Exception('nghttp2_submit_response failed: {}'.format\ 894 (_strerror(rv))) 895 896 self._log_request(handler) 897 898 def push(self, handler, promised_handler): 899 cdef cnghttp2.nghttp2_nv *nva 900 cdef size_t nvlen 901 cdef int32_t promised_stream_id 902 903 self.handlers.add(promised_handler) 904 905 nva = NULL 906 nvlen = _make_nva(&nva, promised_handler.headers) 907 908 promised_stream_id = cnghttp2.nghttp2_submit_push_promise\ 909 (self.session, 910 cnghttp2.NGHTTP2_FLAG_NONE, 911 handler.stream_id, 912 nva, nvlen, 913 <void*>promised_handler) 914 if promised_stream_id < 0: 915 raise Exception('nghttp2_submit_push_promise failed: {}'.format\ 916 (_strerror(promised_stream_id))) 917 918 promised_handler.stream_id = promised_stream_id 919 920 logging.debug('push, stream_id:%s', promised_stream_id) 921 922 return promised_handler 923 924 def connection_lost(self): 925 self._stop_settings_timer() 926 927 for handler in self.handlers: 928 handler.on_close(cnghttp2.NGHTTP2_INTERNAL_ERROR) 929 self.handlers = set() 930 931cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase): 932 def __cinit__(self, *args, **kwargs): 933 cdef cnghttp2.nghttp2_session_callbacks *callbacks 934 cdef cnghttp2.nghttp2_settings_entry iv[2] 935 cdef int rv 936 937 super(_HTTP2ClientSessionCore, self).__init__(*args, **kwargs) 938 939 rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks) 940 941 if rv != 0: 942 raise Exception('nghttp2_session_callbacks_new failed: {}'.format\ 943 (_strerror(rv))) 944 945 cnghttp2.nghttp2_session_callbacks_set_on_header_callback( 946 callbacks, client_on_header) 947 cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback( 948 callbacks, client_on_begin_headers) 949 cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback( 950 callbacks, client_on_frame_recv) 951 cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback( 952 callbacks, on_stream_close) 953 cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback( 954 callbacks, client_on_frame_send) 955 cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback( 956 callbacks, on_data_chunk_recv) 957 958 rv = cnghttp2.nghttp2_session_client_new(&self.session, callbacks, 959 <void*>self) 960 961 cnghttp2.nghttp2_session_callbacks_del(callbacks) 962 963 if rv != 0: 964 raise Exception('nghttp2_session_client_new failed: {}'.format\ 965 (_strerror(rv))) 966 967 iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS 968 iv[0].value = 100 969 iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE 970 iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE 971 972 rv = cnghttp2.nghttp2_submit_settings(self.session, 973 cnghttp2.NGHTTP2_FLAG_NONE, 974 iv, sizeof(iv) // sizeof(iv[0])) 975 976 if rv != 0: 977 raise Exception('nghttp2_submit_settings failed: {}'.format\ 978 (_strerror(rv))) 979 980 def send_request(self, method, scheme, host, path, headers, body, handler): 981 cdef cnghttp2.nghttp2_data_provider prd 982 cdef cnghttp2.nghttp2_data_provider *prd_ptr 983 cdef cnghttp2.nghttp2_priority_spec *pri_ptr 984 cdef cnghttp2.nghttp2_nv *nva 985 cdef size_t nvlen 986 cdef int32_t stream_id 987 988 body = wrap_body(body) 989 990 custom_headers = _encode_headers(headers) 991 headers = [ 992 (b':method', method.encode('utf-8')), 993 (b':scheme', scheme.encode('utf-8')), 994 (b':authority', host.encode('utf-8')), 995 (b':path', path.encode('utf-8')) 996 ] 997 headers.extend(custom_headers) 998 999 nva = NULL 1000 nvlen = _make_nva(&nva, headers) 1001 1002 if body: 1003 prd.source.ptr = <void*>body 1004 prd.read_callback = data_source_read 1005 prd_ptr = &prd 1006 else: 1007 prd_ptr = NULL 1008 1009 # TODO: Enable priorities 1010 pri_ptr = NULL 1011 1012 stream_id = cnghttp2.nghttp2_submit_request\ 1013 (self.session, pri_ptr, 1014 nva, nvlen, prd_ptr, 1015 <void*>handler) 1016 free(nva) 1017 1018 if stream_id < 0: 1019 raise Exception('nghttp2_submit_request failed: {}'.format\ 1020 (_strerror(stream_id))) 1021 1022 logging.debug('request, stream_id:%s', stream_id) 1023 1024 self._add_handler(handler, stream_id) 1025 cnghttp2.nghttp2_session_set_stream_user_data(self.session, stream_id, 1026 <void*>handler) 1027 1028 return handler 1029 1030 def push(self, push_promise, handler): 1031 if handler: 1032 # push_promise accepted, fill in the handler with the stored 1033 # headers from the push_promise 1034 handler.status = push_promise.status 1035 handler.scheme = push_promise.scheme 1036 handler.method = push_promise.method 1037 handler.host = push_promise.host 1038 handler.path = push_promise.path 1039 handler.cookies = push_promise.cookies 1040 handler.stream_id = push_promise.stream_id 1041 handler.http2 = self 1042 handler.pushed = True 1043 1044 self._add_handler(handler, handler.stream_id) 1045 1046 cnghttp2.nghttp2_session_set_stream_user_data(self.session, handler.stream_id, 1047 <void*>handler) 1048 else: 1049 # push_promise rejected, reset the stream 1050 self._rst_stream(push_promise.stream_id, 1051 error_code=cnghttp2.NGHTTP2_NO_ERROR) 1052 1053if asyncio: 1054 1055 class BaseRequestHandler: 1056 1057 """HTTP/2 request (stream) handler base class. 1058 1059 The class is used to handle the HTTP/2 stream. By default, it does 1060 not nothing. It must be subclassed to handle each event callback 1061 method. 1062 1063 The first callback method invoked is on_headers(). It is called 1064 when HEADERS frame, which includes request header fields, is 1065 arrived. 1066 1067 If request has request body, on_data(data) is invoked for each 1068 chunk of received data. 1069 1070 When whole request is received, on_request_done() is invoked. 1071 1072 When stream is closed, on_close(error_code) is called. 1073 1074 The application can send response using send_response() method. It 1075 can be used in on_headers(), on_data() or on_request_done(). 1076 1077 The application can push resource using push() method. It must be 1078 used before send_response() call. 1079 1080 The following instance variables are available: 1081 1082 client_address 1083 Contains a tuple of the form (host, port) referring to the client's 1084 address. 1085 1086 client_certificate 1087 May contain the client certifcate in its non-binary form 1088 1089 stream_id 1090 Stream ID of this stream 1091 1092 scheme 1093 Scheme of the request URI. This is a value of :scheme header field. 1094 1095 method 1096 Method of this stream. This is a value of :method header field. 1097 1098 host 1099 This is a value of :authority or host header field. 1100 1101 path 1102 This is a value of :path header field. 1103 1104 headers 1105 Request header fields 1106 1107 """ 1108 1109 def __init__(self, http2, stream_id): 1110 self.headers = [] 1111 self.cookies = [] 1112 # Stream ID. For promised stream, it is initially -1. 1113 self.stream_id = stream_id 1114 self.http2 = http2 1115 # address of the client 1116 self.remote_address = self.http2._get_remote_address() 1117 # certificate of the client 1118 self._client_certificate = self.http2._get_client_certificate() 1119 # :scheme header field in request 1120 self.scheme = None 1121 # :method header field in request 1122 self.method = None 1123 # :authority or host header field in request 1124 self.host = None 1125 # :path header field in request 1126 self.path = None 1127 # HTTP status 1128 self.status = None 1129 # True if this is a handler for pushed resource 1130 self.pushed = False 1131 1132 @property 1133 def client_address(self): 1134 return self.remote_address 1135 1136 @property 1137 def client_certificate(self): 1138 return self._client_certificate 1139 1140 def on_headers(self): 1141 1142 '''Called when request HEADERS is arrived. 1143 1144 ''' 1145 pass 1146 1147 def on_data(self, data): 1148 1149 '''Called when a chunk of request body is arrived. This method 1150 will be called multiple times until all data are received. 1151 1152 ''' 1153 pass 1154 1155 def on_request_done(self): 1156 1157 '''Called when whole request was received 1158 1159 ''' 1160 pass 1161 1162 def on_close(self, error_code): 1163 1164 '''Called when stream is about to close. 1165 1166 ''' 1167 pass 1168 1169 def send_response(self, status=200, headers=None, body=None): 1170 1171 '''Send response. The status is HTTP status code. The headers is 1172 additional response headers. The :status header field is 1173 appended by the library. The body is the response body. It 1174 could be None if response body is empty. Or it must be 1175 instance of either str, bytes, io.IOBase or callable, 1176 called body generator, which takes one parameter, 1177 size. The body generator generates response body. It can 1178 pause generation of response so that it can wait for slow 1179 backend data generation. When invoked, it should return 1180 tuple, byte string and flag. The flag is either DATA_OK, 1181 DATA_EOF and DATA_DEFERRED. For non-empty byte string and 1182 it is not the last chunk of response, DATA_OK is returned 1183 as flag. If this is the last chunk of the response (byte 1184 string is possibly None), DATA_EOF must be returned as 1185 flag. If there is no data available right now, but 1186 additional data are anticipated, return tuple (None, 1187 DATA_DEFERRD). When data arrived, call resume() and 1188 restart response body transmission. 1189 1190 Only the body generator can pause response body 1191 generation; instance of io.IOBase must not block. 1192 1193 If instance of str is specified as body, it is encoded 1194 using UTF-8. 1195 1196 The headers is a list of tuple of the form (name, 1197 value). The name and value can be either unicode string or 1198 byte string. 1199 1200 On error, exception will be thrown. 1201 1202 ''' 1203 if self.status is not None: 1204 raise Exception('response has already been sent') 1205 1206 if not status: 1207 raise Exception('status must not be empty') 1208 1209 body = wrap_body(body) 1210 1211 self._set_response_prop(status, headers, body) 1212 self.http2.send_response(self) 1213 1214 def push(self, path, method='GET', request_headers=None, 1215 status=200, headers=None, body=None): 1216 1217 '''Push a resource. The path is a path portion of request URI 1218 for this 1219 resource. The method is a method to access this 1220 resource. The request_headers is additional request 1221 headers to access this resource. The :scheme, :method, 1222 :authority and :path are appended by the library. The 1223 :scheme and :authority are inherited from the request (not 1224 request_headers parameter). 1225 1226 The status is HTTP status code. The headers is additional 1227 response headers. The :status header field is appended by 1228 the library. The body is the response body. It has the 1229 same semantics of body parameter of send_response(). 1230 1231 The headers and request_headers are a list of tuple of the 1232 form (name, value). The name and value can be either 1233 unicode string or byte string. 1234 1235 On error, exception will be thrown. 1236 1237 ''' 1238 if not status: 1239 raise Exception('status must not be empty') 1240 1241 if not method: 1242 raise Exception('method must not be empty') 1243 1244 if not path: 1245 raise Exception('path must not be empty') 1246 1247 body = wrap_body(body) 1248 1249 promised_handler = self.http2._make_handler(-1) 1250 promised_handler.pushed = True 1251 promised_handler.scheme = self.scheme 1252 promised_handler.method = method.encode('utf-8') 1253 promised_handler.host = self.host 1254 promised_handler.path = path.encode('utf-8') 1255 promised_handler._set_response_prop(status, headers, body) 1256 1257 headers = [ 1258 (b':method', promised_handler.method), 1259 (b':scheme', promised_handler.scheme), 1260 (b':authority', promised_handler.host), 1261 (b':path', promised_handler.path) 1262 ] 1263 headers.extend(_encode_headers(request_headers)) 1264 1265 promised_handler.headers = headers 1266 1267 return self.http2.push(self, promised_handler) 1268 1269 def _set_response_prop(self, status, headers, body): 1270 self.status = status 1271 1272 if headers is None: 1273 headers = [] 1274 1275 self.response_headers = [(b':status', str(status).encode('utf-8'))] 1276 self.response_headers.extend(_encode_headers(headers)) 1277 1278 self.response_body = body 1279 1280 def resume(self): 1281 self.http2.resume(self.stream_id) 1282 1283 def _encode_headers(headers): 1284 if not headers: 1285 return [] 1286 return [(k if isinstance(k, bytes) else k.encode('utf-8'), 1287 v if isinstance(v, bytes) else v.encode('utf-8')) \ 1288 for k, v in headers] 1289 1290 class _HTTP2Session(asyncio.Protocol): 1291 1292 def __init__(self, RequestHandlerClass): 1293 asyncio.Protocol.__init__(self) 1294 self.RequestHandlerClass = RequestHandlerClass 1295 self.http2 = None 1296 1297 def connection_made(self, transport): 1298 address = transport.get_extra_info('peername') 1299 logging.info('connection_made, address:%s, port:%s', address[0], address[1]) 1300 1301 self.transport = transport 1302 sock = self.transport.get_extra_info('socket') 1303 try: 1304 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 1305 except OSError as e: 1306 logging.info('failed to set tcp-nodelay: %s', str(e)) 1307 ssl_ctx = self.transport.get_extra_info('sslcontext') 1308 if ssl_ctx: 1309 ssl_obj = self.transport.get_extra_info('ssl_object') 1310 protocol = negotiated_protocol(ssl_obj) 1311 if protocol is None or protocol.encode('utf-8') != \ 1312 cnghttp2.NGHTTP2_PROTO_VERSION_ID: 1313 self.transport.abort() 1314 return 1315 try: 1316 self.http2 = _HTTP2SessionCore\ 1317 (self.transport, 1318 self.RequestHandlerClass) 1319 except Exception as err: 1320 sys.stderr.write(traceback.format_exc()) 1321 self.transport.abort() 1322 return 1323 1324 1325 def connection_lost(self, exc): 1326 logging.info('connection_lost') 1327 if self.http2: 1328 self.http2.connection_lost() 1329 self.http2 = None 1330 1331 def data_received(self, data): 1332 try: 1333 self.http2.data_received(data) 1334 except Exception as err: 1335 sys.stderr.write(traceback.format_exc()) 1336 self.transport.close() 1337 return 1338 1339 def resume_writing(self): 1340 try: 1341 self.http2.send_data() 1342 except Exception as err: 1343 sys.stderr.write(traceback.format_exc()) 1344 self.transport.close() 1345 return 1346 1347 class HTTP2Server: 1348 1349 '''HTTP/2 server. 1350 1351 This class builds on top of the asyncio event loop. On 1352 construction, RequestHandlerClass must be given, which must be a 1353 subclass of BaseRequestHandler class. 1354 1355 ''' 1356 def __init__(self, address, RequestHandlerClass, ssl=None): 1357 1358 '''address is a tuple of the listening address and port (e.g., 1359 ('127.0.0.1', 8080)). RequestHandlerClass must be a subclass 1360 of BaseRequestHandler class to handle a HTTP/2 stream. The 1361 ssl can be ssl.SSLContext instance. If it is not None, the 1362 resulting server is SSL/TLS capable. 1363 1364 ''' 1365 def session_factory(): 1366 return _HTTP2Session(RequestHandlerClass) 1367 1368 self.loop = asyncio.get_event_loop() 1369 1370 if ssl: 1371 set_application_protocol(ssl) 1372 1373 coro = self.loop.create_server(session_factory, 1374 host=address[0], port=address[1], 1375 ssl=ssl) 1376 self.server = self.loop.run_until_complete(coro) 1377 logging.info('listen, address:%s, port:%s', address[0], address[1]) 1378 1379 def serve_forever(self): 1380 try: 1381 self.loop.run_forever() 1382 finally: 1383 self.server.close() 1384 self.loop.close() 1385 1386 1387 1388 class BaseResponseHandler: 1389 1390 """HTTP/2 response (stream) handler base class. 1391 1392 The class is used to handle the HTTP/2 stream. By default, it does 1393 not nothing. It must be subclassed to handle each event callback 1394 method. 1395 1396 The first callback method invoked is on_headers(). It is called 1397 when HEADERS frame, which includes response header fields, is 1398 arrived. 1399 1400 If response has a body, on_data(data) is invoked for each 1401 chunk of received data. 1402 1403 When whole response is received, on_response_done() is invoked. 1404 1405 When stream is closed or underlying connection is lost, 1406 on_close(error_code) is called. 1407 1408 The application can send follow up requests using HTTP2Client.send_request() method. 1409 1410 The application can handle push resource using on_push_promise() method. 1411 1412 The following instance variables are available: 1413 1414 server_address 1415 Contains a tuple of the form (host, port) referring to the server's 1416 address. 1417 1418 stream_id 1419 Stream ID of this stream 1420 1421 scheme 1422 Scheme of the request URI. This is a value of :scheme header field. 1423 1424 method 1425 Method of this stream. This is a value of :method header field. 1426 1427 host 1428 This is a value of :authority or host header field. 1429 1430 path 1431 This is a value of :path header field. 1432 1433 headers 1434 Response header fields. There is a special exception. If this 1435 object is passed to push_promise(), this instance variable contains 1436 pushed request header fields. 1437 1438 """ 1439 1440 def __init__(self, http2=None, stream_id=-1): 1441 self.headers = [] 1442 self.cookies = [] 1443 # Stream ID. For promised stream, it is initially -1. 1444 self.stream_id = stream_id 1445 self.http2 = http2 1446 # address of the server 1447 self.remote_address = None 1448 # :scheme header field in request 1449 self.scheme = None 1450 # :method header field in request 1451 self.method = None 1452 # :authority or host header field in request 1453 self.host = None 1454 # :path header field in request 1455 self.path = None 1456 # HTTP status 1457 self.status = None 1458 # True if this is a handler for pushed resource 1459 self.pushed = False 1460 1461 @property 1462 def server_address(self): 1463 return self.remote_address 1464 1465 def on_headers(self): 1466 1467 '''Called when response HEADERS is arrived. 1468 1469 ''' 1470 pass 1471 1472 def on_data(self, data): 1473 1474 '''Called when a chunk of response body is arrived. This method 1475 will be called multiple times until all data are received. 1476 1477 ''' 1478 pass 1479 1480 def on_response_done(self): 1481 1482 '''Called when whole response was received 1483 1484 ''' 1485 pass 1486 1487 def on_close(self, error_code): 1488 1489 '''Called when stream is about to close. 1490 1491 ''' 1492 pass 1493 1494 def on_push_promise(self, push_promise): 1495 1496 '''Called when a push is promised. Default behavior is to 1497 cancel the push. If application overrides this method, 1498 it should call either accept_push or reject_push. 1499 1500 ''' 1501 self.reject_push(push_promise) 1502 1503 def reject_push(self, push_promise): 1504 1505 '''Convenience method equivalent to calling accept_push 1506 with a falsy value. 1507 1508 ''' 1509 self.http2.push(push_promise, None) 1510 1511 def accept_push(self, push_promise, handler=None): 1512 1513 '''Accept a push_promise and provider a handler for the 1514 new stream. If a falsy value is supplied for the handler, 1515 the push is rejected. 1516 1517 ''' 1518 self.http2.push(push_promise, handler) 1519 1520 def resume(self): 1521 self.http2.resume(self.stream_id) 1522 1523 class _HTTP2ClientSession(asyncio.Protocol): 1524 1525 def __init__(self, client): 1526 asyncio.Protocol.__init__(self) 1527 self.http2 = None 1528 self.pending = [] 1529 self.client = client 1530 1531 def connection_made(self, transport): 1532 address = transport.get_extra_info('peername') 1533 logging.info('connection_made, address:%s, port:%s', address[0], address[1]) 1534 1535 self.transport = transport 1536 sock = self.transport.get_extra_info('socket') 1537 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 1538 ssl_ctx = self.transport.get_extra_info('sslcontext') 1539 if ssl_ctx: 1540 ssl_obj = self.transport.get_extra_info('ssl_object') 1541 protocol = negotiated_protocol(ssl_obj) 1542 if protocol is None or protocol.encode('utf-8') != \ 1543 cnghttp2.NGHTTP2_PROTO_VERSION_ID: 1544 self.transport.abort() 1545 1546 self.http2 = _HTTP2ClientSessionCore(self.transport) 1547 1548 # Clear pending requests 1549 send_pending = self.pending 1550 self.pending = [] 1551 for method,scheme,host,path,headers,body,handler in send_pending: 1552 self.send_request(method=method, scheme=scheme, host=host, path=path,\ 1553 headers=headers, body=body, handler=handler) 1554 self.http2.send_data() 1555 1556 def connection_lost(self, exc): 1557 logging.info('connection_lost') 1558 if self.http2: 1559 self.http2 = None 1560 self.client.close() 1561 1562 def data_received(self, data): 1563 try: 1564 self.http2.data_received(data) 1565 except Exception as err: 1566 sys.stderr.write(traceback.format_exc()) 1567 self.transport.close() 1568 return 1569 1570 def resume_writing(self): 1571 try: 1572 self.http2.send_data() 1573 except Exception as err: 1574 sys.stderr.write(traceback.format_exc()) 1575 self.transport.close() 1576 return 1577 1578 def send_request(self, method, scheme, host, path, headers, body, handler): 1579 try: 1580 # Waiting until connection established 1581 if not self.http2: 1582 self.pending.append([method, scheme, host, path, headers, body, handler]) 1583 return 1584 1585 self.http2.send_request(method=method, scheme=scheme, host=host, path=path,\ 1586 headers=headers, body=body, handler=handler) 1587 self.http2.send_data() 1588 except Exception as err: 1589 sys.stderr.write(traceback.format_exc()) 1590 self.transport.close() 1591 return 1592 1593 def close(self): 1594 if self.http2: 1595 self.http2.close() 1596 1597 1598 class HTTP2Client: 1599 1600 '''HTTP/2 client. 1601 1602 This class builds on top of the asyncio event loop. 1603 1604 ''' 1605 def __init__(self, address, loop=None, ssl=None): 1606 1607 '''address is a tuple of the connect address and port (e.g., 1608 ('127.0.0.1', 8080)). The ssl can be ssl.SSLContext instance. 1609 If it is not None, the resulting client is SSL/TLS capable. 1610 ''' 1611 1612 self.address = address 1613 self.session = _HTTP2ClientSession(self) 1614 def session_factory(): 1615 return self.session 1616 1617 if ssl: 1618 set_application_protocol(ssl) 1619 1620 self.loop = loop 1621 if not self.loop: 1622 self.loop = asyncio.get_event_loop() 1623 1624 coro = self.loop.create_connection(session_factory, 1625 host=address[0], port=address[1], 1626 ssl=ssl) 1627 1628 if ssl: 1629 self.scheme = 'https' 1630 else: 1631 self.scheme = 'http' 1632 1633 self.transport,_ = self.loop.run_until_complete(coro) 1634 logging.info('connect, address:%s, port:%s', self.address[0], self.address[1]) 1635 1636 @property 1637 def io_loop(self): 1638 return self.loop 1639 1640 def close(self): 1641 self.session.close() 1642 1643 def send_request(self, method='GET', url='/', headers=None, body=None, handler=None): 1644 url = urlparse(url) 1645 scheme = url.scheme if url.scheme else self.scheme 1646 host = url.netloc if url.netloc else self.address[0]+':'+str(self.address[1]) 1647 path = url.path 1648 if url.params: 1649 path += ';'+url.params 1650 if url.query: 1651 path += '?'+url.query 1652 if url.fragment: 1653 path += '#'+url.fragment 1654 1655 self.session.send_request(method=method, scheme=scheme, host=host, path=path,\ 1656 headers=headers, body=body, handler=handler) 1657