• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2013 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package com.squareup.okhttp;
17 
18 import com.squareup.okhttp.Call.AsyncCall;
19 import com.squareup.okhttp.internal.Util;
20 import com.squareup.okhttp.internal.http.HttpEngine;
21 import java.util.ArrayDeque;
22 import java.util.Deque;
23 import java.util.Iterator;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 
29 /**
30  * Policy on when async requests are executed.
31  *
32  * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you
33  * supply your own executor, it should be able to run {@linkplain #getMaxRequests the
34  * configured maximum} number of calls concurrently.
35  */
36 public final class Dispatcher {
37   private int maxRequests = 64;
38   private int maxRequestsPerHost = 5;
39 
40   /** Executes calls. Created lazily. */
41   private ExecutorService executorService;
42 
43   /** Ready calls in the order they'll be run. */
44   private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
45 
46   /** Running calls. Includes canceled calls that haven't finished yet. */
47   private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
48 
49   /** In-flight synchronous calls. Includes canceled calls that haven't finished yet. */
50   private final Deque<Call> executedCalls = new ArrayDeque<>();
51 
Dispatcher(ExecutorService executorService)52   public Dispatcher(ExecutorService executorService) {
53     this.executorService = executorService;
54   }
55 
Dispatcher()56   public Dispatcher() {
57   }
58 
getExecutorService()59   public synchronized ExecutorService getExecutorService() {
60     if (executorService == null) {
61       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
62           new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
63     }
64     return executorService;
65   }
66 
67   /**
68    * Set the maximum number of requests to execute concurrently. Above this
69    * requests queue in memory, waiting for the running calls to complete.
70    *
71    * <p>If more than {@code maxRequests} requests are in flight when this is
72    * invoked, those requests will remain in flight.
73    */
setMaxRequests(int maxRequests)74   public synchronized void setMaxRequests(int maxRequests) {
75     if (maxRequests < 1) {
76       throw new IllegalArgumentException("max < 1: " + maxRequests);
77     }
78     this.maxRequests = maxRequests;
79     promoteCalls();
80   }
81 
getMaxRequests()82   public synchronized int getMaxRequests() {
83     return maxRequests;
84   }
85 
86   /**
87    * Set the maximum number of requests for each host to execute concurrently.
88    * This limits requests by the URL's host name. Note that concurrent requests
89    * to a single IP address may still exceed this limit: multiple hostnames may
90    * share an IP address or be routed through the same HTTP proxy.
91    *
92    * <p>If more than {@code maxRequestsPerHost} requests are in flight when this
93    * is invoked, those requests will remain in flight.
94    */
setMaxRequestsPerHost(int maxRequestsPerHost)95   public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
96     if (maxRequestsPerHost < 1) {
97       throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
98     }
99     this.maxRequestsPerHost = maxRequestsPerHost;
100     promoteCalls();
101   }
102 
getMaxRequestsPerHost()103   public synchronized int getMaxRequestsPerHost() {
104     return maxRequestsPerHost;
105   }
106 
enqueue(AsyncCall call)107   synchronized void enqueue(AsyncCall call) {
108     if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
109       runningCalls.add(call);
110       getExecutorService().execute(call);
111     } else {
112       readyCalls.add(call);
113     }
114   }
115 
116   /** Cancel all calls with the tag {@code tag}. */
cancel(Object tag)117   public synchronized void cancel(Object tag) {
118     for (AsyncCall call : readyCalls) {
119       if (Util.equal(tag, call.tag())) {
120         call.cancel();
121       }
122     }
123 
124     for (AsyncCall call : runningCalls) {
125       if (Util.equal(tag, call.tag())) {
126         call.get().canceled = true;
127         HttpEngine engine = call.get().engine;
128         if (engine != null) engine.cancel();
129       }
130     }
131 
132     for (Call call : executedCalls) {
133       if (Util.equal(tag, call.tag())) {
134         call.cancel();
135       }
136     }
137   }
138 
139   /** Used by {@code AsyncCall#run} to signal completion. */
finished(AsyncCall call)140   synchronized void finished(AsyncCall call) {
141     if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
142     promoteCalls();
143   }
144 
promoteCalls()145   private void promoteCalls() {
146     if (runningCalls.size() >= maxRequests) return; // Already running max capacity.
147     if (readyCalls.isEmpty()) return; // No ready calls to promote.
148 
149     for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
150       AsyncCall call = i.next();
151 
152       if (runningCallsForHost(call) < maxRequestsPerHost) {
153         i.remove();
154         runningCalls.add(call);
155         getExecutorService().execute(call);
156       }
157 
158       if (runningCalls.size() >= maxRequests) return; // Reached max capacity.
159     }
160   }
161 
162   /** Returns the number of running calls that share a host with {@code call}. */
runningCallsForHost(AsyncCall call)163   private int runningCallsForHost(AsyncCall call) {
164     int result = 0;
165     for (AsyncCall c : runningCalls) {
166       if (c.host().equals(call.host())) result++;
167     }
168     return result;
169   }
170 
171   /** Used by {@code Call#execute} to signal it is in-flight. */
executed(Call call)172   synchronized void executed(Call call) {
173     executedCalls.add(call);
174   }
175 
176   /** Used by {@code Call#execute} to signal completion. */
finished(Call call)177   synchronized void finished(Call call) {
178     if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
179   }
180 
getRunningCallCount()181   public synchronized int getRunningCallCount() {
182     return runningCalls.size();
183   }
184 
getQueuedCallCount()185   public synchronized int getQueuedCallCount() {
186     return readyCalls.size();
187   }
188 }
189