1# Copyright 2012, Google Inc. 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions are 6# met: 7# 8# * Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# * Redistributions in binary form must reproduce the above 11# copyright notice, this list of conditions and the following disclaimer 12# in the documentation and/or other materials provided with the 13# distribution. 14# * Neither the name of Google Inc. nor the names of its 15# contributors may be used to endorse or promote products derived from 16# this software without specific prior written permission. 17# 18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 31"""This file provides classes and helper functions for multiplexing extension. 32 33Specification: 34http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-03 35""" 36 37 38import collections 39import copy 40import email 41import email.parser 42import logging 43import math 44import struct 45import threading 46import traceback 47 48from mod_pywebsocket import common 49from mod_pywebsocket import handshake 50from mod_pywebsocket import util 51from mod_pywebsocket._stream_base import BadOperationException 52from mod_pywebsocket._stream_base import ConnectionTerminatedException 53from mod_pywebsocket._stream_hybi import Frame 54from mod_pywebsocket._stream_hybi import Stream 55from mod_pywebsocket._stream_hybi import StreamOptions 56from mod_pywebsocket._stream_hybi import create_binary_frame 57from mod_pywebsocket._stream_hybi import create_closing_handshake_body 58from mod_pywebsocket._stream_hybi import create_header 59from mod_pywebsocket._stream_hybi import parse_frame 60from mod_pywebsocket.handshake import hybi 61 62 63_CONTROL_CHANNEL_ID = 0 64_DEFAULT_CHANNEL_ID = 1 65 66_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 67_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 68_MUX_OPCODE_FLOW_CONTROL = 2 69_MUX_OPCODE_DROP_CHANNEL = 3 70_MUX_OPCODE_NEW_CHANNEL_SLOT = 4 71 72_MAX_CHANNEL_ID = 2 ** 29 - 1 73 74_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64 75_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024 76 77# We need only these status code for now. 78_HTTP_BAD_RESPONSE_MESSAGES = { 79 common.HTTP_STATUS_BAD_REQUEST: 'Bad Request', 80} 81 82 83class MuxUnexpectedException(Exception): 84 """Exception in handling multiplexing extension.""" 85 pass 86 87 88# Temporary 89class MuxNotImplementedException(Exception): 90 """Raised when a flow enters unimplemented code path.""" 91 pass 92 93 94class InvalidMuxFrameException(Exception): 95 """Raised when an invalid multiplexed frame received.""" 96 pass 97 98 99class InvalidMuxControlBlockException(Exception): 100 """Raised when an invalid multiplexing control block received.""" 101 pass 102 103 104class LogicalConnectionClosedException(Exception): 105 """Raised when logical connection is gracefully closed.""" 106 pass 107 108 109def _encode_channel_id(channel_id): 110 if channel_id < 0: 111 raise ValueError('Channel id %d must not be negative' % channel_id) 112 113 if channel_id < 2 ** 7: 114 return chr(channel_id) 115 if channel_id < 2 ** 14: 116 return struct.pack('!H', 0x8000 + channel_id) 117 if channel_id < 2 ** 21: 118 first = chr(0xc0 + (channel_id >> 16)) 119 return first + struct.pack('!H', channel_id & 0xffff) 120 if channel_id < 2 ** 29: 121 return struct.pack('!L', 0xe0000000 + channel_id) 122 123 raise ValueError('Channel id %d is too large' % channel_id) 124 125 126def _size_of_number_in_bytes_minus_1(number): 127 # Calculate the minimum number of bytes minus 1 that are required to store 128 # the data. 129 if number < 0: 130 raise ValueError('Invalid number: %d' % number) 131 elif number < 2 ** 8: 132 return 0 133 elif number < 2 ** 16: 134 return 1 135 elif number < 2 ** 24: 136 return 2 137 elif number < 2 ** 32: 138 return 3 139 else: 140 raise ValueError('Invalid number %d' % number) 141 142 143def _encode_number(number): 144 if number < 2 ** 8: 145 return chr(number) 146 elif number < 2 ** 16: 147 return struct.pack('!H', number) 148 elif number < 2 ** 24: 149 return chr(number >> 16) + struct.pack('!H', number & 0xffff) 150 else: 151 return struct.pack('!L', number) 152 153 154def _create_control_block_length_value(channel_id, opcode, flags, value): 155 """Creates a control block that consists of objective channel id, opcode, 156 flags, encoded length of opcode specific value, and the value. 157 Most of control blocks have this structure. 158 159 Args: 160 channel_id: objective channel id. 161 opcode: opcode of the control block. 162 flags: 3bit opcode specific flags. 163 value: opcode specific data. 164 """ 165 166 if channel_id < 0 or channel_id > _MAX_CHANNEL_ID: 167 raise ValueError('Invalid channel id: %d' % channel_id) 168 if (opcode != _MUX_OPCODE_ADD_CHANNEL_REQUEST and 169 opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE and 170 opcode != _MUX_OPCODE_DROP_CHANNEL): 171 raise ValueError('Invalid opcode: %d' % opcode) 172 if flags < 0 or flags > 7: 173 raise ValueError('Invalid flags: %x' % flags) 174 length = len(value) 175 if length < 0 or length > 2 ** 32 - 1: 176 raise ValueError('Invalid length: %d' % length) 177 178 # The first byte consists of opcode, opcode specific flags, and size of 179 # the size of value in bytes minus 1. 180 bytes_of_length = _size_of_number_in_bytes_minus_1(length) 181 first_byte = (opcode << 5) | (flags << 2) | bytes_of_length 182 183 encoded_length = _encode_number(length) 184 185 return (chr(first_byte) + _encode_channel_id(channel_id) + 186 encoded_length + value) 187 188 189def _create_add_channel_response(channel_id, encoded_handshake, 190 encoding=0, rejected=False, 191 outer_frame_mask=False): 192 if encoding != 0 and encoding != 1: 193 raise ValueError('Invalid encoding %d' % encoding) 194 195 flags = (rejected << 2) | encoding 196 block = _create_control_block_length_value( 197 channel_id, _MUX_OPCODE_ADD_CHANNEL_RESPONSE, flags, encoded_handshake) 198 payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block 199 return create_binary_frame(payload, mask=outer_frame_mask) 200 201 202def _create_drop_channel(channel_id, reason='', mux_error=False, 203 outer_frame_mask=False): 204 if not mux_error and len(reason) > 0: 205 raise ValueError('Reason must be empty if mux_error is False') 206 207 flags = mux_error << 2 208 block = _create_control_block_length_value( 209 channel_id, _MUX_OPCODE_DROP_CHANNEL, flags, reason) 210 payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block 211 return create_binary_frame(payload, mask=outer_frame_mask) 212 213 214def _create_flow_control(channel_id, replenished_quota, 215 outer_frame_mask=False): 216 if replenished_quota < 0 or replenished_quota >= 2 ** 32: 217 raise ValueError('Invalid quota: %d' % replenished_quota) 218 first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | 219 _size_of_number_in_bytes_minus_1(replenished_quota)) 220 payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) + 221 _encode_channel_id(channel_id) + 222 _encode_number(replenished_quota)) 223 return create_binary_frame(payload, mask=outer_frame_mask) 224 225 226def _create_new_channel_slot(slots, send_quota, outer_frame_mask=False): 227 if slots < 0 or slots >= 2 ** 32: 228 raise ValueError('Invalid number of slots: %d' % slots) 229 if send_quota < 0 or send_quota >= 2 ** 32: 230 raise ValueError('Invalid send quota: %d' % send_quota) 231 slots_size = _size_of_number_in_bytes_minus_1(slots) 232 send_quota_size = _size_of_number_in_bytes_minus_1(send_quota) 233 234 first_byte = ((_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 235 (slots_size << 2) | send_quota_size) 236 payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) + 237 _encode_number(slots) + _encode_number(send_quota)) 238 return create_binary_frame(payload, mask=outer_frame_mask) 239 240 241def _parse_request_text(request_text): 242 request_line, header_lines = request_text.split('\r\n', 1) 243 244 words = request_line.split(' ') 245 if len(words) != 3: 246 raise ValueError('Bad Request-Line syntax %r' % request_line) 247 [command, path, version] = words 248 if version != 'HTTP/1.1': 249 raise ValueError('Bad request version %r' % version) 250 251 # email.parser.Parser() parses RFC 2822 (RFC 822) style headers. 252 # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers 253 # RFC 822. 254 headers = email.parser.Parser().parsestr(header_lines) 255 return command, path, version, headers 256 257 258class _ControlBlock(object): 259 """A structure that holds parsing result of multiplexing control block. 260 Control block specific attributes will be added by _MuxFramePayloadParser. 261 (e.g. encoded_handshake will be added for AddChannelRequest and 262 AddChannelResponse) 263 """ 264 265 def __init__(self, opcode): 266 self.opcode = opcode 267 268 269class _MuxFramePayloadParser(object): 270 """A class that parses multiplexed frame payload.""" 271 272 def __init__(self, payload): 273 self._data = payload 274 self._read_position = 0 275 self._logger = util.get_class_logger(self) 276 277 def read_channel_id(self): 278 """Reads channel id. 279 280 Raises: 281 InvalidMuxFrameException: when the payload doesn't contain 282 valid channel id. 283 """ 284 285 remaining_length = len(self._data) - self._read_position 286 pos = self._read_position 287 if remaining_length == 0: 288 raise InvalidMuxFrameException('No channel id found') 289 290 channel_id = ord(self._data[pos]) 291 channel_id_length = 1 292 if channel_id & 0xe0 == 0xe0: 293 if remaining_length < 4: 294 raise InvalidMuxFrameException( 295 'Invalid channel id format') 296 channel_id = struct.unpack('!L', 297 self._data[pos:pos+4])[0] & 0x1fffffff 298 channel_id_length = 4 299 elif channel_id & 0xc0 == 0xc0: 300 if remaining_length < 3: 301 raise InvalidMuxFrameException( 302 'Invalid channel id format') 303 channel_id = (((channel_id & 0x1f) << 16) + 304 struct.unpack('!H', self._data[pos+1:pos+3])[0]) 305 channel_id_length = 3 306 elif channel_id & 0x80 == 0x80: 307 if remaining_length < 2: 308 raise InvalidMuxFrameException( 309 'Invalid channel id format') 310 channel_id = struct.unpack('!H', 311 self._data[pos:pos+2])[0] & 0x3fff 312 channel_id_length = 2 313 self._read_position += channel_id_length 314 315 return channel_id 316 317 def read_inner_frame(self): 318 """Reads an inner frame. 319 320 Raises: 321 InvalidMuxFrameException: when the inner frame is invalid. 322 """ 323 324 if len(self._data) == self._read_position: 325 raise InvalidMuxFrameException('No inner frame bits found') 326 bits = ord(self._data[self._read_position]) 327 self._read_position += 1 328 fin = (bits & 0x80) == 0x80 329 rsv1 = (bits & 0x40) == 0x40 330 rsv2 = (bits & 0x20) == 0x20 331 rsv3 = (bits & 0x10) == 0x10 332 opcode = bits & 0xf 333 payload = self.remaining_data() 334 # Consume rest of the message which is payload data of the original 335 # frame. 336 self._read_position = len(self._data) 337 return fin, rsv1, rsv2, rsv3, opcode, payload 338 339 def _read_number(self, size): 340 if self._read_position + size > len(self._data): 341 raise InvalidMuxControlBlock( 342 'Cannot read %d byte(s) number' % size) 343 344 pos = self._read_position 345 if size == 1: 346 self._read_position += 1 347 return ord(self._data[pos]) 348 elif size == 2: 349 self._read_position += 2 350 return struct.unpack('!H', self._data[pos:pos+2])[0] 351 elif size == 3: 352 self._read_position += 3 353 return ((ord(self._data[pos]) << 16) 354 + struct.unpack('!H', self._data[pos+1:pos+3])[0]) 355 elif size == 4: 356 self._read_position += 4 357 return struct.unpack('!L', self._data[pos:pos+4])[0] 358 else: 359 raise InvalidMuxControlBlockException( 360 'Cannot read %d byte(s) number' % size) 361 362 def _read_opcode_specific_data(self, opcode, size_of_size): 363 """Reads opcode specific data that consists of followings: 364 - the size of the opcode specific data (1-4 bytes) 365 - the opcode specific data 366 AddChannelRequest and DropChannel have this structure. 367 """ 368 369 if self._read_position + size_of_size > len(self._data): 370 raise InvalidMuxControlBlockException( 371 'No size field for opcode %d' % opcode) 372 373 size = self._read_number(size_of_size) 374 375 pos = self._read_position 376 if pos + size > len(self._data): 377 raise InvalidMuxControlBlockException( 378 'No data field for opcode %d (%d + %d > %d)' % 379 (opcode, pos, size, len(self._data))) 380 381 specific_data = self._data[pos:pos+size] 382 self._read_position += size 383 return specific_data 384 385 def _read_add_channel_request(self, first_byte, control_block): 386 reserved = (first_byte >> 4) & 0x1 387 encoding = (first_byte >> 2) & 0x3 388 size_of_handshake_size = (first_byte & 0x3) + 1 389 390 control_block.channel_id = self.read_channel_id() 391 encoded_handshake = self._read_opcode_specific_data( 392 _MUX_OPCODE_ADD_CHANNEL_REQUEST, 393 size_of_handshake_size) 394 control_block.encoding = encoding 395 control_block.encoded_handshake = encoded_handshake 396 return control_block 397 398 def _read_flow_control(self, first_byte, control_block): 399 quota_size = (first_byte & 0x3) + 1 400 control_block.channel_id = self.read_channel_id() 401 control_block.send_quota = self._read_number(quota_size) 402 return control_block 403 404 def _read_drop_channel(self, first_byte, control_block): 405 mux_error = (first_byte >> 4) & 0x1 406 reserved = (first_byte >> 2) & 0x3 407 size_of_reason_size = (first_byte & 0x3) + 1 408 409 control_block.channel_id = self.read_channel_id() 410 reason = self._read_opcode_specific_data( 411 _MUX_OPCODE_ADD_CHANNEL_RESPONSE, 412 size_of_reason_size) 413 if mux_error and len(reason) > 0: 414 raise InvalidMuxControlBlockException( 415 'Reason must be empty when F bit is set') 416 control_block.mux_error = mux_error 417 control_block.reason = reason 418 return control_block 419 420 def _read_new_channel_slot(self, first_byte, control_block): 421 # TODO(bashi): Implement 422 raise MuxNotImplementedException('NewChannelSlot is not implemented') 423 424 def read_control_blocks(self): 425 """Reads control block(s). 426 427 Raises: 428 InvalidMuxControlBlock: when the payload contains invalid control 429 block(s). 430 StopIteration: when no control blocks left. 431 """ 432 433 while self._read_position < len(self._data): 434 if self._read_position >= len(self._data): 435 raise InvalidMuxControlBlockException( 436 'No control opcode found') 437 first_byte = ord(self._data[self._read_position]) 438 self._read_position += 1 439 opcode = (first_byte >> 5) & 0x7 440 control_block = _ControlBlock(opcode=opcode) 441 if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 442 yield self._read_add_channel_request(first_byte, control_block) 443 elif opcode == _MUX_OPCODE_FLOW_CONTROL: 444 yield self._read_flow_control(first_byte, control_block) 445 elif opcode == _MUX_OPCODE_DROP_CHANNEL: 446 yield self._read_drop_channel(first_byte, control_block) 447 elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 448 yield self._read_new_channel_slot(first_byte, control_block) 449 else: 450 raise InvalidMuxControlBlockException( 451 'Invalid opcode %d' % opcode) 452 assert self._read_position == len(self._data) 453 raise StopIteration 454 455 def remaining_data(self): 456 """Returns remaining data.""" 457 458 return self._data[self._read_position:] 459 460 461class _LogicalRequest(object): 462 """Mimics mod_python request.""" 463 464 def __init__(self, channel_id, command, path, headers, connection): 465 """Constructs an instance. 466 467 Args: 468 channel_id: the channel id of the logical channel. 469 command: HTTP request command. 470 path: HTTP request path. 471 headers: HTTP headers. 472 connection: _LogicalConnection instance. 473 """ 474 475 self.channel_id = channel_id 476 self.method = command 477 self.uri = path 478 self.headers_in = headers 479 self.connection = connection 480 self.server_terminated = False 481 self.client_terminated = False 482 483 def is_https(self): 484 """Mimics request.is_https(). Returns False because this method is 485 used only by old protocols (hixie and hybi00). 486 """ 487 488 return False 489 490 491class _LogicalConnection(object): 492 """Mimics mod_python mp_conn.""" 493 494 # For details, see the comment of set_read_state(). 495 STATE_ACTIVE = 1 496 STATE_GRACEFULLY_CLOSED = 2 497 STATE_TERMINATED = 3 498 499 def __init__(self, mux_handler, channel_id): 500 """Constructs an instance. 501 502 Args: 503 mux_handler: _MuxHandler instance. 504 channel_id: channel id of this connection. 505 """ 506 507 self._mux_handler = mux_handler 508 self._channel_id = channel_id 509 self._incoming_data = '' 510 self._write_condition = threading.Condition() 511 self._waiting_write_completion = False 512 self._read_condition = threading.Condition() 513 self._read_state = self.STATE_ACTIVE 514 515 def get_local_addr(self): 516 """Getter to mimic mp_conn.local_addr.""" 517 518 return self._mux_handler.physical_connection.get_local_addr() 519 local_addr = property(get_local_addr) 520 521 def get_remote_addr(self): 522 """Getter to mimic mp_conn.remote_addr.""" 523 524 return self._mux_handler.physical_connection.get_remote_addr() 525 remote_addr = property(get_remote_addr) 526 527 def get_memorized_lines(self): 528 """Gets memorized lines. Not supported.""" 529 530 raise MuxUnexpectedException('_LogicalConnection does not support ' 531 'get_memorized_lines') 532 533 def write(self, data): 534 """Writes data. mux_handler sends data asynchronously. The caller will 535 be suspended until write done. 536 537 Args: 538 data: data to be written. 539 540 Raises: 541 MuxUnexpectedException: when called before finishing the previous 542 write. 543 """ 544 545 try: 546 self._write_condition.acquire() 547 if self._waiting_write_completion: 548 raise MuxUnexpectedException( 549 'Logical connection %d is already waiting the completion ' 550 'of write' % self._channel_id) 551 552 self._waiting_write_completion = True 553 self._mux_handler.send_data(self._channel_id, data) 554 self._write_condition.wait() 555 finally: 556 self._write_condition.release() 557 558 def write_control_data(self, data): 559 """Writes data via the control channel. Don't wait finishing write 560 because this method can be called by mux dispatcher. 561 562 Args: 563 data: data to be written. 564 """ 565 566 self._mux_handler.send_control_data(data) 567 568 def notify_write_done(self): 569 """Called when sending data is completed.""" 570 571 try: 572 self._write_condition.acquire() 573 if not self._waiting_write_completion: 574 raise MuxUnexpectedException( 575 'Invalid call of notify_write_done for logical connection' 576 ' %d' % self._channel_id) 577 self._waiting_write_completion = False 578 self._write_condition.notify() 579 finally: 580 self._write_condition.release() 581 582 def append_frame_data(self, frame_data): 583 """Appends incoming frame data. Called when mux_handler dispatches 584 frame data to the corresponding application. 585 586 Args: 587 frame_data: incoming frame data. 588 """ 589 590 self._read_condition.acquire() 591 self._incoming_data += frame_data 592 self._read_condition.notify() 593 self._read_condition.release() 594 595 def read(self, length): 596 """Reads data. Blocks until enough data has arrived via physical 597 connection. 598 599 Args: 600 length: length of data to be read. 601 Raises: 602 LogicalConnectionClosedException: when closing handshake for this 603 logical channel has been received. 604 ConnectionTerminatedException: when the physical connection has 605 closed, or an error is caused on the reader thread. 606 """ 607 608 self._read_condition.acquire() 609 while (self._read_state == self.STATE_ACTIVE and 610 len(self._incoming_data) < length): 611 self._read_condition.wait() 612 613 try: 614 if self._read_state == self.STATE_GRACEFULLY_CLOSED: 615 raise LogicalConnectionClosedException( 616 'Logical channel %d has closed.' % self._channel_id) 617 elif self._read_state == self.STATE_TERMINATED: 618 raise ConnectionTerminatedException( 619 'Receiving %d byte failed. Logical channel (%d) closed' % 620 (length, self._channel_id)) 621 622 value = self._incoming_data[:length] 623 self._incoming_data = self._incoming_data[length:] 624 finally: 625 self._read_condition.release() 626 627 return value 628 629 def set_read_state(self, new_state): 630 """Sets the state of this connection. Called when an event for this 631 connection has occurred. 632 633 Args: 634 new_state: state to be set. new_state must be one of followings: 635 - STATE_GRACEFULLY_CLOSED: when closing handshake for this 636 connection has been received. 637 - STATE_TERMINATED: when the physical connection has closed or 638 DropChannel of this connection has received. 639 """ 640 641 self._read_condition.acquire() 642 self._read_state = new_state 643 self._read_condition.notify() 644 self._read_condition.release() 645 646 647class _LogicalStream(Stream): 648 """Mimics the Stream class. This class interprets multiplexed WebSocket 649 frames. 650 """ 651 652 def __init__(self, request, send_quota, receive_quota): 653 """Constructs an instance. 654 655 Args: 656 request: _LogicalRequest instance. 657 send_quota: Initial send quota. 658 receive_quota: Initial receive quota. 659 """ 660 661 # TODO(bashi): Support frame filters. 662 stream_options = StreamOptions() 663 # Physical stream is responsible for masking. 664 stream_options.unmask_receive = False 665 # Control frames can be fragmented on logical channel. 666 stream_options.allow_fragmented_control_frame = True 667 Stream.__init__(self, request, stream_options) 668 self._send_quota = send_quota 669 self._send_quota_condition = threading.Condition() 670 self._receive_quota = receive_quota 671 self._write_inner_frame_semaphore = threading.Semaphore() 672 673 def _create_inner_frame(self, opcode, payload, end=True): 674 # TODO(bashi): Support extensions that use reserved bits. 675 first_byte = (end << 7) | opcode 676 return (_encode_channel_id(self._request.channel_id) + 677 chr(first_byte) + payload) 678 679 def _write_inner_frame(self, opcode, payload, end=True): 680 payload_length = len(payload) 681 write_position = 0 682 683 try: 684 # An inner frame will be fragmented if there is no enough send 685 # quota. This semaphore ensures that fragmented inner frames are 686 # sent in order on the logical channel. 687 # Note that frames that come from other logical channels or 688 # multiplexing control blocks can be inserted between fragmented 689 # inner frames on the physical channel. 690 self._write_inner_frame_semaphore.acquire() 691 while write_position < payload_length: 692 try: 693 self._send_quota_condition.acquire() 694 while self._send_quota == 0: 695 self._logger.debug( 696 'No quota. Waiting FlowControl message for %d.' % 697 self._request.channel_id) 698 self._send_quota_condition.wait() 699 700 remaining = payload_length - write_position 701 write_length = min(self._send_quota, remaining) 702 inner_frame_end = ( 703 end and 704 (write_position + write_length == payload_length)) 705 706 inner_frame = self._create_inner_frame( 707 opcode, 708 payload[write_position:write_position+write_length], 709 inner_frame_end) 710 frame_data = self._writer.build( 711 inner_frame, end=True, binary=True) 712 self._send_quota -= write_length 713 self._logger.debug('Consumed quota=%d, remaining=%d' % 714 (write_length, self._send_quota)) 715 finally: 716 self._send_quota_condition.release() 717 718 # Writing data will block the worker so we need to release 719 # _send_quota_condition before writing. 720 self._logger.debug('Sending inner frame: %r' % frame_data) 721 self._request.connection.write(frame_data) 722 write_position += write_length 723 724 opcode = common.OPCODE_CONTINUATION 725 726 except ValueError, e: 727 raise BadOperationException(e) 728 finally: 729 self._write_inner_frame_semaphore.release() 730 731 def replenish_send_quota(self, send_quota): 732 """Replenish send quota.""" 733 734 self._send_quota_condition.acquire() 735 self._send_quota += send_quota 736 self._logger.debug('Replenished send quota for channel id %d: %d' % 737 (self._request.channel_id, self._send_quota)) 738 self._send_quota_condition.notify() 739 self._send_quota_condition.release() 740 741 def consume_receive_quota(self, amount): 742 """Consumes receive quota. Returns False on failure.""" 743 744 if self._receive_quota < amount: 745 self._logger.debug('Violate quota on channel id %d: %d < %d' % 746 (self._request.channel_id, 747 self._receive_quota, amount)) 748 return False 749 self._receive_quota -= amount 750 return True 751 752 def send_message(self, message, end=True, binary=False): 753 """Override Stream.send_message.""" 754 755 if self._request.server_terminated: 756 raise BadOperationException( 757 'Requested send_message after sending out a closing handshake') 758 759 if binary and isinstance(message, unicode): 760 raise BadOperationException( 761 'Message for binary frame must be instance of str') 762 763 if binary: 764 opcode = common.OPCODE_BINARY 765 else: 766 opcode = common.OPCODE_TEXT 767 message = message.encode('utf-8') 768 769 self._write_inner_frame(opcode, message, end) 770 771 def _receive_frame(self): 772 """Overrides Stream._receive_frame. 773 774 In addition to call Stream._receive_frame, this method adds the amount 775 of payload to receiving quota and sends FlowControl to the client. 776 We need to do it here because Stream.receive_message() handles 777 control frames internally. 778 """ 779 780 opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self) 781 amount = len(payload) 782 self._receive_quota += amount 783 frame_data = _create_flow_control(self._request.channel_id, 784 amount) 785 self._logger.debug('Sending flow control for %d, replenished=%d' % 786 (self._request.channel_id, amount)) 787 self._request.connection.write_control_data(frame_data) 788 return opcode, payload, fin, rsv1, rsv2, rsv3 789 790 def receive_message(self): 791 """Overrides Stream.receive_message.""" 792 793 # Just call Stream.receive_message(), but catch 794 # LogicalConnectionClosedException, which is raised when the logical 795 # connection has closed gracefully. 796 try: 797 return Stream.receive_message(self) 798 except LogicalConnectionClosedException, e: 799 self._logger.debug('%s', e) 800 return None 801 802 def _send_closing_handshake(self, code, reason): 803 """Overrides Stream._send_closing_handshake.""" 804 805 body = create_closing_handshake_body(code, reason) 806 self._logger.debug('Sending closing handshake for %d: (%r, %r)' % 807 (self._request.channel_id, code, reason)) 808 self._write_inner_frame(common.OPCODE_CLOSE, body, end=True) 809 810 self._request.server_terminated = True 811 812 def send_ping(self, body=''): 813 """Overrides Stream.send_ping""" 814 815 self._logger.debug('Sending ping on logical channel %d: %r' % 816 (self._request.channel_id, body)) 817 self._write_inner_frame(common.OPCODE_PING, body, end=True) 818 819 self._ping_queue.append(body) 820 821 def _send_pong(self, body): 822 """Overrides Stream._send_pong""" 823 824 self._logger.debug('Sending pong on logical channel %d: %r' % 825 (self._request.channel_id, body)) 826 self._write_inner_frame(common.OPCODE_PONG, body, end=True) 827 828 def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''): 829 """Overrides Stream.close_connection.""" 830 831 # TODO(bashi): Implement 832 self._logger.debug('Closing logical connection %d' % 833 self._request.channel_id) 834 self._request.server_terminated = True 835 836 def _drain_received_data(self): 837 """Overrides Stream._drain_received_data. Nothing need to be done for 838 logical channel. 839 """ 840 841 pass 842 843 844class _OutgoingData(object): 845 """A structure that holds data to be sent via physical connection and 846 origin of the data. 847 """ 848 849 def __init__(self, channel_id, data): 850 self.channel_id = channel_id 851 self.data = data 852 853 854class _PhysicalConnectionWriter(threading.Thread): 855 """A thread that is responsible for writing data to physical connection. 856 857 TODO(bashi): Make sure there is no thread-safety problem when the reader 858 thread reads data from the same socket at a time. 859 """ 860 861 def __init__(self, mux_handler): 862 """Constructs an instance. 863 864 Args: 865 mux_handler: _MuxHandler instance. 866 """ 867 868 threading.Thread.__init__(self) 869 self._logger = util.get_class_logger(self) 870 self._mux_handler = mux_handler 871 self.setDaemon(True) 872 self._stop_requested = False 873 self._deque = collections.deque() 874 self._deque_condition = threading.Condition() 875 876 def put_outgoing_data(self, data): 877 """Puts outgoing data. 878 879 Args: 880 data: _OutgoingData instance. 881 882 Raises: 883 BadOperationException: when the thread has been requested to 884 terminate. 885 """ 886 887 try: 888 self._deque_condition.acquire() 889 if self._stop_requested: 890 raise BadOperationException('Cannot write data anymore') 891 892 self._deque.append(data) 893 self._deque_condition.notify() 894 finally: 895 self._deque_condition.release() 896 897 def _write_data(self, outgoing_data): 898 try: 899 self._mux_handler.physical_connection.write(outgoing_data.data) 900 except Exception, e: 901 util.prepend_message_to_exception( 902 'Failed to send message to %r: ' % 903 (self._mux_handler.physical_connection.remote_addr,), e) 904 raise 905 906 # TODO(bashi): It would be better to block the thread that sends 907 # control data as well. 908 if outgoing_data.channel_id != _CONTROL_CHANNEL_ID: 909 self._mux_handler.notify_write_done(outgoing_data.channel_id) 910 911 def run(self): 912 self._deque_condition.acquire() 913 while not self._stop_requested: 914 if len(self._deque) == 0: 915 self._deque_condition.wait() 916 continue 917 918 outgoing_data = self._deque.popleft() 919 self._deque_condition.release() 920 self._write_data(outgoing_data) 921 self._deque_condition.acquire() 922 923 # Flush deque 924 try: 925 while len(self._deque) > 0: 926 outgoing_data = self._deque.popleft() 927 self._write_data(outgoing_data) 928 finally: 929 self._deque_condition.release() 930 931 def stop(self): 932 """Stops the writer thread.""" 933 934 self._deque_condition.acquire() 935 self._stop_requested = True 936 self._deque_condition.notify() 937 self._deque_condition.release() 938 939 940class _PhysicalConnectionReader(threading.Thread): 941 """A thread that is responsible for reading data from physical connection. 942 """ 943 944 def __init__(self, mux_handler): 945 """Constructs an instance. 946 947 Args: 948 mux_handler: _MuxHandler instance. 949 """ 950 951 threading.Thread.__init__(self) 952 self._logger = util.get_class_logger(self) 953 self._mux_handler = mux_handler 954 self.setDaemon(True) 955 956 def run(self): 957 while True: 958 try: 959 physical_stream = self._mux_handler.physical_stream 960 message = physical_stream.receive_message() 961 if message is None: 962 break 963 opcode = physical_stream.get_last_received_opcode() 964 if opcode == common.OPCODE_TEXT: 965 raise MuxUnexpectedException( 966 'Received a text message on physical connection') 967 except ConnectionTerminatedException, e: 968 self._logger.debug('%s', e) 969 break 970 971 try: 972 self._mux_handler.dispatch_message(message) 973 except Exception, e: 974 self._logger.debug(traceback.format_exc()) 975 break 976 977 self._mux_handler.notify_reader_done() 978 979 980class _Worker(threading.Thread): 981 """A thread that is responsible for running the corresponding application 982 handler. 983 """ 984 985 def __init__(self, mux_handler, request): 986 """Constructs an instance. 987 988 Args: 989 mux_handler: _MuxHandler instance. 990 request: _LogicalRequest instance. 991 """ 992 993 threading.Thread.__init__(self) 994 self._logger = util.get_class_logger(self) 995 self._mux_handler = mux_handler 996 self._request = request 997 self.setDaemon(True) 998 999 def run(self): 1000 self._logger.debug('Logical channel worker started. (id=%d)' % 1001 self._request.channel_id) 1002 try: 1003 # Non-critical exceptions will be handled by dispatcher. 1004 self._mux_handler.dispatcher.transfer_data(self._request) 1005 finally: 1006 self._mux_handler.notify_worker_done(self._request.channel_id) 1007 1008 1009class _MuxHandshaker(hybi.Handshaker): 1010 """Opening handshake processor for multiplexing.""" 1011 1012 def __init__(self, request, dispatcher, send_quota, receive_quota): 1013 """Constructs an instance. 1014 Args: 1015 request: _LogicalRequest instance. 1016 dispatcher: Dispatcher instance (dispatch.Dispatcher). 1017 send_quota: Initial send quota. 1018 receive_quota: Initial receive quota. 1019 """ 1020 1021 hybi.Handshaker.__init__(self, request, dispatcher) 1022 self._send_quota = send_quota 1023 self._receive_quota = receive_quota 1024 1025 def _create_stream(self, stream_options): 1026 """Override hybi.Handshaker._create_stream.""" 1027 1028 self._logger.debug('Creating logical stream for %d' % 1029 self._request.channel_id) 1030 return _LogicalStream(self._request, self._send_quota, 1031 self._receive_quota) 1032 1033 def _send_handshake(self, accept): 1034 """Override hybi.Handshaker._send_handshake.""" 1035 1036 # Don't send handshake response for the default channel 1037 if self._request.channel_id == _DEFAULT_CHANNEL_ID: 1038 return 1039 1040 handshake_response = self._create_handshake_response(accept) 1041 frame_data = _create_add_channel_response( 1042 self._request.channel_id, 1043 handshake_response) 1044 self._logger.debug('Sending handshake response for %d: %r' % 1045 (self._request.channel_id, frame_data)) 1046 self._request.connection.write_control_data(frame_data) 1047 1048 1049class _LogicalChannelData(object): 1050 """A structure that holds information about logical channel. 1051 """ 1052 1053 def __init__(self, request, worker): 1054 self.request = request 1055 self.worker = worker 1056 self.mux_error_occurred = False 1057 self.mux_error_reason = '' 1058 1059 1060class _MuxHandler(object): 1061 """Multiplexing handler. When a handler starts, it launches three 1062 threads; the reader thread, the writer thread, and a worker thread. 1063 1064 The reader thread reads data from the physical stream, i.e., the 1065 ws_stream object of the underlying websocket connection. The reader 1066 thread interprets multiplexed frames and dispatches them to logical 1067 channels. Methods of this class are mostly called by the reader thread. 1068 1069 The writer thread sends multiplexed frames which are created by 1070 logical channels via the physical connection. 1071 1072 The worker thread launched at the starting point handles the 1073 "Implicitly Opened Connection". If multiplexing handler receives 1074 an AddChannelRequest and accepts it, the handler will launch a new worker 1075 thread and dispatch the request to it. 1076 """ 1077 1078 def __init__(self, request, dispatcher): 1079 """Constructs an instance. 1080 1081 Args: 1082 request: mod_python request of the physical connection. 1083 dispatcher: Dispatcher instance (dispatch.Dispatcher). 1084 """ 1085 1086 self.original_request = request 1087 self.dispatcher = dispatcher 1088 self.physical_connection = request.connection 1089 self.physical_stream = request.ws_stream 1090 self._logger = util.get_class_logger(self) 1091 self._logical_channels = {} 1092 self._logical_channels_condition = threading.Condition() 1093 # Holds client's initial quota 1094 self._channel_slots = collections.deque() 1095 self._worker_done_notify_received = False 1096 self._reader = None 1097 self._writer = None 1098 1099 def start(self): 1100 """Starts the handler. 1101 1102 Raises: 1103 MuxUnexpectedException: when the handler already started, or when 1104 opening handshake of the default channel fails. 1105 """ 1106 1107 if self._reader or self._writer: 1108 raise MuxUnexpectedException('MuxHandler already started') 1109 1110 self._reader = _PhysicalConnectionReader(self) 1111 self._writer = _PhysicalConnectionWriter(self) 1112 self._reader.start() 1113 self._writer.start() 1114 1115 # Create "Implicitly Opened Connection". 1116 logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID) 1117 headers_in = copy.copy(self.original_request.headers_in) 1118 # TODO(bashi): Support extensions 1119 headers_in['Sec-WebSocket-Extensions'] = '' 1120 logical_request = _LogicalRequest(_DEFAULT_CHANNEL_ID, 1121 self.original_request.method, 1122 self.original_request.uri, 1123 headers_in, 1124 logical_connection) 1125 # Client's send quota for the implicitly opened connection is zero, 1126 # but we will send FlowControl later so set the initial quota to 1127 # _INITIAL_QUOTA_FOR_CLIENT. 1128 self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT) 1129 if not self._do_handshake_for_logical_request( 1130 logical_request, send_quota=self.original_request.mux_quota): 1131 raise MuxUnexpectedException( 1132 'Failed handshake on the default channel id') 1133 self._add_logical_channel(logical_request) 1134 1135 # Send FlowControl for the implicitly opened connection. 1136 frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID, 1137 _INITIAL_QUOTA_FOR_CLIENT) 1138 logical_request.connection.write_control_data(frame_data) 1139 1140 def add_channel_slots(self, slots, send_quota): 1141 """Adds channel slots. 1142 1143 Args: 1144 slots: number of slots to be added. 1145 send_quota: initial send quota for slots. 1146 """ 1147 1148 self._channel_slots.extend([send_quota] * slots) 1149 # Send NewChannelSlot to client. 1150 frame_data = _create_new_channel_slot(slots, send_quota) 1151 self.send_control_data(frame_data) 1152 1153 def wait_until_done(self, timeout=None): 1154 """Waits until all workers are done. Returns False when timeout has 1155 occurred. Returns True on success. 1156 1157 Args: 1158 timeout: timeout in sec. 1159 """ 1160 1161 self._logical_channels_condition.acquire() 1162 try: 1163 while len(self._logical_channels) > 0: 1164 self._logger.debug('Waiting workers(%d)...' % 1165 len(self._logical_channels)) 1166 self._worker_done_notify_received = False 1167 self._logical_channels_condition.wait(timeout) 1168 if not self._worker_done_notify_received: 1169 self._logger.debug('Waiting worker(s) timed out') 1170 return False 1171 1172 finally: 1173 self._logical_channels_condition.release() 1174 1175 # Flush pending outgoing data 1176 self._writer.stop() 1177 self._writer.join() 1178 1179 return True 1180 1181 def notify_write_done(self, channel_id): 1182 """Called by the writer thread when a write operation has done. 1183 1184 Args: 1185 channel_id: objective channel id. 1186 """ 1187 1188 try: 1189 self._logical_channels_condition.acquire() 1190 if channel_id in self._logical_channels: 1191 channel_data = self._logical_channels[channel_id] 1192 channel_data.request.connection.notify_write_done() 1193 else: 1194 self._logger.debug('Seems that logical channel for %d has gone' 1195 % channel_id) 1196 finally: 1197 self._logical_channels_condition.release() 1198 1199 def send_control_data(self, data): 1200 """Sends data via the control channel. 1201 1202 Args: 1203 data: data to be sent. 1204 """ 1205 1206 self._writer.put_outgoing_data(_OutgoingData( 1207 channel_id=_CONTROL_CHANNEL_ID, data=data)) 1208 1209 def send_data(self, channel_id, data): 1210 """Sends data via given logical channel. This method is called by 1211 worker threads. 1212 1213 Args: 1214 channel_id: objective channel id. 1215 data: data to be sent. 1216 """ 1217 1218 self._writer.put_outgoing_data(_OutgoingData( 1219 channel_id=channel_id, data=data)) 1220 1221 def _send_drop_channel(self, channel_id, reason='', mux_error=False): 1222 frame_data = _create_drop_channel(channel_id, reason, mux_error) 1223 self._logger.debug( 1224 'Sending drop channel for channel id %d' % channel_id) 1225 self.send_control_data(frame_data) 1226 1227 def _send_error_add_channel_response(self, channel_id, status=None): 1228 if status is None: 1229 status = common.HTTP_STATUS_BAD_REQUEST 1230 1231 if status in _HTTP_BAD_RESPONSE_MESSAGES: 1232 message = _HTTP_BAD_RESPONSE_MESSAGES[status] 1233 else: 1234 self._logger.debug('Response message for %d is not found' % status) 1235 message = '???' 1236 1237 response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message) 1238 frame_data = _create_add_channel_response(channel_id, 1239 encoded_handshake=response, 1240 encoding=0, rejected=True) 1241 self.send_control_data(frame_data) 1242 1243 def _create_logical_request(self, block): 1244 if block.channel_id == _CONTROL_CHANNEL_ID: 1245 raise MuxUnexpectedException( 1246 'Received the control channel id (0) as objective channel ' 1247 'id for AddChannel') 1248 1249 if block.encoding != 0: 1250 raise MuxNotImplementedException( 1251 'delta-encoding not supported yet') 1252 connection = _LogicalConnection(self, block.channel_id) 1253 command, path, version, headers = _parse_request_text( 1254 block.encoded_handshake) 1255 request = _LogicalRequest(block.channel_id, command, path, 1256 headers, connection) 1257 1258 return request 1259 1260 def _do_handshake_for_logical_request(self, request, send_quota=0): 1261 try: 1262 receive_quota = self._channel_slots.popleft() 1263 except IndexError: 1264 raise MuxUnexpectedException('No room in channel pool') 1265 1266 handshaker = _MuxHandshaker(request, self.dispatcher, 1267 send_quota, receive_quota) 1268 try: 1269 handshaker.do_handshake() 1270 except handshake.VersionException, e: 1271 self._logger.info('%s', e) 1272 self._send_error_add_channel_response( 1273 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1274 return False 1275 except handshake.HandshakeException, e: 1276 self._logger.info('%s', e) 1277 self._send_error_add_channel_response(request.channel_id, 1278 status=e.status) 1279 return False 1280 except handshake.AbortedByUserException, e: 1281 self._logger.info('%s', e) 1282 self._send_error_add_channel_response(request.channel_id) 1283 return False 1284 1285 return True 1286 1287 def _add_logical_channel(self, logical_request): 1288 try: 1289 self._logical_channels_condition.acquire() 1290 if logical_request.channel_id in self._logical_channels: 1291 raise MuxUnexpectedException('Channel id %d already exists' % 1292 logical_request.channel_id) 1293 worker = _Worker(self, logical_request) 1294 channel_data = _LogicalChannelData(logical_request, worker) 1295 self._logical_channels[logical_request.channel_id] = channel_data 1296 worker.start() 1297 finally: 1298 self._logical_channels_condition.release() 1299 1300 def _process_add_channel_request(self, block): 1301 try: 1302 logical_request = self._create_logical_request(block) 1303 except ValueError, e: 1304 self._logger.debug('Failed to create logical request: %r' % e) 1305 self._send_error_add_channel_response( 1306 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1307 return 1308 if self._do_handshake_for_logical_request(logical_request): 1309 self._add_logical_channel(logical_request) 1310 else: 1311 self._send_error_add_channel_response( 1312 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1313 1314 def _process_flow_control(self, block): 1315 try: 1316 self._logical_channels_condition.acquire() 1317 if not block.channel_id in self._logical_channels: 1318 return 1319 channel_data = self._logical_channels[block.channel_id] 1320 channel_data.request.ws_stream.replenish_send_quota( 1321 block.send_quota) 1322 finally: 1323 self._logical_channels_condition.release() 1324 1325 def _process_drop_channel(self, block): 1326 self._logger.debug('DropChannel received for %d: reason=%r' % 1327 (block.channel_id, block.reason)) 1328 try: 1329 self._logical_channels_condition.acquire() 1330 if not block.channel_id in self._logical_channels: 1331 return 1332 channel_data = self._logical_channels[block.channel_id] 1333 if not block.mux_error: 1334 channel_data.request.connection.set_read_state( 1335 _LogicalConnection.STATE_TERMINATED) 1336 else: 1337 # TODO(bashi): What should we do? 1338 channel_data.request.connection.set_read_state( 1339 _LogicalConnection.STATE_TERMINATED) 1340 finally: 1341 self._logical_channels_condition.release() 1342 1343 def _process_new_channel_slot(self, block): 1344 raise MuxUnexpectedException('Client should not send NewChannelSlot') 1345 1346 def _process_control_blocks(self, parser): 1347 for control_block in parser.read_control_blocks(): 1348 opcode = control_block.opcode 1349 self._logger.debug('control block received, opcode: %d' % opcode) 1350 if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 1351 self._process_add_channel_request(control_block) 1352 elif opcode == _MUX_OPCODE_FLOW_CONTROL: 1353 self._process_flow_control(control_block) 1354 elif opcode == _MUX_OPCODE_DROP_CHANNEL: 1355 self._process_drop_channel(control_block) 1356 elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 1357 self._process_new_channel_slot(control_block) 1358 else: 1359 raise InvalidMuxControlBlockException( 1360 'Invalid opcode') 1361 1362 def _process_logical_frame(self, channel_id, parser): 1363 self._logger.debug('Received a frame. channel id=%d' % channel_id) 1364 try: 1365 self._logical_channels_condition.acquire() 1366 if not channel_id in self._logical_channels: 1367 raise MuxUnexpectedException( 1368 'Channel id %d not found' % channel_id) 1369 channel_data = self._logical_channels[channel_id] 1370 fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame() 1371 if not channel_data.request.ws_stream.consume_receive_quota( 1372 len(payload)): 1373 # The client violates quota. Close logical channel. 1374 channel_data.mux_error_occurred = True 1375 channel_data.mux_error_reason = 'Quota violation' 1376 channel_data.request.connection.set_read_state( 1377 _LogicalConnection.STATE_TERMINATED) 1378 return 1379 header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3, 1380 mask=False) 1381 frame_data = header + payload 1382 channel_data.request.connection.append_frame_data(frame_data) 1383 finally: 1384 self._logical_channels_condition.release() 1385 1386 def dispatch_message(self, message): 1387 """Dispatches message. The reader thread calls this method. 1388 1389 Args: 1390 message: a message that contains encapsulated frame. 1391 Raises: 1392 InvalidMuxFrameException: if the message is invalid. 1393 """ 1394 1395 parser = _MuxFramePayloadParser(message) 1396 channel_id = parser.read_channel_id() 1397 if channel_id == _CONTROL_CHANNEL_ID: 1398 self._process_control_blocks(parser) 1399 else: 1400 self._process_logical_frame(channel_id, parser) 1401 1402 def notify_worker_done(self, channel_id): 1403 """Called when a worker has finished. 1404 1405 Args: 1406 channel_id: channel id corresponded with the worker. 1407 """ 1408 1409 self._logger.debug('Worker for channel id %d terminated' % channel_id) 1410 try: 1411 self._logical_channels_condition.acquire() 1412 if not channel_id in self._logical_channels: 1413 raise MuxUnexpectedException( 1414 'Channel id %d not found' % channel_id) 1415 channel_data = self._logical_channels.pop(channel_id) 1416 finally: 1417 self._worker_done_notify_received = True 1418 self._logical_channels_condition.notify() 1419 self._logical_channels_condition.release() 1420 1421 if not channel_data.request.server_terminated: 1422 if channel_data.mux_error_occurred: 1423 self._send_drop_channel( 1424 channel_id, reason=channel_data.mux_error_reason, 1425 mux_error=True) 1426 else: 1427 self._send_drop_channel(channel_id) 1428 1429 def notify_reader_done(self): 1430 """This method is called by the reader thread when the reader has 1431 finished. 1432 """ 1433 1434 # Terminate all logical connections 1435 self._logger.debug('termiating all logical connections...') 1436 self._logical_channels_condition.acquire() 1437 for channel_data in self._logical_channels.values(): 1438 try: 1439 channel_data.request.connection.set_read_state( 1440 _LogicalConnection.STATE_TERMINATED) 1441 except Exception: 1442 pass 1443 self._logical_channels_condition.release() 1444 1445 1446def use_mux(request): 1447 return hasattr(request, 'mux') and request.mux 1448 1449 1450def start(request, dispatcher): 1451 mux_handler = _MuxHandler(request, dispatcher) 1452 mux_handler.start() 1453 1454 mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS, 1455 _INITIAL_QUOTA_FOR_CLIENT) 1456 1457 mux_handler.wait_until_done() 1458 1459 1460# vi:sts=4 sw=4 et 1461