1 /* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.squareup.okhttp.internal.ws; 17 18 import com.squareup.okhttp.MediaType; 19 import com.squareup.okhttp.RequestBody; 20 import com.squareup.okhttp.ResponseBody; 21 import com.squareup.okhttp.internal.NamedRunnable; 22 import com.squareup.okhttp.ws.WebSocket; 23 import com.squareup.okhttp.ws.WebSocketListener; 24 import java.io.IOException; 25 import java.net.ProtocolException; 26 import java.util.Random; 27 import java.util.concurrent.Executor; 28 import java.util.concurrent.atomic.AtomicBoolean; 29 import okio.Buffer; 30 import okio.BufferedSink; 31 import okio.BufferedSource; 32 import okio.Okio; 33 34 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY; 35 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT; 36 import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback; 37 38 public abstract class RealWebSocket implements WebSocket { 39 private static final int CLOSE_PROTOCOL_EXCEPTION = 1002; 40 41 private final WebSocketWriter writer; 42 private final WebSocketReader reader; 43 private final WebSocketListener listener; 44 45 /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */ 46 private volatile boolean writerSentClose; 47 /** True after {@link IOException}. {@link #close(int, String)} becomes only valid call. */ 48 private boolean writerWantsClose; 49 /** True after a close frame was read by the reader. No frames will follow it. */ 50 private boolean readerSentClose; 51 52 /** True after calling {@link #close()} to free connection resources. */ 53 private final AtomicBoolean connectionClosed = new AtomicBoolean(); 54 RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random, final Executor replyExecutor, final WebSocketListener listener, final String url)55 public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random, 56 final Executor replyExecutor, final WebSocketListener listener, final String url) { 57 this.listener = listener; 58 59 writer = new WebSocketWriter(isClient, sink, random); 60 reader = new WebSocketReader(isClient, source, new FrameCallback() { 61 @Override public void onMessage(ResponseBody message) throws IOException { 62 listener.onMessage(message); 63 } 64 65 @Override public void onPing(final Buffer buffer) { 66 replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) { 67 @Override protected void execute() { 68 try { 69 writer.writePong(buffer); 70 } catch (IOException ignored) { 71 } 72 } 73 }); 74 } 75 76 @Override public void onPong(Buffer buffer) { 77 listener.onPong(buffer); 78 } 79 80 @Override public void onClose(final int code, final String reason) { 81 readerSentClose = true; 82 replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) { 83 @Override protected void execute() { 84 peerClose(code, reason); 85 } 86 }); 87 } 88 }); 89 } 90 91 /** 92 * Read a single message from the web socket and deliver it to the listener. This method should 93 * be called in a loop with the return value indicating whether looping should continue. 94 */ readMessage()95 public boolean readMessage() { 96 try { 97 reader.processNextFrame(); 98 return !readerSentClose; 99 } catch (IOException e) { 100 readerErrorClose(e); 101 return false; 102 } 103 } 104 sendMessage(RequestBody message)105 @Override public void sendMessage(RequestBody message) throws IOException { 106 if (message == null) throw new NullPointerException("message == null"); 107 if (writerSentClose) throw new IllegalStateException("closed"); 108 if (writerWantsClose) throw new IllegalStateException("must call close()"); 109 110 MediaType contentType = message.contentType(); 111 if (contentType == null) { 112 throw new IllegalArgumentException( 113 "Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY."); 114 } 115 String contentSubtype = contentType.subtype(); 116 117 int formatOpcode; 118 if (WebSocket.TEXT.subtype().equals(contentSubtype)) { 119 formatOpcode = OPCODE_TEXT; 120 } else if (WebSocket.BINARY.subtype().equals(contentSubtype)) { 121 formatOpcode = OPCODE_BINARY; 122 } else { 123 throw new IllegalArgumentException("Unknown message content type: " 124 + contentType.type() + "/" + contentType.subtype() // Omit any implicitly added charset. 125 + ". Must use WebSocket.TEXT or WebSocket.BINARY."); 126 } 127 128 BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode)); 129 try { 130 message.writeTo(sink); 131 sink.close(); 132 } catch (IOException e) { 133 writerWantsClose = true; 134 throw e; 135 } 136 } 137 sendPing(Buffer payload)138 @Override public void sendPing(Buffer payload) throws IOException { 139 if (writerSentClose) throw new IllegalStateException("closed"); 140 if (writerWantsClose) throw new IllegalStateException("must call close()"); 141 142 try { 143 writer.writePing(payload); 144 } catch (IOException e) { 145 writerWantsClose = true; 146 throw e; 147 } 148 } 149 150 /** Send an unsolicited pong with the specified payload. */ sendPong(Buffer payload)151 public void sendPong(Buffer payload) throws IOException { 152 if (writerSentClose) throw new IllegalStateException("closed"); 153 if (writerWantsClose) throw new IllegalStateException("must call close()"); 154 155 try { 156 writer.writePong(payload); 157 } catch (IOException e) { 158 writerWantsClose = true; 159 throw e; 160 } 161 } 162 close(int code, String reason)163 @Override public void close(int code, String reason) throws IOException { 164 if (writerSentClose) throw new IllegalStateException("closed"); 165 writerSentClose = true; 166 167 try { 168 writer.writeClose(code, reason); 169 } catch (IOException e) { 170 if (connectionClosed.compareAndSet(false, true)) { 171 // Try to close the connection without masking the original exception. 172 try { 173 close(); 174 } catch (IOException ignored) { 175 } 176 } 177 throw e; 178 } 179 } 180 181 /** Replies and closes this web socket when a close frame is read from the peer. */ peerClose(int code, String reason)182 private void peerClose(int code, String reason) { 183 if (!writerSentClose) { 184 try { 185 writer.writeClose(code, reason); 186 } catch (IOException ignored) { 187 } 188 } 189 190 if (connectionClosed.compareAndSet(false, true)) { 191 try { 192 close(); 193 } catch (IOException ignored) { 194 } 195 } 196 197 listener.onClose(code, reason); 198 } 199 200 /** Called on the reader thread when an error occurs. */ readerErrorClose(IOException e)201 private void readerErrorClose(IOException e) { 202 // For protocol exceptions, try to inform the server of such. 203 if (!writerSentClose && e instanceof ProtocolException) { 204 try { 205 writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null); 206 } catch (IOException ignored) { 207 } 208 } 209 210 if (connectionClosed.compareAndSet(false, true)) { 211 try { 212 close(); 213 } catch (IOException ignored) { 214 } 215 } 216 217 listener.onFailure(e, null); 218 } 219 220 /** Perform any tear-down work (close the connection, shutdown executors). */ close()221 protected abstract void close() throws IOException; 222 } 223