1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.MoreObjects; 23 import com.google.common.base.Preconditions; 24 import com.google.common.util.concurrent.ListenableFuture; 25 import com.google.common.util.concurrent.SettableFuture; 26 import io.grpc.Attributes; 27 import io.grpc.CallOptions; 28 import io.grpc.ClientCall; 29 import io.grpc.ClientStreamTracer; 30 import io.grpc.ConnectivityState; 31 import io.grpc.ConnectivityStateInfo; 32 import io.grpc.Context; 33 import io.grpc.EquivalentAddressGroup; 34 import io.grpc.InternalChannelz; 35 import io.grpc.InternalChannelz.ChannelStats; 36 import io.grpc.InternalChannelz.ChannelTrace; 37 import io.grpc.InternalInstrumented; 38 import io.grpc.InternalLogId; 39 import io.grpc.InternalWithLogId; 40 import io.grpc.LoadBalancer; 41 import io.grpc.LoadBalancer.PickResult; 42 import io.grpc.LoadBalancer.PickSubchannelArgs; 43 import io.grpc.LoadBalancer.Subchannel; 44 import io.grpc.LoadBalancer.SubchannelPicker; 45 import io.grpc.ManagedChannel; 46 import io.grpc.Metadata; 47 import io.grpc.MethodDescriptor; 48 import io.grpc.Status; 49 import io.grpc.SynchronizationContext; 50 import io.grpc.internal.ClientCallImpl.ClientStreamProvider; 51 import java.util.Collections; 52 import java.util.List; 53 import java.util.concurrent.CountDownLatch; 54 import java.util.concurrent.Executor; 55 import java.util.concurrent.ScheduledExecutorService; 56 import java.util.concurrent.TimeUnit; 57 import java.util.logging.Level; 58 import java.util.logging.Logger; 59 import javax.annotation.concurrent.ThreadSafe; 60 61 /** 62 * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer} 63 * to its own RPC needs. 64 */ 65 @ThreadSafe 66 final class OobChannel extends ManagedChannel implements InternalInstrumented<ChannelStats> { 67 private static final Logger log = Logger.getLogger(OobChannel.class.getName()); 68 69 private InternalSubchannel subchannel; 70 private AbstractSubchannel subchannelImpl; 71 private SubchannelPicker subchannelPicker; 72 73 private final InternalLogId logId; 74 private final String authority; 75 private final DelayedClientTransport delayedTransport; 76 private final InternalChannelz channelz; 77 private final ObjectPool<? extends Executor> executorPool; 78 private final Executor executor; 79 private final ScheduledExecutorService deadlineCancellationExecutor; 80 private final CountDownLatch terminatedLatch = new CountDownLatch(1); 81 private volatile boolean shutdown; 82 private final CallTracer channelCallsTracer; 83 private final ChannelTracer channelTracer; 84 private final TimeProvider timeProvider; 85 86 private final ClientStreamProvider transportProvider = new ClientStreamProvider() { 87 @Override 88 public ClientStream newStream(MethodDescriptor<?, ?> method, 89 CallOptions callOptions, Metadata headers, Context context) { 90 ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( 91 callOptions, headers, 0, /* isTransparentRetry= */ false); 92 Context origContext = context.attach(); 93 // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't 94 // matter here because OOB communication should be sparse, and it's not on application RPC's 95 // critical path. 96 try { 97 return delayedTransport.newStream(method, headers, callOptions, tracers); 98 } finally { 99 context.detach(origContext); 100 } 101 } 102 }; 103 OobChannel( String authority, ObjectPool<? extends Executor> executorPool, ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext, CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz, TimeProvider timeProvider)104 OobChannel( 105 String authority, ObjectPool<? extends Executor> executorPool, 106 ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext, 107 CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz, 108 TimeProvider timeProvider) { 109 this.authority = checkNotNull(authority, "authority"); 110 this.logId = InternalLogId.allocate(getClass(), authority); 111 this.executorPool = checkNotNull(executorPool, "executorPool"); 112 this.executor = checkNotNull(executorPool.getObject(), "executor"); 113 this.deadlineCancellationExecutor = checkNotNull( 114 deadlineCancellationExecutor, "deadlineCancellationExecutor"); 115 this.delayedTransport = new DelayedClientTransport(executor, syncContext); 116 this.channelz = Preconditions.checkNotNull(channelz); 117 this.delayedTransport.start(new ManagedClientTransport.Listener() { 118 @Override 119 public void transportShutdown(Status s) { 120 // Don't care 121 } 122 123 @Override 124 public void transportTerminated() { 125 subchannelImpl.shutdown(); 126 } 127 128 @Override 129 public void transportReady() { 130 // Don't care 131 } 132 133 @Override 134 public void transportInUse(boolean inUse) { 135 // Don't care 136 } 137 }); 138 this.channelCallsTracer = callsTracer; 139 this.channelTracer = checkNotNull(channelTracer, "channelTracer"); 140 this.timeProvider = checkNotNull(timeProvider, "timeProvider"); 141 } 142 143 // Must be called only once, right after the OobChannel is created. setSubchannel(final InternalSubchannel subchannel)144 void setSubchannel(final InternalSubchannel subchannel) { 145 log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel}); 146 this.subchannel = subchannel; 147 subchannelImpl = new AbstractSubchannel() { 148 @Override 149 public void shutdown() { 150 subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown")); 151 } 152 153 @Override 154 InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() { 155 return subchannel; 156 } 157 158 @Override 159 public void requestConnection() { 160 subchannel.obtainActiveTransport(); 161 } 162 163 @Override 164 public List<EquivalentAddressGroup> getAllAddresses() { 165 return subchannel.getAddressGroups(); 166 } 167 168 @Override 169 public Attributes getAttributes() { 170 return Attributes.EMPTY; 171 } 172 173 @Override 174 public Object getInternalSubchannel() { 175 return subchannel; 176 } 177 }; 178 179 final class OobSubchannelPicker extends SubchannelPicker { 180 final PickResult result = PickResult.withSubchannel(subchannelImpl); 181 182 @Override 183 public PickResult pickSubchannel(PickSubchannelArgs args) { 184 return result; 185 } 186 187 @Override 188 public String toString() { 189 return MoreObjects.toStringHelper(OobSubchannelPicker.class) 190 .add("result", result) 191 .toString(); 192 } 193 } 194 195 subchannelPicker = new OobSubchannelPicker(); 196 delayedTransport.reprocess(subchannelPicker); 197 } 198 updateAddresses(List<EquivalentAddressGroup> eag)199 void updateAddresses(List<EquivalentAddressGroup> eag) { 200 subchannel.updateAddresses(eag); 201 } 202 203 @Override newCall( MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions)204 public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( 205 MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { 206 return new ClientCallImpl<>(methodDescriptor, 207 callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), 208 callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, null); 209 } 210 211 @Override authority()212 public String authority() { 213 return authority; 214 } 215 216 @Override isTerminated()217 public boolean isTerminated() { 218 return terminatedLatch.getCount() == 0; 219 } 220 221 @Override awaitTermination(long time, TimeUnit unit)222 public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException { 223 return terminatedLatch.await(time, unit); 224 } 225 226 @Override getState(boolean requestConnectionIgnored)227 public ConnectivityState getState(boolean requestConnectionIgnored) { 228 if (subchannel == null) { 229 return ConnectivityState.IDLE; 230 } 231 return subchannel.getState(); 232 } 233 234 @Override shutdown()235 public ManagedChannel shutdown() { 236 shutdown = true; 237 delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called")); 238 return this; 239 } 240 241 @Override isShutdown()242 public boolean isShutdown() { 243 return shutdown; 244 } 245 246 @Override shutdownNow()247 public ManagedChannel shutdownNow() { 248 shutdown = true; 249 delayedTransport.shutdownNow( 250 Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called")); 251 return this; 252 } 253 handleSubchannelStateChange(final ConnectivityStateInfo newState)254 void handleSubchannelStateChange(final ConnectivityStateInfo newState) { 255 channelTracer.reportEvent( 256 new ChannelTrace.Event.Builder() 257 .setDescription("Entering " + newState.getState() + " state") 258 .setSeverity(ChannelTrace.Event.Severity.CT_INFO) 259 .setTimestampNanos(timeProvider.currentTimeNanos()) 260 .build()); 261 switch (newState.getState()) { 262 case READY: 263 case IDLE: 264 delayedTransport.reprocess(subchannelPicker); 265 break; 266 case TRANSIENT_FAILURE: 267 final class OobErrorPicker extends SubchannelPicker { 268 final PickResult errorResult = PickResult.withError(newState.getStatus()); 269 270 @Override 271 public PickResult pickSubchannel(PickSubchannelArgs args) { 272 return errorResult; 273 } 274 275 @Override 276 public String toString() { 277 return MoreObjects.toStringHelper(OobErrorPicker.class) 278 .add("errorResult", errorResult) 279 .toString(); 280 } 281 } 282 283 delayedTransport.reprocess(new OobErrorPicker()); 284 break; 285 default: 286 // Do nothing 287 } 288 } 289 290 // must be run from channel executor handleSubchannelTerminated()291 void handleSubchannelTerminated() { 292 channelz.removeSubchannel(this); 293 // When delayedTransport is terminated, it shuts down subchannel. Therefore, at this point 294 // both delayedTransport and subchannel have terminated. 295 executorPool.returnObject(executor); 296 terminatedLatch.countDown(); 297 } 298 299 @VisibleForTesting getSubchannel()300 Subchannel getSubchannel() { 301 return subchannelImpl; 302 } 303 getInternalSubchannel()304 InternalSubchannel getInternalSubchannel() { 305 return subchannel; 306 } 307 308 @Override getStats()309 public ListenableFuture<ChannelStats> getStats() { 310 final SettableFuture<ChannelStats> ret = SettableFuture.create(); 311 final ChannelStats.Builder builder = new ChannelStats.Builder(); 312 channelCallsTracer.updateBuilder(builder); 313 channelTracer.updateBuilder(builder); 314 builder 315 .setTarget(authority) 316 .setState(subchannel.getState()) 317 .setSubchannels(Collections.<InternalWithLogId>singletonList(subchannel)); 318 ret.set(builder.build()); 319 return ret; 320 } 321 322 @Override getLogId()323 public InternalLogId getLogId() { 324 return logId; 325 } 326 327 @Override toString()328 public String toString() { 329 return MoreObjects.toStringHelper(this) 330 .add("logId", logId.getId()) 331 .add("authority", authority) 332 .toString(); 333 } 334 335 @Override resetConnectBackoff()336 public void resetConnectBackoff() { 337 subchannel.resetConnectBackoff(); 338 } 339 } 340