#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;

namespace Grpc.Core
{
    /// <summary>
    /// Represents a gRPC channel. Channels are an abstraction of long-lived connections to remote servers.
    /// More client objects can reuse the same channel. Creating a channel is an expensive operation compared to invoking
    /// a remote call so in general you should reuse a single channel for as many calls as possible.
    /// </summary>
    public class Channel : ChannelBase
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();

        // Guards shutdownRequested and serializes every use of "handle" that could race with its disposal.
        readonly object myLock = new object();
        readonly AtomicCounter activeCallCounter = new AtomicCounter();
        readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();

        readonly GrpcEnvironment environment;
        readonly CompletionQueueSafeHandle completionQueue;
        readonly ChannelSafeHandle handle;
        readonly Dictionary<string, ChannelOption> options;

        bool shutdownRequested;

        /// <summary>
        /// Creates a channel that connects to a specific host.
        /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
        /// </summary>
        /// <param name="target">Target of the channel.</param>
        /// <param name="credentials">Credentials to secure the channel.</param>
        public Channel(string target, ChannelCredentials credentials) :
            this(target, credentials, null)
        {
        }

        /// <summary>
        /// Creates a channel that connects to a specific host.
        /// Port will default to 80 for an unsecure channel or to 443 for a secure channel.
        /// </summary>
        /// <param name="target">Target of the channel.</param>
        /// <param name="credentials">Credentials to secure the channel. Must not be <c>null</c>.</param>
        /// <param name="options">Channel options. <c>null</c> is treated as "no options".</param>
        /// <exception cref="ArgumentNullException"><paramref name="credentials"/> is <c>null</c>.</exception>
        public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options) : base(target)
        {
            // Validate before any resource is acquired: failing later (on credentials.ToNativeCredentials())
            // would surface as a NullReferenceException after the environment reference was already taken.
            if (credentials == null)
            {
                throw new ArgumentNullException(nameof(credentials));
            }

            this.options = CreateOptionsDictionary(options);
            EnsureUserAgentChannelOption(this.options);
            this.environment = GrpcEnvironment.AddRef();

            this.completionQueue = this.environment.PickCompletionQueue();
            using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
            {
                // A null result from ToNativeCredentials() selects the insecure channel factory.
                var nativeCredentials = credentials.ToNativeCredentials();
                if (nativeCredentials != null)
                {
                    this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
                }
                else
                {
                    this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
                }
            }
            GrpcEnvironment.RegisterChannel(this);
        }

        /// <summary>
        /// Creates a channel that connects to a specific host and port.
        /// </summary>
        /// <param name="host">The name or IP address of the host.</param>
        /// <param name="port">The port.</param>
        /// <param name="credentials">Credentials to secure the channel.</param>
        public Channel(string host, int port, ChannelCredentials credentials) :
            this(host, port, credentials, null)
        {
        }

        /// <summary>
        /// Creates a channel that connects to a specific host and port.
        /// The target passed on is <c>host:port</c>.
        /// </summary>
        /// <param name="host">The name or IP address of the host.</param>
        /// <param name="port">The port.</param>
        /// <param name="credentials">Credentials to secure the channel.</param>
        /// <param name="options">Channel options.</param>
        public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) :
            this(string.Format("{0}:{1}", host, port), credentials, options)
        {
        }

        /// <summary>
        /// Gets current connectivity state of this channel.
        /// After channel has been shutdown, <c>ChannelState.Shutdown</c> will be returned.
        /// </summary>
        public ChannelState State
        {
            get
            {
                return GetConnectivityState(false);
            }
        }

        // Cached handler for watch connectivity state. The "state" argument is the
        // TaskCompletionSource<bool> supplied by TryWaitForStateChangedAsync.
        static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) =>
        {
            var tcs = (TaskCompletionSource<bool>) state;
            tcs.SetResult(success);
        };

        /// <summary>
        /// Returned task completes once channel state has become different from
        /// given lastObservedState.
        /// If deadline is reached or an error occurs, returned task is cancelled.
        /// </summary>
        /// <param name="lastObservedState">The state the caller last saw; must not be <c>ChannelState.Shutdown</c>.</param>
        /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
        /// <exception cref="TaskCanceledException">The wait finished without a state change being reported.</exception>
        public async Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
        {
            var result = await TryWaitForStateChangedAsync(lastObservedState, deadline).ConfigureAwait(false);
            if (!result)
            {
                throw new TaskCanceledException("Reached deadline.");
            }
        }

        /// <summary>
        /// Returned task completes once channel state has become different from
        /// given lastObservedState (<c>true</c> is returned) or if the wait has timed out (<c>false</c> is returned).
        /// </summary>
        /// <param name="lastObservedState">The state the caller last saw; must not be <c>ChannelState.Shutdown</c>.</param>
        /// <param name="deadline">The deadline. <c>null</c> indicates no deadline (infinite future).</param>
        /// <returns>A task whose result is the success flag reported by the connectivity watch.</returns>
        /// <exception cref="ObjectDisposedException">The channel handle has already been closed.</exception>
        public Task<bool> TryWaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
        {
            GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
                "Shutdown is a terminal state. No further state changes can occur.");
            var tcs = new TaskCompletionSource<bool>();
            var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
            lock (myLock)
            {
                if (handle.IsClosed)
                {
                    // If channel has been already shutdown and handle was disposed, we would end up with
                    // an abandoned completion added to the completion registry. Instead, we make sure we fail early.
                    throw new ObjectDisposedException(nameof(handle), "Channel handle has already been disposed.");
                }
                else
                {
                    // pass "tcs" as "state" for WatchConnectivityStateHandler.
                    handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs);
                }
            }
            return tcs.Task;
        }

        /// <summary>Resolved address of the remote endpoint in URI format.</summary>
        public string ResolvedTarget
        {
            get
            {
                return handle.GetTarget();
            }
        }

        /// <summary>
        /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
        /// </summary>
        public CancellationToken ShutdownToken
        {
            get
            {
                return this.shutdownTokenSource.Token;
            }
        }

        /// <summary>
        /// Allows explicitly requesting channel to connect without starting an RPC.
        /// Returned task completes once state Ready was seen. If the deadline is reached,
        /// or channel enters the Shutdown state, the task is cancelled.
        /// There is no need to call this explicitly unless your use case requires that.
        /// Starting an RPC on a new channel will request connection implicitly.
        /// </summary>
        /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
        public async Task ConnectAsync(DateTime? deadline = null)
        {
            // Only the first check asks the channel to start connecting; later iterations just observe.
            var currentState = GetConnectivityState(true);
            while (currentState != ChannelState.Ready)
            {
                if (currentState == ChannelState.Shutdown)
                {
                    throw new OperationCanceledException("Channel has reached Shutdown state.");
                }
                await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
                currentState = GetConnectivityState(false);
            }
        }

        /// <summary>Provides implementation of a non-virtual public member.</summary>
        /// <remarks>
        /// May be requested only once per channel: a second request fails the state precondition check.
        /// Unregisters the channel, cancels <see cref="ShutdownToken"/>, logs a warning if calls are
        /// still active, disposes the native handle and releases the environment reference.
        /// </remarks>
        protected override async Task ShutdownAsyncCore()
        {
            lock (myLock)
            {
                GrpcPreconditions.CheckState(!shutdownRequested);
                shutdownRequested = true;
            }
            GrpcEnvironment.UnregisterChannel(this);

            shutdownTokenSource.Cancel();

            var activeCallCount = activeCallCounter.Count;
            if (activeCallCount > 0)
            {
                Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
            }

            // Dispose under myLock so it cannot interleave with the handle calls made
            // (also under myLock) by TryWaitForStateChangedAsync and GetConnectivityState.
            lock (myLock)
            {
                handle.Dispose();
            }

            await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Create a new <see cref="CallInvoker"/> for the channel.
        /// </summary>
        /// <returns>A new <see cref="CallInvoker"/>.</returns>
        public override CallInvoker CreateCallInvoker()
        {
            return new DefaultCallInvoker(this);
        }

        internal ChannelSafeHandle Handle
        {
            get
            {
                return this.handle;
            }
        }

        internal GrpcEnvironment Environment
        {
            get
            {
                return this.environment;
            }
        }

        internal CompletionQueueSafeHandle CompletionQueue
        {
            get
            {
                return this.completionQueue;
            }
        }

        /// <summary>
        /// Registers an active call with this channel: takes an extra reference on the native
        /// handle and bumps the active call counter. Must be paired with <see cref="RemoveCallReference"/>.
        /// </summary>
        /// <param name="call">The call being registered (not inspected here).</param>
        internal void AddCallReference(object call)
        {
            // Acquire the handle reference first. DangerousAddRef fails on a handle that has already
            // been closed; counting the call only after it succeeded keeps the counter from being
            // left incremented for a call that never obtained a reference.
            bool success = false;
            handle.DangerousAddRef(ref success);
            GrpcPreconditions.CheckState(success);

            activeCallCounter.Increment();
        }

        /// <summary>
        /// Undoes <see cref="AddCallReference"/>: releases the handle reference and decrements
        /// the active call counter.
        /// </summary>
        /// <param name="call">The call being unregistered (not inspected here).</param>
        internal void RemoveCallReference(object call)
        {
            handle.DangerousRelease();

            activeCallCounter.Decrement();
        }

        // for testing only
        internal long GetCallReferenceCount()
        {
            return activeCallCounter.Count;
        }

        /// <summary>
        /// Queries the native connectivity state; a disposed handle is reported as <c>ChannelState.Shutdown</c>.
        /// </summary>
        /// <param name="tryToConnect">Forwarded to the native check; <c>true</c> requests a connection attempt.</param>
        private ChannelState GetConnectivityState(bool tryToConnect)
        {
            try
            {
                lock (myLock)
                {
                    return handle.CheckConnectivityState(tryToConnect);
                }
            }
            catch (ObjectDisposedException)
            {
                return ChannelState.Shutdown;
            }
        }

        /// <summary>
        /// Ensures the primary user agent option is present and ends with the <c>grpc-csharp/{version}</c> token.
        /// A user-supplied value is kept and placed in front, separated by a single space.
        /// </summary>
        /// <param name="options">Option dictionary to update in place.</param>
        private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
        {
            var key = ChannelOptions.PrimaryUserAgentString;
            var userAgentString = "";

            ChannelOption option;
            if (options.TryGetValue(key, out option))
            {
                // user-provided userAgentString needs to be at the beginning
                userAgentString = option.StringValue + " ";
            }

            // TODO(jtattermusch): it would be useful to also provide .NET/mono version.
            userAgentString += string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion);

            options[key] = new ChannelOption(key, userAgentString);
        }

        /// <summary>
        /// Builds a name-keyed dictionary from the supplied options.
        /// </summary>
        /// <param name="options">Options to index; <c>null</c> yields an empty dictionary.</param>
        /// <returns>A new dictionary keyed by <c>ChannelOption.Name</c>.</returns>
        /// <exception cref="ArgumentException">Two options share the same name.</exception>
        private static Dictionary<string, ChannelOption> CreateOptionsDictionary(IEnumerable<ChannelOption> options)
        {
            var dict = new Dictionary<string, ChannelOption>();
            if (options == null)
            {
                return dict;
            }
            foreach (var option in options)
            {
                // Add (rather than the indexer) so that duplicate option names are rejected.
                dict.Add(option.Name, option);
            }
            return dict;
        }
    }
}