# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""This file provides classes and helper functions for parsing/building frames
of the WebSocket protocol (RFC 6455).

Specification:
http://tools.ietf.org/html/rfc6455
"""


from collections import deque
import logging
import os
import struct
import time

from mod_pywebsocket import common
from mod_pywebsocket import util
from mod_pywebsocket._stream_base import BadOperationException
from mod_pywebsocket._stream_base import ConnectionTerminatedException
from mod_pywebsocket._stream_base import InvalidFrameException
from mod_pywebsocket._stream_base import InvalidUTF8Exception
from mod_pywebsocket._stream_base import StreamBase
from mod_pywebsocket._stream_base import UnsupportedFrameException


_NOOP_MASKER = util.NoopMasker()


class Frame(object):
    """Plain holder for the header bits, opcode and payload of one frame."""

    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
                 opcode=None, payload=''):
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload


# Helper functions made public to be used for writing unittests for WebSocket
# clients.


def create_length_header(length, mask):
    """Creates a length header.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The second header octet (mask bit plus 7-bit length or the 126/127
        marker), followed by the 2-octet or 8-octet extended length when
        the length does not fit in 7 bits.

    Raises:
        ValueError: when bad data is given.
    """

    if mask:
        mask_bit = 1 << 7
    else:
        mask_bit = 0

    if length < 0:
        raise ValueError('length must be non negative integer')
    elif length <= 125:
        return chr(mask_bit | length)
    elif length < (1 << 16):
        return chr(mask_bit | 126) + struct.pack('!H', length)
    elif length < (1 << 63):
        return chr(mask_bit | 127) + struct.pack('!Q', length)
    else:
        raise ValueError('Payload is too big for one frame')


def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    Args:
        opcode: Frame opcode. Must be in the range 0x0-0xf.
        payload_length: Payload length. Must be non negative and less than
            2^63.
        fin, rsv1, rsv2, rsv3: Header flag bits. Each must be 0 or 1.
        mask: Mask bit passed to create_length_header.

    Returns:
        The first header octet followed by the length header.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one set in any flag makes this non-zero.
    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    header = ''

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)
    header += chr(first_byte)
    header += create_length_header(payload_length, mask)

    return header


def _build_frame(header, body, mask):
    """Concatenates header and body, masking the body when mask is true.

    When masking, a 4-byte nonce from os.urandom is inserted between the
    header and the body, and the body is masked with
    util.RepeatedXorMasker using that nonce.
    """

    if not mask:
        return header + body

    masking_nonce = os.urandom(4)
    masker = util.RepeatedXorMasker(masking_nonce)

    return header + masking_nonce + masker.mask(body)


def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Applies each filter to frame in order, then serializes the frame.

    frame_filters may be None, which is treated as no filters. The header
    is built after filtering, so it reflects any changes the filters made.
    """

    for frame_filter in frame_filters or ():
        frame_filter.filter(frame)

    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_binary_frame(
    message, opcode=common.OPCODE_BINARY, fin=1, mask=False,
    frame_filters=None):
    """Creates a simple binary frame with no extension, reserved bit.

    frame_filters defaults to None (no filters) rather than a shared
    mutable list.
    """

    frame = Frame(fin=fin, opcode=opcode, payload=message)
    return _filter_and_format_frame_object(frame, mask, frame_filters)


def create_text_frame(
    message, opcode=common.OPCODE_TEXT, fin=1, mask=False,
    frame_filters=None):
    """Creates a simple text frame with no extension, reserved bit.

    The message is encoded to UTF-8 and then framed by
    create_binary_frame.
    """

    encoded_message = message.encode('utf-8')
    return create_binary_frame(encoded_message, opcode, fin, mask,
                               frame_filters)


def _megabytes_per_second(length, elapsed):
    """Returns length / elapsed in MB/s for log output.

    Returns infinity instead of raising ZeroDivisionError when the clock
    did not advance between the two time.time() readings.
    """

    if elapsed <= 0:
        return float('inf')
    return length / elapsed / 1000 / 1000


