• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2013 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.caliper.runner;
16 
17 import static com.google.common.base.Preconditions.checkState;
18 
19 import com.google.caliper.bridge.OpenedSocket;
20 import com.google.caliper.bridge.StartupAnnounceMessage;
21 import com.google.common.base.Supplier;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Multimaps;
24 import com.google.common.collect.SetMultimap;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.AbstractExecutionThreadService;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.Service;
29 import com.google.common.util.concurrent.SettableFuture;
30 
31 import java.io.IOException;
32 import java.net.ServerSocket;
33 import java.net.Socket;
34 import java.net.SocketException;
35 import java.util.Collection;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.UUID;
39 import java.util.concurrent.locks.Lock;
40 import java.util.concurrent.locks.ReentrantLock;
41 
42 import javax.annotation.concurrent.GuardedBy;
43 import javax.inject.Inject;
44 import javax.inject.Singleton;
45 
46 /**
47  * A {@link Service} that manages a {@link ServerSocket}.
48  *
49  * <p> This service provides two pieces of functionality:
50  * <ol>
51  *   <li>It adapts {@link ServerSocket#accept()} to a {@link ListenableFuture} of an opened socket.
52  *   <li>It demultiplexes incoming connections based on a {@link StartupAnnounceMessage} that is
53  *       sent over the socket.
54  * </ol>
55  *
56  * <p>The {@linkplain State states} of this service are as follows:
57  * <ul>
58  *   <li>{@linkplain State#NEW NEW} : Idle state, the {@link ServerSocket} is not open yet.
59  *   <li>{@linkplain State#STARTING STARTING} : {@link ServerSocket} is opened
60  *   <li>{@linkplain State#RUNNING RUNNING} : We are continuously accepting and parsing connections
61  *       from the socket.
62  *   <li>{@linkplain State#STOPPING STOPPING} : The server socket is closing and all pending
63  *       connection requests are terminated, connection requests will fail immediately.
64  *   <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, the socket is closed.
65  *   <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
66  *       any errors while accepting connections or reading from connections.
67  * </ul>
68  *
69  * <p>Note to future self.  There have been a few attempts to make it so that it is no longer
70  * necessary to dedicate a thread to this service (basically turn it into an AbstractIdleService).
71  * The general idea has been to make callers to getConnection invoke accept, here is why it didn't
72  * work.
73  * <ul>
74  *     <li>If you make getConnection a blocking method that calls accept until it finds the
75  *         connection with its id, then there is no way to deal with connections that never arrive.
76  *         For example, if the worker crashes before connecting then the thread calling accept will
77  *         block forever waiting for it.  The only way to unblock a thread stuck on accept() is to
78  *         close the socket (this holds for ServerSocketChannels and normal ServerSockets), but we
79  *         cannot do that in this case because the socket is a shared resource.
80  *     <li>If you make getConnection a non-blocking, polling based method then you expose yourself
81  *         to potential deadlocks (due to missed signals) depending on what thread you poll from.
82  *         If the polling thread is any of the threads that are involved with processing messages
83  *         from the worker I believe there to be a deadlock risk.  Basically, if the worker sends
84  *         messages over its output streams and then calls Socket.connect, and no printing to stdout
85  *         or stderr occurs while connecting. Then if the runner polls, but misses the connection
86  *         and then tries to read again, it will deadlock.
87  * </ul>
88  */
89 @Singleton
90 final class ServerSocketService extends AbstractExecutionThreadService {
91   private enum Source { REQUEST, ACCEPT}
92 
93   private final Lock lock = new ReentrantLock();
94 
95   /**
96    * Contains futures that have either only been accepted or requested.  Once both occur they are
97    * removed from this map.
98    */
99   @GuardedBy("lock")
100   private final Map<UUID, SettableFuture<OpenedSocket>> halfFinishedConnections = Maps.newHashMap();
101 
102   /**
103    * Contains the history of connections so we can ensure that each id is only accepted once and
104    * requested once.
105    */
106   @GuardedBy("lock")
107   private final SetMultimap<Source, UUID> connectionState = Multimaps.newSetMultimap(
108       Maps.<Source, Collection<UUID>>newEnumMap(Source.class),
109       new Supplier<Set<UUID>>(){
110         @Override public Set<UUID> get() {
111           return Sets.newHashSet();
112         }
113       });
114 
115   private ServerSocket serverSocket;
116 
ServerSocketService()117   @Inject ServerSocketService() {}
118 
getPort()119   int getPort() {
120     awaitRunning();
121     checkState(serverSocket != null, "Socket has not been opened yet");
122     return serverSocket.getLocalPort();
123   }
124 
125   /**
126    * Returns a {@link ListenableFuture} for an open connection corresponding to the given id.
127    *
128    * <p>N.B. calling this method 'consumes' the connection and as such calling it twice with the
129    * same id will not work, the second future returned will never complete.  Similarly calling it
130    * with an id that does not correspond to a worker trying to connect will also fail.
131    */
getConnection(UUID id)132   public ListenableFuture<OpenedSocket> getConnection(UUID id) {
133     checkState(isRunning(), "You can only get connections from a running service: %s", this);
134     return getConnectionImpl(id, Source.REQUEST);
135   }
136 
startUp()137   @Override protected void startUp() throws Exception {
138     serverSocket = new ServerSocket(0 /* bind to any available port */);
139   }
140 
run()141   @Override protected void run() throws Exception {
142     while (isRunning()) {
143       Socket socket;
144       try {
145         socket = serverSocket.accept();
146       } catch (SocketException e) {
147         // we were closed
148         return;
149       }
150       OpenedSocket openedSocket = OpenedSocket.fromSocket(socket);
151 
152       UUID id = ((StartupAnnounceMessage) openedSocket.reader().read()).trialId();
153       // N.B. you should not call set with the lock held, to prevent same thread executors from
154       // running with the lock.
155       getConnectionImpl(id, Source.ACCEPT).set(openedSocket);
156     }
157   }
158 
159   /**
160    * Returns a {@link SettableFuture} from the map of connections.
161    *
162    * <p>This method has the following properties:
163    * <ul>
164    *    <li>If the id is present in {@link #connectionState}, this will throw an
165    *        {@link IllegalStateException}.
166    *    <li>The id and source are recorded in {@link #connectionState}
167    *    <li>If the future is already in {@link #halfFinishedConnections}, it is removed and
168    *        returned.
169    *    <li>If the future is not in {@link #halfFinishedConnections}, a new {@link SettableFuture}
170    *        is added and then returned.
171    *
172    * <p>These features together ensure that each connection can only be accepted once, only
173    * requested once and once both have happened it will be removed from
174    * {@link #halfFinishedConnections}.
175    */
getConnectionImpl(UUID id, Source source)176   private SettableFuture<OpenedSocket> getConnectionImpl(UUID id, Source source) {
177     lock.lock();
178     try {
179       checkState(connectionState.put(source, id), "Connection for %s has already been %s",
180           id, source);
181       SettableFuture<OpenedSocket> future = halfFinishedConnections.get(id);
182       if (future == null) {
183         future = SettableFuture.create();
184         halfFinishedConnections.put(id, future);
185       } else {
186         halfFinishedConnections.remove(id);
187       }
188       return future;
189     } finally {
190       lock.unlock();
191     }
192   }
193 
triggerShutdown()194   @Override protected void triggerShutdown() {
195     try {
196       serverSocket.close();
197     } catch (IOException e) {
198       // best effort...
199     }
200   }
201 
shutDown()202   @Override protected void shutDown() throws Exception {
203     serverSocket.close();
204     // Now we have either been asked to stop or have failed with some kind of exception, we want to
205     // notify all pending requests, so if there are any references outside of this class they will
206     // notice.
207     lock.lock();
208     try {
209       for (SettableFuture<OpenedSocket> future : halfFinishedConnections.values()) {
210         future.setException(new Exception("The socket has been closed"));
211       }
212       halfFinishedConnections.clear();
213       connectionState.clear();
214     } finally {
215       lock.unlock();
216     }
217   }
218 }
219