• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 package org.chromium.net.urlconnection;
6 
7 import androidx.annotation.VisibleForTesting;
8 
9 import java.io.IOException;
10 import java.io.InterruptedIOException;
11 import java.net.SocketTimeoutException;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.Executor;
14 import java.util.concurrent.LinkedBlockingQueue;
15 import java.util.concurrent.RejectedExecutionException;
16 import java.util.concurrent.TimeUnit;
17 
18 /**
19  * A MessageLoop class for use in {@link CronetHttpURLConnection}.
20  */
21 @VisibleForTesting
public class MessageLoop implements Executor {
    // Sentinel for mThreadId meaning that no thread has been recorded yet.
    private static final long INVALID_THREAD_ID = -1;

    // Tasks posted through execute(), consumed in FIFO order by loop().
    private final BlockingQueue<Runnable> mQueue;

    // Indicates whether this message loop is currently running.
    private boolean mLoopRunning;

    // Indicates whether loop() terminated because of an exception or error.
    // If true, the loop cannot be safely started again because this might
    // cause the loop to terminate immediately if there is a quit task
    // enqueued.
    private boolean mLoopFailed;

    // The throwable that caused mLoopFailed to be set to true. Will be
    // rethrown if loop() is called again. If mLoopFailed is set then exactly
    // one of the three fields below is non-null.
    private InterruptedIOException mPriorInterruptedIOException;
    private RuntimeException mPriorRuntimeException;
    private Error mPriorError;

    // Used when assertions are enabled to enforce single-threaded use of
    // loop() and quit(). Recorded by the first checked call.
    private long mThreadId = INVALID_THREAD_ID;

