1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.util.concurrent.MoreExecutors; 24 import io.grpc.BinaryLog; 25 import io.grpc.BindableService; 26 import io.grpc.CompressorRegistry; 27 import io.grpc.Context; 28 import io.grpc.DecompressorRegistry; 29 import io.grpc.HandlerRegistry; 30 import io.grpc.Internal; 31 import io.grpc.InternalChannelz; 32 import io.grpc.InternalNotifyOnServerBuild; 33 import io.grpc.Server; 34 import io.grpc.ServerBuilder; 35 import io.grpc.ServerInterceptor; 36 import io.grpc.ServerMethodDefinition; 37 import io.grpc.ServerServiceDefinition; 38 import io.grpc.ServerStreamTracer; 39 import io.grpc.ServerTransportFilter; 40 import io.opencensus.trace.Tracing; 41 import java.util.ArrayList; 42 import java.util.Collections; 43 import java.util.List; 44 import java.util.concurrent.Executor; 45 import java.util.concurrent.TimeUnit; 46 import javax.annotation.Nullable; 47 48 /** 49 * The base class for server builders. 50 * 51 * @param <T> The concrete type for this builder. 52 */ 53 public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuilder<T>> 54 extends ServerBuilder<T> { 55 forPort(int port)56 public static ServerBuilder<?> forPort(int port) { 57 throw new UnsupportedOperationException("Subclass failed to hide static factory"); 58 } 59 60 private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL = 61 SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR); 62 private static final HandlerRegistry DEFAULT_FALLBACK_REGISTRY = new HandlerRegistry() { 63 @Override 64 public List<ServerServiceDefinition> getServices() { 65 return Collections.emptyList(); 66 } 67 68 @Override 69 @Nullable 70 public ServerMethodDefinition<?, ?> lookupMethod( 71 String methodName, @Nullable String authority) { 72 return null; 73 } 74 }; 75 private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY = 76 DecompressorRegistry.getDefaultInstance(); 77 private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY = 78 CompressorRegistry.getDefaultInstance(); 79 private static final long DEFAULT_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(120); 80 81 final InternalHandlerRegistry.Builder registryBuilder = 82 new InternalHandlerRegistry.Builder(); 83 84 final List<ServerTransportFilter> transportFilters = 85 new ArrayList<>(); 86 87 final List<ServerInterceptor> interceptors = new ArrayList<>(); 88 89 private final List<InternalNotifyOnServerBuild> notifyOnBuildList = 90 new ArrayList<>(); 91 92 private final List<ServerStreamTracer.Factory> streamTracerFactories = 93 new ArrayList<ServerStreamTracer.Factory>(); 94 95 HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; 96 97 ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL; 98 99 DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; 100 101 CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; 102 103 long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS; 104 105 @Nullable 106 private CensusStatsModule censusStatsOverride; 107 108 private boolean statsEnabled = true; 109 private boolean recordStartedRpcs = true; 110 private boolean recordFinishedRpcs = true; 111 private boolean tracingEnabled = true; 112 113 @Nullable 114 protected BinaryLog binlog; 115 protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory(); 116 117 protected InternalChannelz channelz = InternalChannelz.instance(); 118 protected CallTracer.Factory callTracerFactory = CallTracer.getDefaultFactory(); 119 120 @Override directExecutor()121 public final T directExecutor() { 122 return executor(MoreExecutors.directExecutor()); 123 } 124 125 @Override executor(@ullable Executor executor)126 public final T executor(@Nullable Executor executor) { 127 if (executor != null) { 128 this.executorPool = new FixedObjectPool<Executor>(executor); 129 } else { 130 this.executorPool = DEFAULT_EXECUTOR_POOL; 131 } 132 return thisT(); 133 } 134 135 @Override addService(ServerServiceDefinition service)136 public final T addService(ServerServiceDefinition service) { 137 registryBuilder.addService(service); 138 return thisT(); 139 } 140 141 @Override addService(BindableService bindableService)142 public final T addService(BindableService bindableService) { 143 if (bindableService instanceof InternalNotifyOnServerBuild) { 144 notifyOnBuildList.add((InternalNotifyOnServerBuild) bindableService); 145 } 146 return addService(bindableService.bindService()); 147 } 148 149 @Override addTransportFilter(ServerTransportFilter filter)150 public final T addTransportFilter(ServerTransportFilter filter) { 151 transportFilters.add(checkNotNull(filter, "filter")); 152 return thisT(); 153 } 154 155 @Override intercept(ServerInterceptor interceptor)156 public final T intercept(ServerInterceptor interceptor) { 157 interceptors.add(interceptor); 158 return thisT(); 159 } 160 161 @Override addStreamTracerFactory(ServerStreamTracer.Factory factory)162 public final T addStreamTracerFactory(ServerStreamTracer.Factory factory) { 163 streamTracerFactories.add(checkNotNull(factory, "factory")); 164 return thisT(); 165 } 166 167 @Override fallbackHandlerRegistry(HandlerRegistry registry)168 public final T fallbackHandlerRegistry(HandlerRegistry registry) { 169 if (registry != null) { 170 this.fallbackRegistry = registry; 171 } else { 172 this.fallbackRegistry = DEFAULT_FALLBACK_REGISTRY; 173 } 174 return thisT(); 175 } 176 177 @Override decompressorRegistry(DecompressorRegistry registry)178 public final T decompressorRegistry(DecompressorRegistry registry) { 179 if (registry != null) { 180 decompressorRegistry = registry; 181 } else { 182 decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY; 183 } 184 return thisT(); 185 } 186 187 @Override compressorRegistry(CompressorRegistry registry)188 public final T compressorRegistry(CompressorRegistry registry) { 189 if (registry != null) { 190 compressorRegistry = registry; 191 } else { 192 compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; 193 } 194 return thisT(); 195 } 196 197 @Override handshakeTimeout(long timeout, TimeUnit unit)198 public final T handshakeTimeout(long timeout, TimeUnit unit) { 199 checkArgument(timeout > 0, "handshake timeout is %s, but must be positive", timeout); 200 handshakeTimeoutMillis = unit.toMillis(timeout); 201 return thisT(); 202 } 203 204 @Override setBinaryLog(BinaryLog binaryLog)205 public final T setBinaryLog(BinaryLog binaryLog) { 206 this.binlog = binaryLog; 207 return thisT(); 208 } 209 210 /** 211 * Override the default stats implementation. 212 */ 213 @VisibleForTesting overrideCensusStatsModule(CensusStatsModule censusStats)214 protected T overrideCensusStatsModule(CensusStatsModule censusStats) { 215 this.censusStatsOverride = censusStats; 216 return thisT(); 217 } 218 219 /** 220 * Disable or enable stats features. Enabled by default. 221 */ setStatsEnabled(boolean value)222 protected void setStatsEnabled(boolean value) { 223 statsEnabled = value; 224 } 225 226 /** 227 * Disable or enable stats recording for RPC upstarts. Effective only if {@link 228 * #setStatsEnabled} is set to true. Enabled by default. 229 */ setStatsRecordStartedRpcs(boolean value)230 protected void setStatsRecordStartedRpcs(boolean value) { 231 recordStartedRpcs = value; 232 } 233 234 /** 235 * Disable or enable stats recording for RPC completions. Effective only if {@link 236 * #setStatsEnabled} is set to true. Enabled by default. 237 */ setStatsRecordFinishedRpcs(boolean value)238 protected void setStatsRecordFinishedRpcs(boolean value) { 239 recordFinishedRpcs = value; 240 } 241 242 /** 243 * Disable or enable tracing features. Enabled by default. 244 */ setTracingEnabled(boolean value)245 protected void setTracingEnabled(boolean value) { 246 tracingEnabled = value; 247 } 248 249 @Override build()250 public Server build() { 251 ServerImpl server = new ServerImpl( 252 this, 253 buildTransportServer(Collections.unmodifiableList(getTracerFactories())), 254 Context.ROOT); 255 for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) { 256 notifyTarget.notifyOnBuild(server); 257 } 258 return server; 259 } 260 261 @VisibleForTesting getTracerFactories()262 final List<ServerStreamTracer.Factory> getTracerFactories() { 263 ArrayList<ServerStreamTracer.Factory> tracerFactories = 264 new ArrayList<ServerStreamTracer.Factory>(); 265 if (statsEnabled) { 266 CensusStatsModule censusStats = this.censusStatsOverride; 267 if (censusStats == null) { 268 censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); 269 } 270 tracerFactories.add( 271 censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs)); 272 } 273 if (tracingEnabled) { 274 CensusTracingModule censusTracing = 275 new CensusTracingModule(Tracing.getTracer(), 276 Tracing.getPropagationComponent().getBinaryFormat()); 277 tracerFactories.add(censusTracing.getServerTracerFactory()); 278 } 279 tracerFactories.addAll(streamTracerFactories); 280 return tracerFactories; 281 } 282 283 /** 284 * Children of AbstractServerBuilder should override this method to provide transport specific 285 * information for the server. This method is mean for Transport implementors and should not be 286 * used by normal users. 287 * 288 * @param streamTracerFactories an immutable list of stream tracer factories 289 */ 290 @Internal buildTransportServer( List<ServerStreamTracer.Factory> streamTracerFactories)291 protected abstract io.grpc.internal.InternalServer buildTransportServer( 292 List<ServerStreamTracer.Factory> streamTracerFactories); 293 thisT()294 private T thisT() { 295 @SuppressWarnings("unchecked") 296 T thisT = (T) this; 297 return thisT; 298 } 299 } 300