1 /* 2 * Copyright 2022 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.okhttp; 18 19 import com.google.common.base.MoreObjects; 20 import com.google.common.base.Preconditions; 21 import com.google.common.util.concurrent.Futures; 22 import com.google.common.util.concurrent.ListenableFuture; 23 import io.grpc.InternalChannelz; 24 import io.grpc.InternalInstrumented; 25 import io.grpc.InternalLogId; 26 import io.grpc.ServerStreamTracer; 27 import io.grpc.internal.InternalServer; 28 import io.grpc.internal.ObjectPool; 29 import io.grpc.internal.ServerListener; 30 import java.io.IOException; 31 import java.net.ServerSocket; 32 import java.net.Socket; 33 import java.net.SocketAddress; 34 import java.util.Collections; 35 import java.util.List; 36 import java.util.concurrent.Executor; 37 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.logging.Level; 39 import java.util.logging.Logger; 40 import javax.net.ServerSocketFactory; 41 42 final class OkHttpServer implements InternalServer { 43 private static final Logger log = Logger.getLogger(OkHttpServer.class.getName()); 44 45 private final SocketAddress originalListenAddress; 46 private final ServerSocketFactory socketFactory; 47 private final ObjectPool<Executor> transportExecutorPool; 48 private final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool; 49 private final OkHttpServerTransport.Config transportConfig; 50 private final InternalChannelz channelz; 51 private ServerSocket serverSocket; 52 private SocketAddress actualListenAddress; 53 private InternalInstrumented<InternalChannelz.SocketStats> listenInstrumented; 54 private Executor transportExecutor; 55 private ScheduledExecutorService scheduledExecutorService; 56 private ServerListener listener; 57 private boolean shutdown; 58 OkHttpServer( OkHttpServerBuilder builder, List<? extends ServerStreamTracer.Factory> streamTracerFactories, InternalChannelz channelz)59 public OkHttpServer( 60 OkHttpServerBuilder builder, 61 List<? extends ServerStreamTracer.Factory> streamTracerFactories, 62 InternalChannelz channelz) { 63 this.originalListenAddress = Preconditions.checkNotNull(builder.listenAddress, "listenAddress"); 64 this.socketFactory = Preconditions.checkNotNull(builder.socketFactory, "socketFactory"); 65 this.transportExecutorPool = 66 Preconditions.checkNotNull(builder.transportExecutorPool, "transportExecutorPool"); 67 this.scheduledExecutorServicePool = 68 Preconditions.checkNotNull( 69 builder.scheduledExecutorServicePool, "scheduledExecutorServicePool"); 70 this.transportConfig = new OkHttpServerTransport.Config(builder, streamTracerFactories); 71 this.channelz = Preconditions.checkNotNull(channelz, "channelz"); 72 } 73 74 @Override start(ServerListener listener)75 public void start(ServerListener listener) throws IOException { 76 this.listener = Preconditions.checkNotNull(listener, "listener"); 77 ServerSocket serverSocket = socketFactory.createServerSocket(); 78 try { 79 serverSocket.bind(originalListenAddress); 80 } catch (IOException t) { 81 serverSocket.close(); 82 throw t; 83 } 84 85 this.serverSocket = serverSocket; 86 this.actualListenAddress = serverSocket.getLocalSocketAddress(); 87 this.listenInstrumented = new ListenSocket(serverSocket); 88 this.transportExecutor = transportExecutorPool.getObject(); 89 // Keep reference alive to avoid frequent re-creation by server transports 90 this.scheduledExecutorService = scheduledExecutorServicePool.getObject(); 91 channelz.addListenSocket(this.listenInstrumented); 92 transportExecutor.execute(this::acceptConnections); 93 } 94 acceptConnections()95 private void acceptConnections() { 96 try { 97 while (true) { 98 Socket socket; 99 try { 100 socket = serverSocket.accept(); 101 } catch (IOException ex) { 102 if (shutdown) { 103 break; 104 } 105 throw ex; 106 } 107 OkHttpServerTransport transport = new OkHttpServerTransport(transportConfig, socket); 108 transport.start(listener.transportCreated(transport)); 109 } 110 } catch (Throwable t) { 111 log.log(Level.SEVERE, "Accept loop failed", t); 112 } 113 listener.serverShutdown(); 114 } 115 116 @Override shutdown()117 public void shutdown() { 118 if (shutdown) { 119 return; 120 } 121 shutdown = true; 122 123 if (serverSocket == null) { 124 return; 125 } 126 channelz.removeListenSocket(this.listenInstrumented); 127 try { 128 serverSocket.close(); 129 } catch (IOException ex) { 130 log.log(Level.WARNING, "Failed closing server socket", serverSocket); 131 } 132 transportExecutor = transportExecutorPool.returnObject(transportExecutor); 133 scheduledExecutorService = scheduledExecutorServicePool.returnObject(scheduledExecutorService); 134 } 135 136 @Override getListenSocketAddress()137 public SocketAddress getListenSocketAddress() { 138 return actualListenAddress; 139 } 140 141 @Override getListenSocketStats()142 public InternalInstrumented<InternalChannelz.SocketStats> getListenSocketStats() { 143 return listenInstrumented; 144 } 145 146 @Override getListenSocketAddresses()147 public List<? extends SocketAddress> getListenSocketAddresses() { 148 return Collections.singletonList(getListenSocketAddress()); 149 } 150 151 @Override getListenSocketStatsList()152 public List<InternalInstrumented<InternalChannelz.SocketStats>> getListenSocketStatsList() { 153 return Collections.singletonList(getListenSocketStats()); 154 } 155 156 private static final class ListenSocket 157 implements InternalInstrumented<InternalChannelz.SocketStats> { 158 private final InternalLogId id; 159 private final ServerSocket socket; 160 ListenSocket(ServerSocket socket)161 public ListenSocket(ServerSocket socket) { 162 this.socket = socket; 163 this.id = InternalLogId.allocate(getClass(), String.valueOf(socket.getLocalSocketAddress())); 164 } 165 166 @Override getStats()167 public ListenableFuture<InternalChannelz.SocketStats> getStats() { 168 return Futures.immediateFuture(new InternalChannelz.SocketStats( 169 /*data=*/ null, 170 socket.getLocalSocketAddress(), 171 /*remote=*/ null, 172 new InternalChannelz.SocketOptions.Builder().build(), 173 /*security=*/ null)); 174 } 175 176 @Override getLogId()177 public InternalLogId getLogId() { 178 return id; 179 } 180 181 @Override toString()182 public String toString() { 183 return MoreObjects.toStringHelper(this) 184 .add("logId", id.getId()) 185 .add("socket", socket) 186 .toString(); 187 } 188 } 189 } 190