    /**
     * Creates a message loop with an empty, unbounded task queue.
     */
    public MessageLoop() {
        mQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Records the calling thread on first use and afterwards reports whether
     * the caller is that same thread. Only invoked from {@code assert}
     * statements, so it has no effect when assertions are disabled.
     */
    private boolean calledOnValidThread() {
        long currentThreadId = Thread.currentThread().getId();
        if (mThreadId == INVALID_THREAD_ID) {
            mThreadId = currentThreadId;
            return true;
        }
        return mThreadId == currentThreadId;
    }

    /**
     * Retrieves a task from the queue, optionally giving up after a timeout.
     *
     * @param useTimeout whether to use a timeout. If {@code false} this call
     *         blocks until a task is available.
     * @param timeoutNano Time to wait, in nanoseconds. Ignored when
     *         {@code useTimeout} is {@code false}.
     * @return A non-{@code null} Runnable from the queue.
     * @throws InterruptedIOException if the thread is interrupted while
     *         waiting (the {@link InterruptedException} is attached as the
     *         cause), or, as a {@link SocketTimeoutException}, if no task
     *         became available before the timeout elapsed.
     */
    private Runnable take(boolean useTimeout, long timeoutNano) throws InterruptedIOException {
        Runnable task = null;
        try {
            if (!useTimeout) {
                task = mQueue.take(); // Blocks if the queue is empty.
            } else {
                // poll returns null upon timeout.
                task = mQueue.poll(timeoutNano, TimeUnit.NANOSECONDS);
            }
        } catch (InterruptedException e) {
            InterruptedIOException exception = new InterruptedIOException();
            exception.initCause(e);
            throw exception;
        }
        if (task == null) {
            // This will terminate the loop.
            throw new SocketTimeoutException();
        }
        return task;
    }

    /**
     * Rethrows the throwable that terminated a previous run of the loop, if
     * there was one. Does nothing when the loop has not failed.
     */
    private void rethrowPriorFailure() throws InterruptedIOException {
        if (!mLoopFailed) {
            return;
        }
        if (mPriorInterruptedIOException != null) {
            throw mPriorInterruptedIOException;
        }
        if (mPriorRuntimeException != null) {
            throw mPriorRuntimeException;
        }
        throw mPriorError;
    }

    /**
     * Marks the loop as stopped and permanently failed. The caller is
     * responsible for storing the throwable in the matching mPrior* field.
     */
    private void markFailed() {
        mLoopRunning = false;
        mLoopFailed = true;
    }

    /**
     * Runs the message loop without a timeout. Be sure to call
     * {@link MessageLoop#quit()} to end the loop. If the loop terminates
     * with an exception or error, it cannot be started again (see
     * {@link #mLoopFailed}).
     *
     * @throws IOException if the thread is interrupted while waiting for a
     *         task, or if a previous run failed that way.
     */
    public void loop() throws IOException {
        loop(0);
    }

    /**
     * Runs the message loop. Be sure to call {@link MessageLoop#quit()}
     * to end the loop. If the loop terminates with an exception or error
     * (interruption, timeout, or anything thrown by a task), it cannot be
     * started again: every later call rethrows that same throwable (see
     * {@link #mLoopFailed}).
     *
     * @param timeoutMilli Timeout, in milliseconds, or 0 for no timeout. The
     *         timeout covers the whole call, not each individual task.
     * @throws IOException an {@link InterruptedIOException} if the thread is
     *         interrupted while waiting, or a {@link SocketTimeoutException}
     *         if the timeout elapses with no task available.
     * @throws IllegalStateException if the loop is already running.
     */
    public void loop(int timeoutMilli) throws IOException {
        assert calledOnValidThread();
        // Use System.nanoTime() which is monotonically increasing.
        long startNano = System.nanoTime();
        long timeoutNano = TimeUnit.NANOSECONDS.convert(timeoutMilli, TimeUnit.MILLISECONDS);
        rethrowPriorFailure();
        if (mLoopRunning) {
            throw new IllegalStateException(
                    "Cannot run loop when it is already running.");
        }
        mLoopRunning = true;
        while (mLoopRunning) {
            try {
                if (timeoutMilli == 0) {
                    take(false, 0).run();
                } else {
                    // Wait only for what is left of the overall budget.
                    take(true, timeoutNano - System.nanoTime() + startNano).run();
                }
            } catch (InterruptedIOException e) {
                markFailed();
                mPriorInterruptedIOException = e;
                throw e;
            } catch (RuntimeException e) {
                markFailed();
                mPriorRuntimeException = e;
                throw e;
            } catch (Error e) {
                // Without this, an Error thrown by a task would leave
                // mLoopRunning stuck at true: isRunning() would keep
                // reporting a running loop and later loop() calls would fail
                // with a misleading "already running" IllegalStateException.
                markFailed();
                mPriorError = e;
                throw e;
            }
        }
    }

    /**
     * This causes {@link #loop()} to stop executing messages after the current
     * message being executed.  Should only be called from the currently
     * executing message.
     */
    public void quit() {
        assert calledOnValidThread();
        mLoopRunning = false;
    }

    /**
     * Posts a task to the message loop. Unlike {@link #loop()} and
     * {@link #quit()}, this method does not check the calling thread.
     *
     * @param task the task to enqueue; must not be {@code null}.
     * @throws IllegalArgumentException if {@code task} is {@code null}.
     * @throws RejectedExecutionException if the calling thread is interrupted
     *         while enqueueing. The thread's interrupt status is preserved.
     */
    @Override
    public void execute(Runnable task) throws RejectedExecutionException {
        if (task == null) {
            throw new IllegalArgumentException();
        }
        try {
            mQueue.put(task);
        } catch (InterruptedException e) {
            // The queue has Integer.MAX_VALUE capacity, so put() never waits
            // for space, but it can still report an interrupt (for example
            // when the caller's interrupt flag is already set). Throwing
            // InterruptedException clears that flag, so restore it before
            // converting to an exception type that no longer conveys it.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }

    /**
     * Returns whether the loop is currently running. Used in testing.
     */
    public boolean isRunning() {
        return mLoopRunning;
    }

    /**
     * Returns whether {@link #loop()} terminated because of an exception or
     * error. Used in testing.
     */
    public boolean hasLoopFailed() {
        return mLoopFailed;
    }
}
181