1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/surface/legacy_channel.h"
20
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/grpc.h>
23 #include <grpc/impl/connectivity_state.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/port_platform.h>
26
27 #include "absl/base/thread_annotations.h"
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/status.h"
31 #include "absl/types/optional.h"
32 #include "src/core/channelz/channelz.h"
33 #include "src/core/client_channel/client_channel_filter.h"
34 #include "src/core/config/core_configuration.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/channel_fwd.h"
37 #include "src/core/lib/channel/channel_stack.h"
38 #include "src/core/lib/channel/channel_stack_builder_impl.h"
39 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
40 #include "src/core/lib/iomgr/closure.h"
41 #include "src/core/lib/iomgr/error.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43 #include "src/core/lib/resource_quota/resource_quota.h"
44 #include "src/core/lib/surface/call.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "src/core/lib/surface/channel_init.h"
47 #include "src/core/lib/surface/channel_stack_type.h"
48 #include "src/core/lib/surface/completion_queue.h"
49 #include "src/core/lib/surface/init_internally.h"
50 #include "src/core/lib/surface/lame_client.h"
51 #include "src/core/lib/transport/transport.h"
52 #include "src/core/telemetry/metrics.h"
53 #include "src/core/telemetry/stats.h"
54 #include "src/core/telemetry/stats_data.h"
55 #include "src/core/util/crash.h"
56 #include "src/core/util/dual_ref_counted.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/sync.h"
59 #include "src/core/util/time.h"
60
61 namespace grpc_core {
62
// Builds a channel stack for `target` and wraps it in a LegacyChannel.
// Returns:
//   - a LegacyChannel on success;
//   - nullptr (with OK status) if the channel-init registry declined to
//     build a stack for this stack type;
//   - an error status if stack construction itself failed.
absl::StatusOr<RefCountedPtr<Channel>> LegacyChannel::Create(
    std::string target, ChannelArgs args,
    grpc_channel_stack_type channel_stack_type) {
  if (grpc_channel_stack_type_is_client(channel_stack_type)) {
    // Give any globally registered creation mutator a chance to adjust the
    // args before the client stack is built.
    auto channel_args_mutator =
        grpc_channel_args_get_client_channel_creation_mutator();
    if (channel_args_mutator != nullptr) {
      args = channel_args_mutator(target.c_str(), args, channel_stack_type);
    }
  }
  ChannelStackBuilderImpl builder(
      grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
      args);
  builder.SetTarget(target.c_str());
  if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
    return nullptr;
  }
  // Only need to update stats for server channels here.  Stats for client
  // channels are handled in our base class.
  if (builder.channel_stack_type() == GRPC_SERVER_CHANNEL) {
    global_stats().IncrementServerChannelsCreated();
  }
  absl::StatusOr<RefCountedPtr<grpc_channel_stack>> r = builder.Build();
  if (!r.ok()) {
    auto status = r.status();
    LOG(ERROR) << "channel stack builder failed: " << status;
    return status;
  }
  if (channel_stack_type == GRPC_SERVER_CHANNEL) {
    // Server channels get the server-scoped set of global stats plugins.
    *(*r)->stats_plugin_group =
        GlobalStatsPluginRegistry::GetStatsPluginsForServer(args);
    // Add per-server stats plugins.
    auto* stats_plugin_list = args.GetPointer<
        std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
        GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
    if (stats_plugin_list != nullptr) {
      for (const auto& plugin : **stats_plugin_list) {
        (*r)->stats_plugin_group->AddStatsPlugin(
            plugin, plugin->GetServerScopeConfig(args));
      }
    }
  } else {
    // Client channels select stats plugins by target/authority scope.  The
    // authority comes from the channel args if set, otherwise from the
    // resolver registry's default for this target.
    std::string authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
                                .value_or(CoreConfiguration::Get()
                                              .resolver_registry()
                                              .GetDefaultAuthority(target));
    grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
        args);
    experimental::StatsPluginChannelScope scope(target, authority,
                                                endpoint_config);
    *(*r)->stats_plugin_group =
        GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
    // Add per-channel stats plugins.
    auto* stats_plugin_list = args.GetPointer<
        std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
        GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
    if (stats_plugin_list != nullptr) {
      for (const auto& plugin : **stats_plugin_list) {
        (*r)->stats_plugin_group->AddStatsPlugin(
            plugin, plugin->GetChannelScopeConfig(scope));
      }
    }
  }
  return MakeRefCounted<LegacyChannel>(
      grpc_channel_stack_type_is_client(builder.channel_stack_type()),
      std::move(target), args, std::move(*r));
}
130
// Takes ownership of `channel_stack` and installs a stack-destruction hook
// that records a channelz trace event and balances the InitInternally()
// call made below.
LegacyChannel::LegacyChannel(bool is_client, std::string target,
                             const ChannelArgs& channel_args,
                             RefCountedPtr<grpc_channel_stack> channel_stack)
    : Channel(std::move(target), channel_args),
      is_client_(is_client),
      channel_stack_(std::move(channel_stack)) {
  // We need to make sure that grpc_shutdown() does not shut things down
  // until after the channel is destroyed.  However, the channel may not
  // actually be destroyed by the time grpc_channel_destroy() returns,
  // since there may be other existing refs to the channel.  If those
  // refs are held by things that are visible to the wrapped language
  // (such as outstanding calls on the channel), then the wrapped
  // language can be responsible for making sure that grpc_shutdown()
  // does not run until after those refs are released.  However, the
  // channel may also have refs to itself held internally for various
  // things that need to be cleaned up at channel destruction (e.g.,
  // LB policies, subchannels, etc), and because these refs are not
  // visible to the wrapped language, it cannot be responsible for
  // deferring grpc_shutdown() until after they are released.  To
  // accommodate that, we call grpc_init() here and then call
  // grpc_shutdown() when the channel is actually destroyed, thus
  // ensuring that shutdown is deferred until that point.
  InitInternally();
  RefCountedPtr<channelz::ChannelNode> node;
  if (channelz_node() != nullptr) {
    node = channelz_node()->RefAsSubclass<channelz::ChannelNode>();
  }
  // The on_destroy callback runs when the channel stack is destroyed.  It
  // captures its own ref to the channelz node so the trace event can still
  // be recorded at that point, then releases the init ref taken above.
  *channel_stack_->on_destroy = [node = std::move(node)]() {
    if (node != nullptr) {
      node->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                          grpc_slice_from_static_string("Channel destroyed"));
    }
    ShutdownInternally();
  };
}
166
Orphaned()167 void LegacyChannel::Orphaned() {
168 grpc_transport_op* op = grpc_make_transport_op(nullptr);
169 op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
170 grpc_channel_element* elem =
171 grpc_channel_stack_element(channel_stack_.get(), 0);
172 elem->filter->start_transport_op(elem, op);
173 }
174
IsLame() const175 bool LegacyChannel::IsLame() const {
176 grpc_channel_element* elem =
177 grpc_channel_stack_last_element(channel_stack_.get());
178 return elem->filter == &LameClientFilter::kFilter;
179 }
180
// Creates a new call on this client channel.
// At most one of `cq` and `pollset_set_alternative` may be non-null (the
// CHECK below enforces not-both); the call is polled via whichever is
// supplied.
grpc_call* LegacyChannel::CreateCall(
    grpc_call* parent_call, uint32_t propagation_mask,
    grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
    Slice path, absl::optional<Slice> authority, Timestamp deadline,
    bool registered_method) {
  CHECK(is_client_);
  CHECK(!(cq != nullptr && pollset_set_alternative != nullptr));
  grpc_call_create_args args;
  args.channel = RefAsSubclass<LegacyChannel>();  // Call holds a channel ref.
  args.server = nullptr;  // Client-side call: no owning server.
  args.parent = parent_call;
  args.propagation_mask = propagation_mask;
  args.cq = cq;
  args.pollset_set_alternative = pollset_set_alternative;
  args.server_transport_data = nullptr;
  args.path = std::move(path);
  args.authority = std::move(authority);
  args.send_deadline = deadline;
  args.registered_method = registered_method;
  grpc_call* call;
  // Errors are logged rather than propagated; `call` is returned regardless.
  // NOTE(review): presumably grpc_call_create() always yields a usable call
  // object even on error — confirm against its implementation.
  GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
  return call;
}
204
CheckConnectivityState(bool try_to_connect)205 grpc_connectivity_state LegacyChannel::CheckConnectivityState(
206 bool try_to_connect) {
207 // Forward through to the underlying client channel.
208 ClientChannelFilter* client_channel = GetClientChannelFilter();
209 if (GPR_UNLIKELY(client_channel == nullptr)) {
210 if (IsLame()) return GRPC_CHANNEL_TRANSIENT_FAILURE;
211 LOG(ERROR) << "grpc_channel_check_connectivity_state called on something "
212 "that is not a client channel";
213 return GRPC_CHANNEL_SHUTDOWN;
214 }
215 return client_channel->CheckConnectivityState(try_to_connect);
216 }
217
SupportsConnectivityWatcher() const218 bool LegacyChannel::SupportsConnectivityWatcher() const {
219 return GetClientChannelFilter() != nullptr;
220 }
221
// A fire-and-forget object to handle external connectivity state watches.
// Allocated with `new` by WatchConnectivityState(); owns its own lifetime
// via DualRefCounted and destroys itself after the completion produced by
// the watch (or its timeout) has been consumed from the completion queue.
class LegacyChannel::StateWatcher final : public DualRefCounted<StateWatcher> {
 public:
  // Begins the watch: registers a pending op on `cq` that will be completed
  // with `tag` when the state moves away from `last_observed_state` or
  // `deadline` expires, whichever happens first.
  StateWatcher(WeakRefCountedPtr<LegacyChannel> channel,
               grpc_completion_queue* cq, void* tag,
               grpc_connectivity_state last_observed_state, Timestamp deadline)
      : channel_(std::move(channel)),
        cq_(cq),
        tag_(tag),
        state_(last_observed_state) {
    CHECK(grpc_cq_begin_op(cq, tag));
    GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
    ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
    if (client_channel == nullptr) {
      // If the target URI used to create the channel was invalid, channel
      // stack initialization failed, and that caused us to create a lame
      // channel.  In that case, connectivity state will never change (it
      // will always be TRANSIENT_FAILURE), so we don't actually start a
      // watch, but we are hiding that fact from the application.
      if (channel_->IsLame()) {
        // A ref is held by the timer callback.
        StartTimer(deadline);
        // Ref from object creation needs to be freed here since lame channel
        // does not have a watcher.
        Unref();
        return;
      }
      Crash(
          "grpc_channel_watch_connectivity_state called on something that is "
          "not a client channel");
    }
    // Ref from object creation is held by the watcher callback.
    auto* watcher_timer_init_state = new WatcherTimerInitState(this, deadline);
    client_channel->AddExternalConnectivityWatcher(
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
        &on_complete_, watcher_timer_init_state->closure());
  }

