• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2014 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.squareup.okhttp.internal.ws;
17 
18 import com.squareup.okhttp.MediaType;
19 import com.squareup.okhttp.RequestBody;
20 import com.squareup.okhttp.ResponseBody;
21 import com.squareup.okhttp.internal.NamedRunnable;
22 import com.squareup.okhttp.ws.WebSocket;
23 import com.squareup.okhttp.ws.WebSocketListener;
24 import java.io.IOException;
25 import java.net.ProtocolException;
26 import java.util.Random;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import okio.Buffer;
30 import okio.BufferedSink;
31 import okio.BufferedSource;
32 import okio.Okio;
33 
34 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
35 import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
36 import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;
37 
38 public abstract class RealWebSocket implements WebSocket {
39   private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;
40 
41   private final WebSocketWriter writer;
42   private final WebSocketReader reader;
43   private final WebSocketListener listener;
44 
45   /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
46   private volatile boolean writerSentClose;
47   /** True after {@link IOException}. {@link #close(int, String)} becomes only valid call. */
48   private boolean writerWantsClose;
49   /** True after a close frame was read by the reader. No frames will follow it. */
50   private boolean readerSentClose;
51 
52   /** True after calling {@link #close()} to free connection resources. */
53   private final AtomicBoolean connectionClosed = new AtomicBoolean();
54 
RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random, final Executor replyExecutor, final WebSocketListener listener, final String url)55   public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
56       final Executor replyExecutor, final WebSocketListener listener, final String url) {
57     this.listener = listener;
58 
59     writer = new WebSocketWriter(isClient, sink, random);
60     reader = new WebSocketReader(isClient, source, new FrameCallback() {
61       @Override public void onMessage(ResponseBody message) throws IOException {
62         listener.onMessage(message);
63       }
64 
65       @Override public void onPing(final Buffer buffer) {
66         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) {
67           @Override protected void execute() {
68             try {
69               writer.writePong(buffer);
70             } catch (IOException ignored) {
71             }
72           }
73         });
74       }
75 
76       @Override public void onPong(Buffer buffer) {
77         listener.onPong(buffer);
78       }
79 
80       @Override public void onClose(final int code, final String reason) {
81         readerSentClose = true;
82         replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
83           @Override protected void execute() {
84             peerClose(code, reason);
85           }
86         });
87       }
88     });
89   }
90 
91   /**
92    * Read a single message from the web socket and deliver it to the listener. This method should
93    * be called in a loop with the return value indicating whether looping should continue.
94    */
readMessage()95   public boolean readMessage() {
96     try {
97       reader.processNextFrame();
98       return !readerSentClose;
99     } catch (IOException e) {
100       readerErrorClose(e);
101       return false;
102     }
103   }
104 
sendMessage(RequestBody message)105   @Override public void sendMessage(RequestBody message) throws IOException {
106     if (message == null) throw new NullPointerException("message == null");
107     if (writerSentClose) throw new IllegalStateException("closed");
108     if (writerWantsClose) throw new IllegalStateException("must call close()");
109 
110     MediaType contentType = message.contentType();
111     if (contentType == null) {
112       throw new IllegalArgumentException(
113           "Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY.");
114     }
115     String contentSubtype = contentType.subtype();
116 
117     int formatOpcode;
118     if (WebSocket.TEXT.subtype().equals(contentSubtype)) {
119       formatOpcode = OPCODE_TEXT;
120     } else if (WebSocket.BINARY.subtype().equals(contentSubtype)) {
121       formatOpcode = OPCODE_BINARY;
122     } else {
123       throw new IllegalArgumentException("Unknown message content type: "
124           + contentType.type() + "/" + contentType.subtype() // Omit any implicitly added charset.
125           + ". Must use WebSocket.TEXT or WebSocket.BINARY.");
126     }
127 
128     BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode));
129     try {
130       message.writeTo(sink);
131       sink.close();
132     } catch (IOException e) {
133       writerWantsClose = true;
134       throw e;
135     }
136   }
137 
sendPing(Buffer payload)138   @Override public void sendPing(Buffer payload) throws IOException {
139     if (writerSentClose) throw new IllegalStateException("closed");
140     if (writerWantsClose) throw new IllegalStateException("must call close()");
141 
142     try {
143       writer.writePing(payload);
144     } catch (IOException e) {
145       writerWantsClose = true;
146       throw e;
147     }
148   }
149 
150   /** Send an unsolicited pong with the specified payload. */
sendPong(Buffer payload)151   public void sendPong(Buffer payload) throws IOException {
152     if (writerSentClose) throw new IllegalStateException("closed");
153     if (writerWantsClose) throw new IllegalStateException("must call close()");
154 
155     try {
156       writer.writePong(payload);
157     } catch (IOException e) {
158       writerWantsClose = true;
159       throw e;
160     }
161   }
162 
close(int code, String reason)163   @Override public void close(int code, String reason) throws IOException {
164     if (writerSentClose) throw new IllegalStateException("closed");
165     writerSentClose = true;
166 
167     try {
168       writer.writeClose(code, reason);
169     } catch (IOException e) {
170       if (connectionClosed.compareAndSet(false, true)) {
171         // Try to close the connection without masking the original exception.
172         try {
173           close();
174         } catch (IOException ignored) {
175         }
176       }
177       throw e;
178     }
179   }
180 
181   /** Replies and closes this web socket when a close frame is read from the peer. */
peerClose(int code, String reason)182   private void peerClose(int code, String reason) {
183     if (!writerSentClose) {
184       try {
185         writer.writeClose(code, reason);
186       } catch (IOException ignored) {
187       }
188     }
189 
190     if (connectionClosed.compareAndSet(false, true)) {
191       try {
192         close();
193       } catch (IOException ignored) {
194       }
195     }
196 
197     listener.onClose(code, reason);
198   }
199 
200   /** Called on the reader thread when an error occurs. */
readerErrorClose(IOException e)201   private void readerErrorClose(IOException e) {
202     // For protocol exceptions, try to inform the server of such.
203     if (!writerSentClose && e instanceof ProtocolException) {
204       try {
205         writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
206       } catch (IOException ignored) {
207       }
208     }
209 
210     if (connectionClosed.compareAndSet(false, true)) {
211       try {
212         close();
213       } catch (IOException ignored) {
214       }
215     }
216 
217     listener.onFailure(e, null);
218   }
219 
220   /** Perform any tear-down work (close the connection, shutdown executors). */
close()221   protected abstract void close() throws IOException;
222 }
223