1 // 2 // ======================================================================== 3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4 // ------------------------------------------------------------------------ 5 // All rights reserved. This program and the accompanying materials 6 // are made available under the terms of the Eclipse Public License v1.0 7 // and Apache License v2.0 which accompanies this distribution. 8 // 9 // The Eclipse Public License is available at 10 // http://www.eclipse.org/legal/epl-v10.html 11 // 12 // The Apache License v2.0 is available at 13 // http://www.opensource.org/licenses/apache2.0.php 14 // 15 // You may elect to redistribute this code under either of these licenses. 16 // ======================================================================== 17 // 18 19 package org.eclipse.jetty.websocket; 20 21 import java.io.IOException; 22 import java.io.UnsupportedEncodingException; 23 import java.security.MessageDigest; 24 import java.util.Collections; 25 import java.util.List; 26 27 import org.eclipse.jetty.io.AbstractConnection; 28 import org.eclipse.jetty.io.AsyncEndPoint; 29 import org.eclipse.jetty.io.Buffer; 30 import org.eclipse.jetty.io.ByteArrayBuffer; 31 import org.eclipse.jetty.io.Connection; 32 import org.eclipse.jetty.io.EndPoint; 33 import org.eclipse.jetty.util.B64Code; 34 import org.eclipse.jetty.util.StringUtil; 35 import org.eclipse.jetty.util.Utf8Appendable; 36 import org.eclipse.jetty.util.Utf8StringBuilder; 37 import org.eclipse.jetty.util.log.Log; 38 import org.eclipse.jetty.util.log.Logger; 39 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage; 40 import org.eclipse.jetty.websocket.WebSocket.OnControl; 41 import org.eclipse.jetty.websocket.WebSocket.OnFrame; 42 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage; 43 44 45 /* ------------------------------------------------------------ */ 46 /** 47 * <pre> 48 * 0 1 2 3 49 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 50 * +-+-+-+-+-------+-+-------------+-------------------------------+ 51 * |F|R|R|R| opcode|M| Payload len | Extended payload length | 52 * |I|S|S|S| (4) |A| (7) | (16/64) | 53 * |N|V|V|V| |S| | (if payload len==126/127) | 54 * | |1|2|3| |K| | | 55 * +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + 56 * | Extended payload length continued, if payload len == 127 | 57 * + - - - - - - - - - - - - - - - +-------------------------------+ 58 * | |Masking-key, if MASK set to 1 | 59 * +-------------------------------+-------------------------------+ 60 * | Masking-key (continued) | Payload Data | 61 * +-------------------------------- - - - - - - - - - - - - - - - + 62 * : Payload Data continued ... : 63 * + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + 64 * | Payload Data continued ... | 65 * +---------------------------------------------------------------+ 66 * </pre> 67 */ 68 public class WebSocketConnectionRFC6455 extends AbstractConnection implements WebSocketConnection 69 { 70 private static final Logger LOG = Log.getLogger(WebSocketConnectionRFC6455.class); 71 72 final static byte OP_CONTINUATION = 0x00; 73 final static byte OP_TEXT = 0x01; 74 final static byte OP_BINARY = 0x02; 75 final static byte OP_EXT_DATA = 0x03; 76 77 final static byte OP_CONTROL = 0x08; 78 final static byte OP_CLOSE = 0x08; 79 final static byte OP_PING = 0x09; 80 final static byte OP_PONG = 0x0A; 81 final static byte OP_EXT_CTRL = 0x0B; 82 83 final static int CLOSE_NORMAL=1000; 84 final static int CLOSE_SHUTDOWN=1001; 85 final static int CLOSE_PROTOCOL=1002; 86 final static int CLOSE_BAD_DATA=1003; 87 final static int CLOSE_UNDEFINED=1004; 88 final static int CLOSE_NO_CODE=1005; 89 final static int CLOSE_NO_CLOSE=1006; 90 final static int CLOSE_BAD_PAYLOAD=1007; 91 final static int CLOSE_POLICY_VIOLATION=1008; 92 final static int CLOSE_MESSAGE_TOO_LARGE=1009; 93 final static int CLOSE_REQUIRED_EXTENSION=1010; 94 final static int CLOSE_SERVER_ERROR=1011; 95 final static int CLOSE_FAILED_TLS_HANDSHAKE=1015; 96 97 final static int FLAG_FIN=0x8; 98 99 // Per RFC 6455, section 1.3 - Opening Handshake - this version is "13" 100 final static int VERSION=13; 101 isLastFrame(byte flags)102 static boolean isLastFrame(byte flags) 103 { 104 return (flags&FLAG_FIN)!=0; 105 } 106 isControlFrame(byte opcode)107 static boolean isControlFrame(byte opcode) 108 { 109 return (opcode&OP_CONTROL)!=0; 110 } 111 112 private final static byte[] MAGIC; 113 private final List<Extension> _extensions; 114 private final WebSocketParserRFC6455 _parser; 115 private final WebSocketGeneratorRFC6455 _generator; 116 private final WebSocketGenerator _outbound; 117 private final WebSocket _webSocket; 118 private final OnFrame _onFrame; 119 private final OnBinaryMessage _onBinaryMessage; 120 private final OnTextMessage _onTextMessage; 121 private final OnControl _onControl; 122 private final String _protocol; 123 private final int _draft; 124 private final ClassLoader _context; 125 private volatile int _closeCode; 126 private volatile String _closeMessage; 127 private volatile boolean _closedIn; 128 private volatile boolean _closedOut; 129 private int _maxTextMessageSize=-1; 130 private int _maxBinaryMessageSize=-1; 131 132 static 133 { 134 try 135 { 136 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1); 137 } 138 catch (UnsupportedEncodingException e) 139 { 140 throw new RuntimeException(e); 141 } 142 } 143 144 private final WebSocket.FrameConnection _connection = new WSFrameConnection(); 145 146 147 /* ------------------------------------------------------------ */ WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)148 public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft) 149 throws IOException 150 { 151 this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null); 152 } 153 154 /* ------------------------------------------------------------ */ WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)155 public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen) 156 throws IOException 157 { 158 super(endpoint,timestamp); 159 160 _context=Thread.currentThread().getContextClassLoader(); 161 162 _draft=draft; 163 _endp.setMaxIdleTime(maxIdleTime); 164 165 _webSocket = websocket; 166 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null; 167 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null; 168 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null; 169 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null; 170 _generator = new WebSocketGeneratorRFC6455(buffers, _endp,maskgen); 171 172 _extensions=extensions; 173 WebSocketParser.FrameHandler frameHandler = new WSFrameHandler(); 174 if (_extensions!=null) 175 { 176 int e=0; 177 for (Extension extension : _extensions) 178 { 179 extension.bind( 180 _connection, 181 e==extensions.size()-1? frameHandler :extensions.get(e+1), 182 e==0?_generator:extensions.get(e-1)); 183 e++; 184 } 185 } 186 187 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1); 188 WebSocketParser.FrameHandler inbound = (_extensions == null || _extensions.size() == 0) ? frameHandler : extensions.get(0); 189 190 _parser = new WebSocketParserRFC6455(buffers, endpoint, inbound,maskgen==null); 191 192 _protocol=protocol; 193 194 } 195 196 /* ------------------------------------------------------------ */ getConnection()197 public WebSocket.Connection getConnection() 198 { 199 return _connection; 200 } 201 202 /* ------------------------------------------------------------ */ getExtensions()203 public List<Extension> getExtensions() 204 { 205 if (_extensions==null) 206 return Collections.emptyList(); 207 208 return _extensions; 209 } 210 211 /* ------------------------------------------------------------ */ handle()212 public Connection handle() throws IOException 213 { 214 Thread current = Thread.currentThread(); 215 ClassLoader oldcontext = current.getContextClassLoader(); 216 current.setContextClassLoader(_context); 217 try 218 { 219 // handle the framing protocol 220 boolean progress=true; 221 222 while (progress) 223 { 224 int flushed=_generator.flushBuffer(); 225 int filled=_parser.parseNext(); 226 227 progress = flushed>0 || filled>0; 228 _endp.flush(); 229 230 if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed()) 231 progress=true; 232 } 233 } 234 catch(IOException e) 235 { 236 try 237 { 238 if (_endp.isOpen()) 239 _endp.close(); 240 } 241 catch(IOException e2) 242 { 243 LOG.ignore(e2); 244 } 245 throw e; 246 } 247 finally 248 { 249 current.setContextClassLoader(oldcontext); 250 _parser.returnBuffer(); 251 _generator.returnBuffer(); 252 if (_endp.isOpen()) 253 { 254 if (_closedIn && _closedOut && _outbound.isBufferEmpty()) 255 _endp.close(); 256 else if (_endp.isInputShutdown() && !_closedIn) 257 closeIn(CLOSE_NO_CLOSE,null); 258 else 259 checkWriteable(); 260 } 261 } 262 return this; 263 } 264 265 /* ------------------------------------------------------------ */ onInputShutdown()266 public void onInputShutdown() throws IOException 267 { 268 if (!_closedIn) 269 _endp.close(); 270 } 271 272 /* ------------------------------------------------------------ */ isIdle()273 public boolean isIdle() 274 { 275 return _parser.isBufferEmpty() && _outbound.isBufferEmpty(); 276 } 277 278 /* ------------------------------------------------------------ */ 279 @Override onIdleExpired(long idleForMs)280 public void onIdleExpired(long idleForMs) 281 { 282 closeOut(WebSocketConnectionRFC6455.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms"); 283 } 284 285 /* ------------------------------------------------------------ */ isSuspended()286 public boolean isSuspended() 287 { 288 return false; 289 } 290 291 /* ------------------------------------------------------------ */ onClose()292 public void onClose() 293 { 294 final boolean closed; 295 synchronized (this) 296 { 297 closed=_closeCode==0; 298 if (closed) 299 _closeCode=WebSocketConnectionRFC6455.CLOSE_NO_CLOSE; 300 } 301 if (closed) 302 _webSocket.onClose(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"closed"); 303 } 304 305 /* ------------------------------------------------------------ */ closeIn(int code,String message)306 public void closeIn(int code,String message) 307 { 308 LOG.debug("ClosedIn {} {} {}",this,code,message); 309 310 final boolean closed_out; 311 final boolean tell_app; 312 synchronized (this) 313 { 314 closed_out=_closedOut; 315 _closedIn=true; 316 tell_app=_closeCode==0; 317 if (tell_app) 318 { 319 _closeCode=code; 320 _closeMessage=message; 321 } 322 } 323 324 try 325 { 326 if (!closed_out) 327 closeOut(code,message); 328 } 329 finally 330 { 331 if (tell_app) 332 _webSocket.onClose(code,message); 333 } 334 } 335 336 /* ------------------------------------------------------------ */ closeOut(int code,String message)337 public void closeOut(int code,String message) 338 { 339 LOG.debug("ClosedOut {} {} {}",this,code,message); 340 341 final boolean closed_out; 342 final boolean tell_app; 343 synchronized (this) 344 { 345 closed_out=_closedOut; 346 _closedOut=true; 347 tell_app=_closeCode==0; 348 if (tell_app) 349 { 350 _closeCode=code; 351 _closeMessage=message; 352 } 353 } 354 355 try 356 { 357 if (tell_app) 358 _webSocket.onClose(code,message); 359 } 360 finally 361 { 362 try 363 { 364 if (!closed_out) 365 { 366 // Close code 1005/1006/1015 are never to be sent as a status over 367 // a Close control frame. Code<-1 also means no node. 368 369 if (code < 0 || (code == WebSocketConnectionRFC6455.CLOSE_NO_CODE) || (code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE) 370 || (code == WebSocketConnectionRFC6455.CLOSE_FAILED_TLS_HANDSHAKE)) 371 { 372 code = -1; 373 } 374 else if (code == 0) 375 { 376 code = WebSocketConnectionRFC6455.CLOSE_NORMAL; 377 } 378 379 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); 380 bytes[0]=(byte)(code/0x100); 381 bytes[1]=(byte)(code%0x100); 382 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_CLOSE,bytes,0,code>0?bytes.length:0); 383 _outbound.flush(); 384 } 385 } 386 catch(IOException e) 387 { 388 LOG.ignore(e); 389 } 390 } 391 } 392 shutdown()393 public void shutdown() 394 { 395 final WebSocket.Connection connection = _connection; 396 if (connection != null) 397 connection.close(CLOSE_SHUTDOWN, null); 398 } 399 400 /* ------------------------------------------------------------ */ fillBuffersFrom(Buffer buffer)401 public void fillBuffersFrom(Buffer buffer) 402 { 403 _parser.fill(buffer); 404 } 405 406 /* ------------------------------------------------------------ */ checkWriteable()407 private void checkWriteable() 408 { 409 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint) 410 { 411 ((AsyncEndPoint)_endp).scheduleWrite(); 412 } 413 } 414 onFrameHandshake()415 protected void onFrameHandshake() 416 { 417 if (_onFrame != null) 418 { 419 _onFrame.onHandshake(_connection); 420 } 421 } 422 onWebSocketOpen()423 protected void onWebSocketOpen() 424 { 425 _webSocket.onOpen(_connection); 426 } 427 428 /* ------------------------------------------------------------ */ 429 private class WSFrameConnection implements WebSocket.FrameConnection 430 { 431 private volatile boolean _disconnecting; 432 433 /* ------------------------------------------------------------ */ sendMessage(String content)434 public void sendMessage(String content) throws IOException 435 { 436 if (_closedOut) 437 throw new IOException("closedOut "+_closeCode+":"+_closeMessage); 438 byte[] data = content.getBytes(StringUtil.__UTF8); 439 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length); 440 checkWriteable(); 441 } 442 443 /* ------------------------------------------------------------ */ sendMessage(byte[] content, int offset, int length)444 public void sendMessage(byte[] content, int offset, int length) throws IOException 445 { 446 if (_closedOut) 447 throw new IOException("closedOut "+_closeCode+":"+_closeMessage); 448 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_BINARY,content,offset,length); 449 checkWriteable(); 450 } 451 452 /* ------------------------------------------------------------ */ sendFrame(byte flags,byte opcode, byte[] content, int offset, int length)453 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException 454 { 455 if (_closedOut) 456 throw new IOException("closedOut "+_closeCode+":"+_closeMessage); 457 _outbound.addFrame(flags,opcode,content,offset,length); 458 checkWriteable(); 459 } 460 461 /* ------------------------------------------------------------ */ sendControl(byte ctrl, byte[] data, int offset, int length)462 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException 463 { 464 // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented 465 if (_closedOut) 466 throw new IOException("closedOut "+_closeCode+":"+_closeMessage); 467 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length); 468 checkWriteable(); 469 } 470 471 /* ------------------------------------------------------------ */ isMessageComplete(byte flags)472 public boolean isMessageComplete(byte flags) 473 { 474 return isLastFrame(flags); 475 } 476 477 /* ------------------------------------------------------------ */ isOpen()478 public boolean isOpen() 479 { 480 return _endp!=null&&_endp.isOpen(); 481 } 482 483 /* ------------------------------------------------------------ */ close(int code, String message)484 public void close(int code, String message) 485 { 486 if (_disconnecting) 487 return; 488 _disconnecting=true; 489 WebSocketConnectionRFC6455.this.closeOut(code,message); 490 } 491 492 /* ------------------------------------------------------------ */ setMaxIdleTime(int ms)493 public void setMaxIdleTime(int ms) 494 { 495 try 496 { 497 _endp.setMaxIdleTime(ms); 498 } 499 catch(IOException e) 500 { 501 LOG.warn(e); 502 } 503 } 504 505 /* ------------------------------------------------------------ */ setMaxTextMessageSize(int size)506 public void setMaxTextMessageSize(int size) 507 { 508 _maxTextMessageSize=size; 509 } 510 511 /* ------------------------------------------------------------ */ setMaxBinaryMessageSize(int size)512 public void setMaxBinaryMessageSize(int size) 513 { 514 _maxBinaryMessageSize=size; 515 } 516 517 /* ------------------------------------------------------------ */ getMaxIdleTime()518 public int getMaxIdleTime() 519 { 520 return _endp.getMaxIdleTime(); 521 } 522 523 /* ------------------------------------------------------------ */ getMaxTextMessageSize()524 public int getMaxTextMessageSize() 525 { 526 return _maxTextMessageSize; 527 } 528 529 /* ------------------------------------------------------------ */ getMaxBinaryMessageSize()530 public int getMaxBinaryMessageSize() 531 { 532 return _maxBinaryMessageSize; 533 } 534 535 /* ------------------------------------------------------------ */ getProtocol()536 public String getProtocol() 537 { 538 return _protocol; 539 } 540 541 /* ------------------------------------------------------------ */ binaryOpcode()542 public byte binaryOpcode() 543 { 544 return OP_BINARY; 545 } 546 547 /* ------------------------------------------------------------ */ textOpcode()548 public byte textOpcode() 549 { 550 return OP_TEXT; 551 } 552 553 /* ------------------------------------------------------------ */ continuationOpcode()554 public byte continuationOpcode() 555 { 556 return OP_CONTINUATION; 557 } 558 559 /* ------------------------------------------------------------ */ finMask()560 public byte finMask() 561 { 562 return FLAG_FIN; 563 } 564 565 /* ------------------------------------------------------------ */ isControl(byte opcode)566 public boolean isControl(byte opcode) 567 { 568 return isControlFrame(opcode); 569 } 570 571 /* ------------------------------------------------------------ */ isText(byte opcode)572 public boolean isText(byte opcode) 573 { 574 return opcode==OP_TEXT; 575 } 576 577 /* ------------------------------------------------------------ */ isBinary(byte opcode)578 public boolean isBinary(byte opcode) 579 { 580 return opcode==OP_BINARY; 581 } 582 583 /* ------------------------------------------------------------ */ isContinuation(byte opcode)584 public boolean isContinuation(byte opcode) 585 { 586 return opcode==OP_CONTINUATION; 587 } 588 589 /* ------------------------------------------------------------ */ isClose(byte opcode)590 public boolean isClose(byte opcode) 591 { 592 return opcode==OP_CLOSE; 593 } 594 595 /* ------------------------------------------------------------ */ isPing(byte opcode)596 public boolean isPing(byte opcode) 597 { 598 return opcode==OP_PING; 599 } 600 601 /* ------------------------------------------------------------ */ isPong(byte opcode)602 public boolean isPong(byte opcode) 603 { 604 return opcode==OP_PONG; 605 } 606 607 /* ------------------------------------------------------------ */ disconnect()608 public void disconnect() 609 { 610 close(CLOSE_NORMAL,null); 611 } 612 613 /* ------------------------------------------------------------ */ close()614 public void close() 615 { 616 close(CLOSE_NORMAL,null); 617 } 618 619 /* ------------------------------------------------------------ */ setAllowFrameFragmentation(boolean allowFragmentation)620 public void setAllowFrameFragmentation(boolean allowFragmentation) 621 { 622 _parser.setFakeFragments(allowFragmentation); 623 } 624 625 /* ------------------------------------------------------------ */ isAllowFrameFragmentation()626 public boolean isAllowFrameFragmentation() 627 { 628 return _parser.isFakeFragments(); 629 } 630 631 /* ------------------------------------------------------------ */ 632 @Override toString()633 public String toString() 634 { 635 return String.format("%s@%x l(%s:%d)<->r(%s:%d)", 636 getClass().getSimpleName(), 637 hashCode(), 638 _endp.getLocalAddr(), 639 _endp.getLocalPort(), 640 _endp.getRemoteAddr(), 641 _endp.getRemotePort()); 642 } 643 } 644 645 /* ------------------------------------------------------------ */ 646 /* ------------------------------------------------------------ */ 647 /* ------------------------------------------------------------ */ 648 private class WSFrameHandler implements WebSocketParser.FrameHandler 649 { 650 private static final int MAX_CONTROL_FRAME_PAYLOAD = 125; 651 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity 652 private ByteArrayBuffer _aggregate; 653 private byte _opcode=-1; 654 onFrame(final byte flags, final byte opcode, final Buffer buffer)655 public void onFrame(final byte flags, final byte opcode, final Buffer buffer) 656 { 657 boolean lastFrame = isLastFrame(flags); 658 659 synchronized(WebSocketConnectionRFC6455.this) 660 { 661 // Ignore incoming after a close 662 if (_closedIn) 663 return; 664 } 665 try 666 { 667 byte[] array=buffer.array(); 668 669 if (isControlFrame(opcode) && buffer.length()>MAX_CONTROL_FRAME_PAYLOAD) 670 { 671 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Control frame too large: " + buffer.length() + " > " + MAX_CONTROL_FRAME_PAYLOAD); 672 return; 673 } 674 675 // TODO: check extensions for RSV bit(s) meanings 676 if ((flags&0x7)!=0) 677 { 678 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"RSV bits set 0x"+Integer.toHexString(flags)); 679 return; 680 } 681 682 // Ignore all frames after error close 683 if (_closeCode!=0 && _closeCode!=CLOSE_NORMAL && opcode!=OP_CLOSE) 684 { 685 return; 686 } 687 688 // Deliver frame if websocket is a FrameWebSocket 689 if (_onFrame!=null) 690 { 691 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) 692 return; 693 } 694 695 if (_onControl!=null && isControlFrame(opcode)) 696 { 697 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) 698 return; 699 } 700 701 switch(opcode) 702 { 703 case WebSocketConnectionRFC6455.OP_CONTINUATION: 704 { 705 if (_opcode==-1) 706 { 707 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad Continuation"); 708 return; 709 } 710 711 // If text, append to the message buffer 712 if (_onTextMessage!=null && _opcode==WebSocketConnectionRFC6455.OP_TEXT) 713 { 714 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) 715 { 716 // If this is the last fragment, deliver the text buffer 717 if (lastFrame) 718 { 719 _opcode=-1; 720 String msg =_utf8.toString(); 721 _utf8.reset(); 722 _onTextMessage.onMessage(msg); 723 } 724 } 725 else 726 textMessageTooLarge(); 727 } 728 729 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) 730 { 731 if (_aggregate!=null && checkBinaryMessageSize(_aggregate.length(),buffer.length())) 732 { 733 _aggregate.put(buffer); 734 735 // If this is the last fragment, deliver 736 if (lastFrame && _onBinaryMessage!=null) 737 { 738 try 739 { 740 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); 741 } 742 finally 743 { 744 _opcode=-1; 745 _aggregate.clear(); 746 } 747 } 748 } 749 } 750 break; 751 } 752 case WebSocketConnectionRFC6455.OP_PING: 753 { 754 LOG.debug("PING {}",this); 755 if (!_closedOut) 756 { 757 _connection.sendControl(WebSocketConnectionRFC6455.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); 758 } 759 break; 760 } 761 762 case WebSocketConnectionRFC6455.OP_PONG: 763 { 764 LOG.debug("PONG {}",this); 765 break; 766 } 767 768 case WebSocketConnectionRFC6455.OP_CLOSE: 769 { 770 int code=WebSocketConnectionRFC6455.CLOSE_NO_CODE; 771 String message=null; 772 if (buffer.length()>=2) 773 { 774 code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]); 775 776 // Validate close status codes. 777 if (code < WebSocketConnectionRFC6455.CLOSE_NORMAL || 778 code == WebSocketConnectionRFC6455.CLOSE_UNDEFINED || 779 code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE || 780 code == WebSocketConnectionRFC6455.CLOSE_NO_CODE || 781 ( code > 1011 && code <= 2999 ) || 782 code >= 5000 ) 783 { 784 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid close code " + code); 785 return; 786 } 787 788 if (buffer.length()>2) 789 { 790 if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize())) 791 { 792 message = _utf8.toString(); 793 _utf8.reset(); 794 } 795 } 796 } 797 else if(buffer.length() == 1) 798 { 799 // Invalid length. use status code 1002 (Protocol error) 800 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid payload length of 1"); 801 return; 802 } 803 closeIn(code,message); 804 break; 805 } 806 807 case WebSocketConnectionRFC6455.OP_TEXT: 808 { 809 if (_opcode!=-1) 810 { 811 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); 812 return; 813 } 814 815 if(_onTextMessage!=null) 816 { 817 if (_connection.getMaxTextMessageSize()<=0) 818 { 819 // No size limit, so handle only final frames 820 if (lastFrame) 821 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); 822 else 823 { 824 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp); 825 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled"); 826 } 827 } 828 // append bytes to message buffer (if they fit) 829 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) 830 { 831 if (lastFrame) 832 { 833 String msg =_utf8.toString(); 834 _utf8.reset(); 835 _onTextMessage.onMessage(msg); 836 } 837 else 838 { 839 _opcode=WebSocketConnectionRFC6455.OP_TEXT; 840 } 841 } 842 else 843 textMessageTooLarge(); 844 } 845 break; 846 } 847 848 case WebSocketConnectionRFC6455.OP_BINARY: 849 { 850 if (_opcode!=-1) 851 { 852 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode)); 853 return; 854 } 855 856 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length())) 857 { 858 if (lastFrame) 859 { 860 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); 861 } 862 else if (_connection.getMaxBinaryMessageSize()>=0) 863 { 864 _opcode=opcode; 865 // TODO use a growing buffer rather than a fixed one. 866 if (_aggregate==null) 867 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); 868 _aggregate.put(buffer); 869 } 870 else 871 { 872 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp); 873 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled"); 874 } 875 } 876 break; 877 } 878 879 default: 880 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode)); 881 break; 882 } 883 } 884 catch(Utf8Appendable.NotUtf8Exception notUtf8) 885 { 886 LOG.warn("NOTUTF8 - {} for {}",notUtf8,_endp, notUtf8); 887 LOG.debug(notUtf8); 888 errorClose(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,"Invalid UTF-8"); 889 } 890 catch(Throwable e) 891 { 892 LOG.warn("{} for {}",e,_endp, e); 893 LOG.debug(e); 894 errorClose(WebSocketConnectionRFC6455.CLOSE_SERVER_ERROR,"Internal Server Error: "+e); 895 } 896 } 897 errorClose(int code, String message)898 private void errorClose(int code, String message) 899 { 900 _connection.close(code,message); 901 902 // Brutally drop the connection 903 try 904 { 905 _endp.close(); 906 } 907 catch (IOException e) 908 { 909 LOG.warn(e.toString()); 910 LOG.debug(e); 911 } 912 } 913 checkBinaryMessageSize(int bufferLen, int length)914 private boolean checkBinaryMessageSize(int bufferLen, int length) 915 { 916 int max = _connection.getMaxBinaryMessageSize(); 917 if (max>0 && (bufferLen+length)>max) 918 { 919 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp); 920 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); 921 _opcode=-1; 922 if (_aggregate!=null) 923 _aggregate.clear(); 924 return false; 925 } 926 return true; 927 } 928 textMessageTooLarge()929 private void textMessageTooLarge() 930 { 931 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp); 932 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); 933 934 _opcode=-1; 935 _utf8.reset(); 936 } 937 close(int code,String message)938 public void close(int code,String message) 939 { 940 if (code!=CLOSE_NORMAL) 941 LOG.warn("Close: "+code+" "+message); 942 _connection.close(code,message); 943 } 944 945 @Override toString()946 public String toString() 947 { 948 return WebSocketConnectionRFC6455.this.toString()+"FH"; 949 } 950 } 951 952 /* ------------------------------------------------------------ */ hashKey(String key)953 public static String hashKey(String key) 954 { 955 try 956 { 957 MessageDigest md = MessageDigest.getInstance("SHA1"); 958 md.update(key.getBytes("UTF-8")); 959 md.update(MAGIC); 960 return new String(B64Code.encode(md.digest())); 961 } 962 catch (Exception e) 963 { 964 throw new RuntimeException(e); 965 } 966 } 967 968 /* ------------------------------------------------------------ */ 969 @Override toString()970 public String toString() 971 { 972 return String.format("%s p=%s g=%s", getClass().getSimpleName(), _parser, _generator); 973 } 974 } 975