 private:
  // A fire-and-forget object used to delay starting the timer until the
  // ClientChannelFilter actually starts the watch.
  class WatcherTimerInitState final {
   public:
    WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
        : state_watcher_(state_watcher), deadline_(deadline) {
      GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
    }

    grpc_closure* closure() { return &closure_; }

   private:
    // Invoked once the watch is registered; starts the deadline timer and
    // then deletes this helper.
    static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
      auto* self = static_cast<WatcherTimerInitState*>(arg);
      self->state_watcher_->StartTimer(self->deadline_);
      delete self;
    }

    StateWatcher* state_watcher_;
    Timestamp deadline_;
    grpc_closure closure_;
  };

  // Schedules TimeoutComplete() to run on the EventEngine when `deadline`
  // arrives; the lambda holds a strong ref until it runs.
  void StartTimer(Timestamp deadline) {
    const Duration timeout = deadline - Timestamp::Now();
    MutexLock lock(&mu_);
    timer_handle_ =
        channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
          ApplicationCallbackExecCtx callback_exec_ctx;
          ExecCtx exec_ctx;
          self->TimeoutComplete();
          // StateWatcher deletion might require an active ExecCtx.
          self.reset();
        });
  }

  // Runs when the deadline expires before a state change is observed.
  void TimeoutComplete() {
    timer_fired_ = true;
    // If this is a client channel (not a lame channel), cancel the watch.
    ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
    if (client_channel != nullptr) {
      client_channel->CancelExternalConnectivityWatcher(&on_complete_);
    }
  }

  // Invoked by the client channel when the watch finishes (state change or
  // cancellation).  Releases the creation ref and cancels any pending timer.
  static void WatchComplete(void* arg, grpc_error_handle error) {
    RefCountedPtr<StateWatcher> self(static_cast<StateWatcher*>(arg));
    if (GRPC_TRACE_FLAG_ENABLED(op_failure)) {
      GRPC_LOG_IF_ERROR("watch_completion_error", error);
    }
    MutexLock lock(&self->mu_);
    if (self->timer_handle_.has_value()) {
      self->channel_->event_engine()->Cancel(*self->timer_handle_);
    }
  }

  // Invoked when both strong refs are released.  Posts the watch result to
  // the completion queue; a timeout surfaces as an error status.
  void Orphaned() override {
    WeakRef().release();  // Take a weak ref until completion is finished.
    grpc_error_handle error =
        timer_fired_
            ? GRPC_ERROR_CREATE("Timed out waiting for connection state change")
            : absl::OkStatus();
    grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
                   &completion_storage_);
  }

  // Called when the completion is returned to the CQ.  Dropping the weak
  // ref taken in Orphaned() allows final destruction.
  static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
    auto* self = static_cast<StateWatcher*>(arg);
    self->WeakUnref();
  }

  WeakRefCountedPtr<LegacyChannel> channel_;
  grpc_completion_queue* cq_;
  void* tag_;  // Surfaced on cq_ when the watch finishes.

  grpc_connectivity_state state_;
  grpc_cq_completion completion_storage_;
  grpc_closure on_complete_;

  // timer_handle_ might be accessed in parallel from multiple threads, e.g.
  // timer callback fired immediately on an EventEngine thread before
  // RunAfter() returns.
  Mutex mu_;
  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
      timer_handle_ ABSL_GUARDED_BY(mu_);
  bool timer_fired_ = false;
};
350
// Starts an external connectivity-state watch that is surfaced via
// `cq`/`tag`.  The bare `new` is intentional: StateWatcher is
// fire-and-forget and manages its own lifetime, deleting itself once the
// watch's completion has been consumed.
void LegacyChannel::WatchConnectivityState(
    grpc_connectivity_state last_observed_state, Timestamp deadline,
    grpc_completion_queue* cq, void* tag) {
  new StateWatcher(WeakRefAsSubclass<LegacyChannel>(), cq, tag,
                   last_observed_state, deadline);
}
357
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)358 void LegacyChannel::AddConnectivityWatcher(
359 grpc_connectivity_state initial_state,
360 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
361 auto* client_channel = GetClientChannelFilter();
362 CHECK_NE(client_channel, nullptr);
363 client_channel->AddConnectivityWatcher(initial_state, std::move(watcher));
364 }
365
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)366 void LegacyChannel::RemoveConnectivityWatcher(
367 AsyncConnectivityStateWatcherInterface* watcher) {
368 auto* client_channel = GetClientChannelFilter();
369 CHECK_NE(client_channel, nullptr);
370 client_channel->RemoveConnectivityWatcher(watcher);
371 }
372
GetInfo(const grpc_channel_info * channel_info)373 void LegacyChannel::GetInfo(const grpc_channel_info* channel_info) {
374 grpc_channel_element* elem =
375 grpc_channel_stack_element(channel_stack_.get(), 0);
376 elem->filter->get_channel_info(elem, channel_info);
377 }
378
ResetConnectionBackoff()379 void LegacyChannel::ResetConnectionBackoff() {
380 grpc_transport_op* op = grpc_make_transport_op(nullptr);
381 op->reset_connect_backoff = true;
382 grpc_channel_element* elem =
383 grpc_channel_stack_element(channel_stack_.get(), 0);
384 elem->filter->start_transport_op(elem, op);
385 }
386
namespace {

// State for one in-flight ping: carries the completion-queue tag plus the
// storage needed to post the completion.  Allocated with gpr_malloc() in
// LegacyChannel::Ping() and freed by ping_destroy().
struct ping_result {
  grpc_closure closure;
  void* tag;
  grpc_completion_queue* cq;
  grpc_cq_completion completion_storage;
};
// Frees the ping_result once the CQ has surfaced its completion.
void ping_destroy(void* arg, grpc_cq_completion* /*storage*/) { gpr_free(arg); }

// Transport ack callback: posts the ping completion (propagating any
// error) onto the completion queue.
void ping_done(void* arg, grpc_error_handle error) {
  ping_result* pr = static_cast<ping_result*>(arg);
  grpc_cq_end_op(pr->cq, pr->tag, error, ping_destroy, pr,
                 &pr->completion_storage);
}

}  // namespace
404
// Sends a ping down the channel stack; `tag` is surfaced on `cq` when the
// transport acks the ping (or the op fails).
void LegacyChannel::Ping(grpc_completion_queue* cq, void* tag) {
  // Ownership of `pr` transfers to the completion machinery: it is freed by
  // ping_destroy() after the CQ completion is consumed.
  ping_result* pr = static_cast<ping_result*>(gpr_malloc(sizeof(*pr)));
  pr->tag = tag;
  pr->cq = cq;
  GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
  grpc_transport_op* op = grpc_make_transport_op(nullptr);
  op->send_ping.on_ack = &pr->closure;
  op->bind_pollset = grpc_cq_pollset(cq);
  CHECK(grpc_cq_begin_op(cq, tag));
  grpc_channel_element* top_elem =
      grpc_channel_stack_element(channel_stack_.get(), 0);
  top_elem->filter->start_transport_op(top_elem, op);
}
418
GetClientChannelFilter() const419 ClientChannelFilter* LegacyChannel::GetClientChannelFilter() const {
420 grpc_channel_element* elem =
421 grpc_channel_stack_last_element(channel_stack_.get());
422 if (elem->filter != &ClientChannelFilter::kFilter) {
423 return nullptr;
424 }
425 return static_cast<ClientChannelFilter*>(elem->channel_data);
426 }
427
428 } // namespace grpc_core
429