• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import io.grpc.BinaryLog;
25 import io.grpc.BindableService;
26 import io.grpc.CompressorRegistry;
27 import io.grpc.Context;
28 import io.grpc.DecompressorRegistry;
29 import io.grpc.HandlerRegistry;
30 import io.grpc.Internal;
31 import io.grpc.InternalChannelz;
32 import io.grpc.InternalNotifyOnServerBuild;
33 import io.grpc.Server;
34 import io.grpc.ServerBuilder;
35 import io.grpc.ServerInterceptor;
36 import io.grpc.ServerMethodDefinition;
37 import io.grpc.ServerServiceDefinition;
38 import io.grpc.ServerStreamTracer;
39 import io.grpc.ServerTransportFilter;
40 import io.opencensus.trace.Tracing;
41 import java.util.ArrayList;
42 import java.util.Collections;
43 import java.util.List;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.TimeUnit;
46 import javax.annotation.Nullable;
47 
48 /**
49  * The base class for server builders.
50  *
51  * @param <T> The concrete type for this builder.
52  */
53 public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuilder<T>>
54         extends ServerBuilder<T> {
55 
forPort(int port)56   public static ServerBuilder<?> forPort(int port) {
57     throw new UnsupportedOperationException("Subclass failed to hide static factory");
58   }
59 
60   private static final ObjectPool<? extends Executor> DEFAULT_EXECUTOR_POOL =
61       SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
62   private static final HandlerRegistry DEFAULT_FALLBACK_REGISTRY = new HandlerRegistry() {
63       @Override
64       public List<ServerServiceDefinition> getServices() {
65         return Collections.emptyList();
66       }
67 
68       @Override
69       @Nullable
70       public ServerMethodDefinition<?, ?> lookupMethod(
71           String methodName, @Nullable String authority) {
72         return null;
73       }
74     };
75   private static final DecompressorRegistry DEFAULT_DECOMPRESSOR_REGISTRY =
76       DecompressorRegistry.getDefaultInstance();
77   private static final CompressorRegistry DEFAULT_COMPRESSOR_REGISTRY =
78       CompressorRegistry.getDefaultInstance();
79   private static final long DEFAULT_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(120);
80 
81   final InternalHandlerRegistry.Builder registryBuilder =
82       new InternalHandlerRegistry.Builder();
83 
84   final List<ServerTransportFilter> transportFilters =
85       new ArrayList<>();
86 
87   final List<ServerInterceptor> interceptors = new ArrayList<>();
88 
89   private final List<InternalNotifyOnServerBuild> notifyOnBuildList =
90       new ArrayList<>();
91 
92   private final List<ServerStreamTracer.Factory> streamTracerFactories =
93       new ArrayList<ServerStreamTracer.Factory>();
94 
95   HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
96 
97   ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
98 
99   DecompressorRegistry decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
100 
101   CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
102 
103   long handshakeTimeoutMillis = DEFAULT_HANDSHAKE_TIMEOUT_MILLIS;
104 
105   @Nullable
106   private CensusStatsModule censusStatsOverride;
107 
108   private boolean statsEnabled = true;
109   private boolean recordStartedRpcs = true;
110   private boolean recordFinishedRpcs = true;
111   private boolean tracingEnabled = true;
112 
113   @Nullable
114   protected BinaryLog binlog;
115   protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
116 
117   protected InternalChannelz channelz = InternalChannelz.instance();
118   protected CallTracer.Factory callTracerFactory = CallTracer.getDefaultFactory();
119 
120   @Override
directExecutor()121   public final T directExecutor() {
122     return executor(MoreExecutors.directExecutor());
123   }
124 
125   @Override
executor(@ullable Executor executor)126   public final T executor(@Nullable Executor executor) {
127     if (executor != null) {
128       this.executorPool = new FixedObjectPool<Executor>(executor);
129     } else {
130       this.executorPool = DEFAULT_EXECUTOR_POOL;
131     }
132     return thisT();
133   }
134 
135   @Override
addService(ServerServiceDefinition service)136   public final T addService(ServerServiceDefinition service) {
137     registryBuilder.addService(service);
138     return thisT();
139   }
140 
141   @Override
addService(BindableService bindableService)142   public final T addService(BindableService bindableService) {
143     if (bindableService instanceof InternalNotifyOnServerBuild) {
144       notifyOnBuildList.add((InternalNotifyOnServerBuild) bindableService);
145     }
146     return addService(bindableService.bindService());
147   }
148 
149   @Override
addTransportFilter(ServerTransportFilter filter)150   public final T addTransportFilter(ServerTransportFilter filter) {
151     transportFilters.add(checkNotNull(filter, "filter"));
152     return thisT();
153   }
154 
155   @Override
intercept(ServerInterceptor interceptor)156   public final T intercept(ServerInterceptor interceptor) {
157     interceptors.add(interceptor);
158     return thisT();
159   }
160 
161   @Override
addStreamTracerFactory(ServerStreamTracer.Factory factory)162   public final T addStreamTracerFactory(ServerStreamTracer.Factory factory) {
163     streamTracerFactories.add(checkNotNull(factory, "factory"));
164     return thisT();
165   }
166 
167   @Override
fallbackHandlerRegistry(HandlerRegistry registry)168   public final T fallbackHandlerRegistry(HandlerRegistry registry) {
169     if (registry != null) {
170       this.fallbackRegistry = registry;
171     } else {
172       this.fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
173     }
174     return thisT();
175   }
176 
177   @Override
decompressorRegistry(DecompressorRegistry registry)178   public final T decompressorRegistry(DecompressorRegistry registry) {
179     if (registry != null) {
180       decompressorRegistry = registry;
181     } else {
182       decompressorRegistry = DEFAULT_DECOMPRESSOR_REGISTRY;
183     }
184     return thisT();
185   }
186 
187   @Override
compressorRegistry(CompressorRegistry registry)188   public final T compressorRegistry(CompressorRegistry registry) {
189     if (registry != null) {
190       compressorRegistry = registry;
191     } else {
192       compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY;
193     }
194     return thisT();
195   }
196 
197   @Override
handshakeTimeout(long timeout, TimeUnit unit)198   public final T handshakeTimeout(long timeout, TimeUnit unit) {
199     checkArgument(timeout > 0, "handshake timeout is %s, but must be positive", timeout);
200     handshakeTimeoutMillis = unit.toMillis(timeout);
201     return thisT();
202   }
203 
204   @Override
setBinaryLog(BinaryLog binaryLog)205   public final T setBinaryLog(BinaryLog binaryLog) {
206     this.binlog = binaryLog;
207     return thisT();
208   }
209 
210   /**
211    * Override the default stats implementation.
212    */
213   @VisibleForTesting
overrideCensusStatsModule(CensusStatsModule censusStats)214   protected T overrideCensusStatsModule(CensusStatsModule censusStats) {
215     this.censusStatsOverride = censusStats;
216     return thisT();
217   }
218 
219   /**
220    * Disable or enable stats features.  Enabled by default.
221    */
setStatsEnabled(boolean value)222   protected void setStatsEnabled(boolean value) {
223     statsEnabled = value;
224   }
225 
226   /**
227    * Disable or enable stats recording for RPC upstarts.  Effective only if {@link
228    * #setStatsEnabled} is set to true.  Enabled by default.
229    */
setStatsRecordStartedRpcs(boolean value)230   protected void setStatsRecordStartedRpcs(boolean value) {
231     recordStartedRpcs = value;
232   }
233 
234   /**
235    * Disable or enable stats recording for RPC completions.  Effective only if {@link
236    * #setStatsEnabled} is set to true.  Enabled by default.
237    */
setStatsRecordFinishedRpcs(boolean value)238   protected void setStatsRecordFinishedRpcs(boolean value) {
239     recordFinishedRpcs = value;
240   }
241 
242   /**
243    * Disable or enable tracing features.  Enabled by default.
244    */
setTracingEnabled(boolean value)245   protected void setTracingEnabled(boolean value) {
246     tracingEnabled = value;
247   }
248 
249   @Override
build()250   public Server build() {
251     ServerImpl server = new ServerImpl(
252         this,
253         buildTransportServer(Collections.unmodifiableList(getTracerFactories())),
254         Context.ROOT);
255     for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
256       notifyTarget.notifyOnBuild(server);
257     }
258     return server;
259   }
260 
261   @VisibleForTesting
getTracerFactories()262   final List<ServerStreamTracer.Factory> getTracerFactories() {
263     ArrayList<ServerStreamTracer.Factory> tracerFactories =
264         new ArrayList<ServerStreamTracer.Factory>();
265     if (statsEnabled) {
266       CensusStatsModule censusStats = this.censusStatsOverride;
267       if (censusStats == null) {
268         censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
269       }
270       tracerFactories.add(
271           censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
272     }
273     if (tracingEnabled) {
274       CensusTracingModule censusTracing =
275           new CensusTracingModule(Tracing.getTracer(),
276               Tracing.getPropagationComponent().getBinaryFormat());
277       tracerFactories.add(censusTracing.getServerTracerFactory());
278     }
279     tracerFactories.addAll(streamTracerFactories);
280     return tracerFactories;
281   }
282 
283   /**
284    * Children of AbstractServerBuilder should override this method to provide transport specific
285    * information for the server.  This method is mean for Transport implementors and should not be
286    * used by normal users.
287    *
288    * @param streamTracerFactories an immutable list of stream tracer factories
289    */
290   @Internal
buildTransportServer( List<ServerStreamTracer.Factory> streamTracerFactories)291   protected abstract io.grpc.internal.InternalServer buildTransportServer(
292       List<ServerStreamTracer.Factory> streamTracerFactories);
293 
thisT()294   private T thisT() {
295     @SuppressWarnings("unchecked")
296     T thisT = (T) this;
297     return thisT;
298   }
299 }
300