// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net.urlconnection;

import androidx.annotation.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * A MessageLoop class for use in {@link CronetHttpURLConnection}.
 *
 * <p>Tasks are handed to the loop through {@link #execute(Runnable)}, which
 * appends them to an internal blocking queue. {@link #loop()} and
 * {@link #loop(int)} drain that queue, running each task on the thread that
 * called {@code loop}, until {@link #quit()} is called, a timeout elapses, the
 * waiting thread is interrupted, or a task throws a
 * {@link RuntimeException}.
 */
@VisibleForTesting
public class MessageLoop implements Executor {
    private final BlockingQueue<Runnable> mQueue;

    // Indicates whether this message loop is currently running.
    private boolean mLoopRunning;

    // Indicates whether an InterruptedIOException or a RuntimeException has
    // occurred in loop(). If true, the loop cannot be safely started because
    // this might cause the loop to terminate immediately if there is a quit
    // task enqueued.
    private boolean mLoopFailed;

    // The exception that caused mLoopFailed to be set to true. Will be
    // rethrown if loop() is called again. If mLoopFailed is set then
    // exactly one of mPriorInterruptedIOException and mPriorRuntimeException
    // will be set.
    private InterruptedIOException mPriorInterruptedIOException;
    private RuntimeException mPriorRuntimeException;

    // Used when assertions are enabled to enforce single-threaded use.
    private static final long INVALID_THREAD_ID = -1;
    private long mThreadId = INVALID_THREAD_ID;

    /** Creates a message loop backed by an unbounded queue. */
    public MessageLoop() {
        mQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Records the calling thread on first use and, on later calls, checks
     * that the caller is that same thread. Only invoked from {@code assert}
     * statements, so it has no effect when assertions are disabled.
     *
     * @return {@code true} if this is the first call or the caller is the
     *         thread recorded by the first call.
     */
    private boolean calledOnValidThread() {
        if (mThreadId == INVALID_THREAD_ID) {
            mThreadId = Thread.currentThread().getId();
            return true;
        }
        return mThreadId == Thread.currentThread().getId();
    }

    /**
     * Retrieves a task from the queue with the given timeout.
     *
     * @param useTimeout whether to use a timeout.
     * @param timeoutNano Time to wait, in nanoseconds. Ignored when
     *         {@code useTimeout} is {@code false}.
     * @return A non-{@code null} Runnable from the queue.
     * @throws InterruptedIOException if the thread is interrupted while
     *         waiting (the interrupt status is restored before throwing), or
     *         a {@link SocketTimeoutException} if the timeout elapses before
     *         a task becomes available.
     */
    private Runnable take(boolean useTimeout, long timeoutNano) throws InterruptedIOException {
        Runnable task = null;
        try {
            if (!useTimeout) {
                task = mQueue.take(); // Blocks if the queue is empty.
            } else {
                // poll returns null upon timeout.
                task = mQueue.poll(timeoutNano, TimeUnit.NANOSECONDS);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt
            // status; restore it so code further up the stack can still
            // observe the interruption.
            Thread.currentThread().interrupt();
            InterruptedIOException exception = new InterruptedIOException();
            exception.initCause(e);
            throw exception;
        }
        if (task == null) {
            // This will terminate the loop.
            throw new SocketTimeoutException();
        }
        return task;
    }

    /**
     * Runs the message loop with no timeout. Be sure to call
     * {@link MessageLoop#quit()} to end the loop. If an
     * {@link InterruptedIOException} or a {@link RuntimeException} occurs,
     * the loop cannot be started again (see {@link #mLoopFailed}).
     *
     * @throws IOException if the thread is interrupted while waiting for a
     *         task, or if a previous run of the loop failed that way.
     */
    public void loop() throws IOException {
        loop(0);
    }

    /**
     * Runs the message loop. Be sure to call {@link MessageLoop#quit()}
     * to end the loop. If an {@link InterruptedIOException} (including a
     * timeout) or a {@link RuntimeException} occurs, the loop cannot be
     * started again (see {@link #mLoopFailed}); later calls rethrow the
     * recorded exception.
     *
     * @param timeoutMilli Timeout, in milliseconds, or 0 for no timeout. The
     *         timeout covers the whole call, not each individual task.
     * @throws IOException an {@link InterruptedIOException} if the thread is
     *         interrupted while waiting, or a {@link SocketTimeoutException}
     *         if the timeout elapses before the loop is quit.
     * @throws IllegalStateException if the loop is already running.
     */
    public void loop(int timeoutMilli) throws IOException {
        assert calledOnValidThread();
        // Use System.nanoTime() which is monotonically increasing.
        long startNano = System.nanoTime();
        long timeoutNano = TimeUnit.NANOSECONDS.convert(timeoutMilli, TimeUnit.MILLISECONDS);
        if (mLoopFailed) {
            if (mPriorInterruptedIOException != null) {
                throw mPriorInterruptedIOException;
            } else {
                throw mPriorRuntimeException;
            }
        }
        if (mLoopRunning) {
            throw new IllegalStateException(
                    "Cannot run loop when it is already running.");
        }
        mLoopRunning = true;
        while (mLoopRunning) {
            try {
                if (timeoutMilli == 0) {
                    take(false, 0).run();
                } else {
                    // Wait only for what is left of the overall budget.
                    take(true, timeoutNano - System.nanoTime() + startNano).run();
                }
            } catch (InterruptedIOException e) {
                mLoopRunning = false;
                mLoopFailed = true;
                mPriorInterruptedIOException = e;
                throw e;
            } catch (RuntimeException e) {
                mLoopRunning = false;
                mLoopFailed = true;
                mPriorRuntimeException = e;
                throw e;
            }
        }
    }

    /**
     * This causes {@link #loop()} to stop executing messages after the current
     * message being executed. Should only be called from the currently
     * executing message.
     */
    public void quit() {
        assert calledOnValidThread();
        mLoopRunning = false;
    }

    /**
     * Posts a task to the message loop.
     *
     * @param task the task to enqueue; must not be {@code null}.
     * @throws IllegalArgumentException if {@code task} is {@code null}.
     * @throws RejectedExecutionException if the calling thread is interrupted
     *         while enqueueing; the interrupt status is restored before
     *         throwing.
     */
    @Override
    public void execute(Runnable task) throws RejectedExecutionException {
        if (task == null) {
            throw new IllegalArgumentException();
        }
        try {
            mQueue.put(task);
        } catch (InterruptedException e) {
            // The queue has Integer.MAX_VALUE capacity, so put() does not
            // block waiting for space, but it can still throw if the calling
            // thread is interrupted. Restore the interrupt status so the
            // caller can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }

    /**
     * Returns whether the loop is currently running. Used in testing.
     */
    public boolean isRunning() {
        return mLoopRunning;
    }

    /**
     * Returns whether an exception occurred in {@link #loop()}. Used in
     * testing.
     */
    public boolean hasLoopFailed() {
        return mLoopFailed;
    }
}