def parse_frame(receive_bytes, logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object.
        ws_version: the version of WebSocket protocol.
        unmask_receive: unmask received frames. When received unmasked
            frame, raises InvalidFrameException.

    Returns:
        A tuple (opcode, payload, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    received = receive_bytes(2)

    first_byte = ord(received[0])
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(received[1])
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(common.LOGLEVEL_FINE,
               'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
               'Mask=%s, Payload_length=%s',
               fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0xFD in
    # 2-octet field).
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack(
            '!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException(
                'Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack(
            '!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)

    # A non-minimal length encoding is only warned about, not rejected.
    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)',
            payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')

    # Evaluated once so that the start timestamps below are always bound
    # when the matching throughput messages are emitted.
    log_throughput = logger.isEnabledFor(common.LOGLEVEL_FINE)

    if log_throughput:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if log_throughput:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done receiving payload data at %s MB/s',
            _megabytes_per_second(
                payload_length, time.time() - receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if log_throughput:
        unmask_start = time.time()

    unmasked_payload = masker.mask(raw_payload_bytes)

    if log_throughput:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done unmasking payload data at %s MB/s',
            _megabytes_per_second(
                payload_length, time.time() - unmask_start))

    return opcode, unmasked_payload, fin, rsv1, rsv2, rsv3


class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""

    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: whether built frames are masked.
            frame_filters: filters applied to each built frame. The given
                list object itself is kept (not copied). None means a new
                empty list.
            encode_utf8: whether text messages are UTF-8 encoded by build.
        """

        if frame_filters is None:
            frame_filters = []

        self._mask = mask
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, message, end, binary):
        """Builds the next frame of the current message.

        Args:
            message: payload of this fragment.
            end: true if this fragment is the last one of the message.
            binary: true to build a binary message, false for text.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when binary doesn't match the type of the message
                that is currently being fragmented.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT
        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(
                message, opcode, fin, self._mask, self._frame_filters)
        else:
            return create_text_frame(
                message, opcode, fin, self._mask, self._frame_filters)


def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a control frame after applying frame_filters.

    frame_filters may be None, which is treated as no filters.

    Raises:
        BadOperationException: when the payload is longer than 125 bytes
            after the filters have been applied.
    """

    frame = Frame(opcode=opcode, payload=body)

    for frame_filter in frame_filters or ():
        frame_filter.filter(frame)

    if len(frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_ping_frame(body, mask=False, frame_filters=None):
    """Creates a ping control frame carrying body."""

    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)


def create_pong_frame(body, mask=False, frame_filters=None):
    """Creates a pong control frame carrying body."""

    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)


def create_close_frame(body, mask=False, frame_filters=None):
    """Creates a close control frame carrying body."""

    return _create_control_frame(
        common.OPCODE_CLOSE, body, mask, frame_filters)


def create_closing_handshake_body(code, reason):
    """Creates the body of a close frame.

    Args:
        code: status code, or None to build an empty body (reason is
            ignored in that case).
        reason: close reason. Encoded to UTF-8 and appended after the
            2-octet status code.

    Raises:
        BadOperationException: when code is out of range or is one of the
            reserved pseudo codes.
    """

    body = ''
    if code is not None:
        if (code > common.STATUS_USER_PRIVATE_MAX or
            code < common.STATUS_NORMAL_CLOSURE):
            raise BadOperationException('Status code is out of range')
        if (code == common.STATUS_NO_STATUS_RECEIVED or
            code == common.STATUS_ABNORMAL_CLOSURE or
            code == common.STATUS_TLS_HANDSHAKE):
            raise BadOperationException('Status code is reserved pseudo '
                                        'code')
        encoded_reason = reason.encode('utf-8')
        body = struct.pack('!H', code) + encoded_reason
    return body


class StreamOptions(object):
    """Holds option values to configure Stream objects."""

    def __init__(self):
        """Constructs StreamOptions."""

        # Enables deflate-stream extension.
        self.deflate_stream = False

        # Filters applied to frames.
        self.outgoing_frame_filters = []
        self.incoming_frame_filters = []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters = []
        self.incoming_message_filters = []

        self.encode_text_message_to_utf8 = True
        self.mask_send = False
        self.unmask_receive = True
        # RFC6455 disallows fragmented control frames, but mux extension
        # relaxes the restriction.
        self.allow_fragmented_control_frame = False


class Stream(StreamBase):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """

    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: a StreamOptions instance configuring this stream.
        """

        StreamBase.__init__(self, request)

        self._logger = util.get_class_logger(self)

        self._options = options

        if self._options.deflate_stream:
            self._logger.debug('Setup filter for deflate-stream')
            self._request = util.DeflateRequest(self._request)

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        # Bodies of pings sent by send_ping that have not been acked yet.
        self._ping_queue = deque()

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """

        return parse_frame(receive_bytes=self.receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        """Receives a frame and returns it wrapped in a Frame object."""

        opcode, payload, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
                     opcode=opcode, payload=payload)

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: true if this is the last fragment of the message.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            self._write(self._writer.build(message, end, binary))
        except ValueError as e:
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = ''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if (not self._options.allow_fragmented_control_frame and
                    common.is_control_opcode(frame.opcode)):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = (
                common.STATUS_NO_STATUS_RECEIVED)
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        elif len(message) >= 2:
            self._request.ws_close_code = struct.unpack(
                '!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug(
                'Received close frame (code=%d, reason=%r)',
                self._request.ws_close_code,
                self._request.ws_close_reason)

        # Drain junk data after the close frame if necessary.
        self._drain_received_data()

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug(
            'Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(
                self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent ack for client-initiated closing handshake '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        If the request has a truthy on_ping_handler attribute, it is called
        and no pong is sent by this method. Otherwise a pong carrying the
        same body is sent.

        Args:
            message: ping message.
        """

        # Only the attribute lookup is allowed to fail silently; an
        # AttributeError raised inside the handler itself propagates.
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            try:
                expected_body = self._ping_queue.popleft()
                if expected_body == message:
                    # inflight_pings contains pings ignored by the
                    # other peer. Just forget them.
                    self._logger.debug(
                        'Ping %r is acked (%d pings were ignored)',
                        expected_body, len(inflight_pings))
                    break
                else:
                    inflight_pings.append(expected_body)
            except IndexError:
                # The received pong was unsolicited pong. Keep the
                # ping queue as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

        # Only the attribute lookup is allowed to fail silently; an
        # AttributeError raised inside the handler itself propagates.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.
            # Timeout is controlled by TimeOut directive of Apache.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode) and
                len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # Invalid UTF-8 is not replaced with U+FFFD here; strict
                # decoding is used and a failure is reported to the caller
                # as InvalidUTF8Exception.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                # Ping and pong are handled internally; keep receiving.
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException(
                    'Opcode %d is not supported' % self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        """Builds and writes a close frame and marks the server side as
        terminated.
        """

        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body, mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # Set before writing so that no further message is sent after the
        # close frame.
        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
        """Closes a WebSocket connection.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
        Raises:
            BadOperationException: when a non-empty reason is specified
                with code None, or when code is not None and reason is
                neither a str nor a unicode instance.
            ConnectionTerminatedException: when a message other than a
                closing handshake is received while waiting for the ack.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, (str, unicode)):
                raise BadOperationException(
                    'close reason must be an instance of str or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Sent server-initiated closing handshake (code=%r, reason=%r)',
            code, reason)

        if (code == common.STATUS_GOING_AWAY or
            code == common.STATUS_PROTOCOL_ERROR):
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body=''):
        """Sends a ping frame and remembers its body to match the pong."""

        frame = create_ping_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame carrying body."""

        frame = create_pong_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode

    def _drain_received_data(self):
        """Drains unread data in the receive buffer to avoid sending out TCP
        RST packet. This is because when deflate-stream is enabled, some
        DEFLATE block for flushing data may follow a close frame. If any data
        remains in the receive buffer of a socket when the socket is closed,
        it sends out TCP RST packet to the other peer.

        Since mod_python's mp_conn object doesn't support non-blocking read,
        we perform this only when pywebsocket is running in standalone mode.
        """

        # If self._options.deflate_stream is true, self._request is
        # DeflateRequest, so we can get wrapped request object by
        # self._request._request.
        #
        # Only _StandaloneRequest has _drain_received_data method.
        if (self._options.deflate_stream and
            ('_drain_received_data' in dir(self._request._request))):
            self._request._request._drain_received_data()


# vi:sts=4 sw=4 et