• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.okhttp;
18 
19 import com.google.common.base.MoreObjects;
20 import com.google.common.base.Preconditions;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import io.grpc.InternalChannelz;
24 import io.grpc.InternalInstrumented;
25 import io.grpc.InternalLogId;
26 import io.grpc.ServerStreamTracer;
27 import io.grpc.internal.InternalServer;
28 import io.grpc.internal.ObjectPool;
29 import io.grpc.internal.ServerListener;
30 import java.io.IOException;
31 import java.net.ServerSocket;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40 import javax.net.ServerSocketFactory;
41 
42 final class OkHttpServer implements InternalServer {
43   private static final Logger log = Logger.getLogger(OkHttpServer.class.getName());
44 
45   private final SocketAddress originalListenAddress;
46   private final ServerSocketFactory socketFactory;
47   private final ObjectPool<Executor> transportExecutorPool;
48   private final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
49   private final OkHttpServerTransport.Config transportConfig;
50   private final InternalChannelz channelz;
51   private ServerSocket serverSocket;
52   private SocketAddress actualListenAddress;
53   private InternalInstrumented<InternalChannelz.SocketStats> listenInstrumented;
54   private Executor transportExecutor;
55   private ScheduledExecutorService scheduledExecutorService;
56   private ServerListener listener;
57   private boolean shutdown;
58 
OkHttpServer( OkHttpServerBuilder builder, List<? extends ServerStreamTracer.Factory> streamTracerFactories, InternalChannelz channelz)59   public OkHttpServer(
60       OkHttpServerBuilder builder,
61       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
62       InternalChannelz channelz) {
63     this.originalListenAddress = Preconditions.checkNotNull(builder.listenAddress, "listenAddress");
64     this.socketFactory = Preconditions.checkNotNull(builder.socketFactory, "socketFactory");
65     this.transportExecutorPool =
66         Preconditions.checkNotNull(builder.transportExecutorPool, "transportExecutorPool");
67     this.scheduledExecutorServicePool =
68         Preconditions.checkNotNull(
69             builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
70     this.transportConfig = new OkHttpServerTransport.Config(builder, streamTracerFactories);
71     this.channelz = Preconditions.checkNotNull(channelz, "channelz");
72   }
73 
74   @Override
start(ServerListener listener)75   public void start(ServerListener listener) throws IOException {
76     this.listener = Preconditions.checkNotNull(listener, "listener");
77     ServerSocket serverSocket = socketFactory.createServerSocket();
78     try {
79       serverSocket.bind(originalListenAddress);
80     } catch (IOException t) {
81       serverSocket.close();
82       throw t;
83     }
84 
85     this.serverSocket = serverSocket;
86     this.actualListenAddress = serverSocket.getLocalSocketAddress();
87     this.listenInstrumented = new ListenSocket(serverSocket);
88     this.transportExecutor = transportExecutorPool.getObject();
89     // Keep reference alive to avoid frequent re-creation by server transports
90     this.scheduledExecutorService = scheduledExecutorServicePool.getObject();
91     channelz.addListenSocket(this.listenInstrumented);
92     transportExecutor.execute(this::acceptConnections);
93   }
94 
acceptConnections()95   private void acceptConnections() {
96     try {
97       while (true) {
98         Socket socket;
99         try {
100           socket = serverSocket.accept();
101         } catch (IOException ex) {
102           if (shutdown) {
103             break;
104           }
105           throw ex;
106         }
107         OkHttpServerTransport transport = new OkHttpServerTransport(transportConfig, socket);
108         transport.start(listener.transportCreated(transport));
109       }
110     } catch (Throwable t) {
111       log.log(Level.SEVERE, "Accept loop failed", t);
112     }
113     listener.serverShutdown();
114   }
115 
116   @Override
shutdown()117   public void shutdown() {
118     if (shutdown) {
119       return;
120     }
121     shutdown = true;
122 
123     if (serverSocket == null) {
124       return;
125     }
126     channelz.removeListenSocket(this.listenInstrumented);
127     try {
128       serverSocket.close();
129     } catch (IOException ex) {
130       log.log(Level.WARNING, "Failed closing server socket", serverSocket);
131     }
132     transportExecutor = transportExecutorPool.returnObject(transportExecutor);
133     scheduledExecutorService = scheduledExecutorServicePool.returnObject(scheduledExecutorService);
134   }
135 
136   @Override
getListenSocketAddress()137   public SocketAddress getListenSocketAddress() {
138     return actualListenAddress;
139   }
140 
141   @Override
getListenSocketStats()142   public InternalInstrumented<InternalChannelz.SocketStats> getListenSocketStats() {
143     return listenInstrumented;
144   }
145 
146   @Override
getListenSocketAddresses()147   public List<? extends SocketAddress> getListenSocketAddresses() {
148     return Collections.singletonList(getListenSocketAddress());
149   }
150 
151   @Override
getListenSocketStatsList()152   public List<InternalInstrumented<InternalChannelz.SocketStats>> getListenSocketStatsList() {
153     return Collections.singletonList(getListenSocketStats());
154   }
155 
156   private static final class ListenSocket
157       implements InternalInstrumented<InternalChannelz.SocketStats> {
158     private final InternalLogId id;
159     private final ServerSocket socket;
160 
ListenSocket(ServerSocket socket)161     public ListenSocket(ServerSocket socket) {
162       this.socket = socket;
163       this.id = InternalLogId.allocate(getClass(), String.valueOf(socket.getLocalSocketAddress()));
164     }
165 
166     @Override
getStats()167     public ListenableFuture<InternalChannelz.SocketStats> getStats() {
168       return Futures.immediateFuture(new InternalChannelz.SocketStats(
169           /*data=*/ null,
170           socket.getLocalSocketAddress(),
171           /*remote=*/ null,
172           new InternalChannelz.SocketOptions.Builder().build(),
173           /*security=*/ null));
174     }
175 
176     @Override
getLogId()177     public InternalLogId getLogId() {
178       return id;
179     }
180 
181     @Override
toString()182     public String toString() {
183       return MoreObjects.toStringHelper(this)
184           .add("logId", id.getId())
185           .add("socket", socket)
186           .toString();
187     }
188   }
189 }
190