1 // Copyright 2014 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.urlconnection; 6 7 import androidx.annotation.VisibleForTesting; 8 9 import java.io.IOException; 10 import java.io.InterruptedIOException; 11 import java.net.SocketTimeoutException; 12 import java.util.concurrent.BlockingQueue; 13 import java.util.concurrent.Executor; 14 import java.util.concurrent.LinkedBlockingQueue; 15 import java.util.concurrent.RejectedExecutionException; 16 import java.util.concurrent.TimeUnit; 17 18 /** 19 * A MessageLoop class for use in {@link CronetHttpURLConnection}. 20 */ 21 @VisibleForTesting 22 public class MessageLoop implements Executor { 23 private final BlockingQueue<Runnable> mQueue; 24 25 // Indicates whether this message loop is currently running. 26 private boolean mLoopRunning; 27 28 // Indicates whether an InterruptedException or a RuntimeException has 29 // occurred in loop(). If true, the loop cannot be safely started because 30 // this might cause the loop to terminate immediately if there is a quit 31 // task enqueued. 32 private boolean mLoopFailed; 33 // The exception that caused mLoopFailed to be set to true. Will be 34 // rethrown if loop() is called again. If mLoopFailed is set then 35 // exactly one of mPriorInterruptedIOException and mPriorRuntimeException 36 // will be set. 37 private InterruptedIOException mPriorInterruptedIOException; 38 private RuntimeException mPriorRuntimeException; 39 40 // Used when assertions are enabled to enforce single-threaded use. 41 private static final long INVALID_THREAD_ID = -1; 42 private long mThreadId = INVALID_THREAD_ID; 43 MessageLoop()44 public MessageLoop() { 45 mQueue = new LinkedBlockingQueue<Runnable>(); 46 } 47 calledOnValidThread()48 private boolean calledOnValidThread() { 49 if (mThreadId == INVALID_THREAD_ID) { 50 mThreadId = Thread.currentThread().getId(); 51 return true; 52 } 53 return mThreadId == Thread.currentThread().getId(); 54 } 55 56 /** 57 * Retrieves a task from the queue with the given timeout. 58 * 59 * @param useTimeout whether to use a timeout. 60 * @param timeoutNano Time to wait, in nanoseconds. 61 * @return A non-{@code null} Runnable from the queue. 62 * @throws InterruptedIOException 63 */ take(boolean useTimeout, long timeoutNano)64 private Runnable take(boolean useTimeout, long timeoutNano) throws InterruptedIOException { 65 Runnable task = null; 66 try { 67 if (!useTimeout) { 68 task = mQueue.take(); // Blocks if the queue is empty. 69 } else { 70 // poll returns null upon timeout. 71 task = mQueue.poll(timeoutNano, TimeUnit.NANOSECONDS); 72 } 73 } catch (InterruptedException e) { 74 InterruptedIOException exception = new InterruptedIOException(); 75 exception.initCause(e); 76 throw exception; 77 } 78 if (task == null) { 79 // This will terminate the loop. 80 throw new SocketTimeoutException(); 81 } 82 return task; 83 } 84 85 /** 86 * Runs the message loop. Be sure to call {@link MessageLoop#quit()} 87 * to end the loop. If an interruptedException occurs, the loop cannot be 88 * started again (see {@link #mLoopFailed}). 89 * @throws IOException 90 */ loop()91 public void loop() throws IOException { 92 loop(0); 93 } 94 95 /** 96 * Runs the message loop. Be sure to call {@link MessageLoop#quit()} 97 * to end the loop. If an interruptedException occurs, the loop cannot be 98 * started again (see {@link #mLoopFailed}). 99 * @param timeoutMilli Timeout, in milliseconds, or 0 for no timeout. 100 * @throws IOException 101 */ loop(int timeoutMilli)102 public void loop(int timeoutMilli) throws IOException { 103 assert calledOnValidThread(); 104 // Use System.nanoTime() which is monotonically increasing. 105 long startNano = System.nanoTime(); 106 long timeoutNano = TimeUnit.NANOSECONDS.convert(timeoutMilli, TimeUnit.MILLISECONDS); 107 if (mLoopFailed) { 108 if (mPriorInterruptedIOException != null) { 109 throw mPriorInterruptedIOException; 110 } else { 111 throw mPriorRuntimeException; 112 } 113 } 114 if (mLoopRunning) { 115 throw new IllegalStateException("Cannot run loop when it is already running."); 116 } 117 mLoopRunning = true; 118 while (mLoopRunning) { 119 try { 120 if (timeoutMilli == 0) { 121 take(false, 0).run(); 122 } else { 123 take(true, timeoutNano - System.nanoTime() + startNano).run(); 124 } 125 } catch (InterruptedIOException e) { 126 mLoopRunning = false; 127 mLoopFailed = true; 128 mPriorInterruptedIOException = e; 129 throw e; 130 } catch (RuntimeException e) { 131 mLoopRunning = false; 132 mLoopFailed = true; 133 mPriorRuntimeException = e; 134 throw e; 135 } 136 } 137 } 138 139 /** 140 * This causes {@link #loop()} to stop executing messages after the current 141 * message being executed. Should only be called from the currently 142 * executing message. 143 */ quit()144 public void quit() { 145 assert calledOnValidThread(); 146 mLoopRunning = false; 147 } 148 149 /** Posts a task to the message loop. */ 150 @Override execute(Runnable task)151 public void execute(Runnable task) throws RejectedExecutionException { 152 if (task == null) { 153 throw new IllegalArgumentException(); 154 } 155 try { 156 mQueue.put(task); 157 } catch (InterruptedException e) { 158 // In theory this exception won't happen, since we have an blocking 159 // queue with Integer.MAX_Value capacity, put() call will not block. 160 throw new RejectedExecutionException(e); 161 } 162 } 163 164 /** Returns whether the loop is currently running. Used in testing. */ isRunning()165 public boolean isRunning() { 166 return mLoopRunning; 167 } 168 169 /** Returns whether an exception occurred in {#loop()}. Used in testing. */ hasLoopFailed()170 public boolean hasLoopFailed() { 171 return mLoopFailed; 172 } 173 } 174