• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/surface/legacy_channel.h"
20 
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/grpc.h>
23 #include <grpc/impl/connectivity_state.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/port_platform.h>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/log/check.h"
29 #include "absl/log/log.h"
30 #include "absl/status/status.h"
31 #include "absl/types/optional.h"
32 #include "src/core/channelz/channelz.h"
33 #include "src/core/client_channel/client_channel_filter.h"
34 #include "src/core/config/core_configuration.h"
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/channel_fwd.h"
37 #include "src/core/lib/channel/channel_stack.h"
38 #include "src/core/lib/channel/channel_stack_builder_impl.h"
39 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
40 #include "src/core/lib/iomgr/closure.h"
41 #include "src/core/lib/iomgr/error.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43 #include "src/core/lib/resource_quota/resource_quota.h"
44 #include "src/core/lib/surface/call.h"
45 #include "src/core/lib/surface/channel.h"
46 #include "src/core/lib/surface/channel_init.h"
47 #include "src/core/lib/surface/channel_stack_type.h"
48 #include "src/core/lib/surface/completion_queue.h"
49 #include "src/core/lib/surface/init_internally.h"
50 #include "src/core/lib/surface/lame_client.h"
51 #include "src/core/lib/transport/transport.h"
52 #include "src/core/telemetry/metrics.h"
53 #include "src/core/telemetry/stats.h"
54 #include "src/core/telemetry/stats_data.h"
55 #include "src/core/util/crash.h"
56 #include "src/core/util/dual_ref_counted.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/sync.h"
59 #include "src/core/util/time.h"
60 
61 namespace grpc_core {
62 
Create(std::string target,ChannelArgs args,grpc_channel_stack_type channel_stack_type)63 absl::StatusOr<RefCountedPtr<Channel>> LegacyChannel::Create(
64     std::string target, ChannelArgs args,
65     grpc_channel_stack_type channel_stack_type) {
66   if (grpc_channel_stack_type_is_client(channel_stack_type)) {
67     auto channel_args_mutator =
68         grpc_channel_args_get_client_channel_creation_mutator();
69     if (channel_args_mutator != nullptr) {
70       args = channel_args_mutator(target.c_str(), args, channel_stack_type);
71     }
72   }
73   ChannelStackBuilderImpl builder(
74       grpc_channel_stack_type_string(channel_stack_type), channel_stack_type,
75       args);
76   builder.SetTarget(target.c_str());
77   if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
78     return nullptr;
79   }
80   // Only need to update stats for server channels here.  Stats for client
81   // channels are handled in our base class.
82   if (builder.channel_stack_type() == GRPC_SERVER_CHANNEL) {
83     global_stats().IncrementServerChannelsCreated();
84   }
85   absl::StatusOr<RefCountedPtr<grpc_channel_stack>> r = builder.Build();
86   if (!r.ok()) {
87     auto status = r.status();
88     LOG(ERROR) << "channel stack builder failed: " << status;
89     return status;
90   }
91   if (channel_stack_type == GRPC_SERVER_CHANNEL) {
92     *(*r)->stats_plugin_group =
93         GlobalStatsPluginRegistry::GetStatsPluginsForServer(args);
94     // Add per-server stats plugins.
95     auto* stats_plugin_list = args.GetPointer<
96         std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
97         GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
98     if (stats_plugin_list != nullptr) {
99       for (const auto& plugin : **stats_plugin_list) {
100         (*r)->stats_plugin_group->AddStatsPlugin(
101             plugin, plugin->GetServerScopeConfig(args));
102       }
103     }
104   } else {
105     std::string authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
106                                 .value_or(CoreConfiguration::Get()
107                                               .resolver_registry()
108                                               .GetDefaultAuthority(target));
109     grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
110         args);
111     experimental::StatsPluginChannelScope scope(target, authority,
112                                                 endpoint_config);
113     *(*r)->stats_plugin_group =
114         GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
115     // Add per-channel stats plugins.
116     auto* stats_plugin_list = args.GetPointer<
117         std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
118         GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
119     if (stats_plugin_list != nullptr) {
120       for (const auto& plugin : **stats_plugin_list) {
121         (*r)->stats_plugin_group->AddStatsPlugin(
122             plugin, plugin->GetChannelScopeConfig(scope));
123       }
124     }
125   }
126   return MakeRefCounted<LegacyChannel>(
127       grpc_channel_stack_type_is_client(builder.channel_stack_type()),
128       std::move(target), args, std::move(*r));
129 }
130 
LegacyChannel(bool is_client,std::string target,const ChannelArgs & channel_args,RefCountedPtr<grpc_channel_stack> channel_stack)131 LegacyChannel::LegacyChannel(bool is_client, std::string target,
132                              const ChannelArgs& channel_args,
133                              RefCountedPtr<grpc_channel_stack> channel_stack)
134     : Channel(std::move(target), channel_args),
135       is_client_(is_client),
136       channel_stack_(std::move(channel_stack)) {
137   // We need to make sure that grpc_shutdown() does not shut things down
138   // until after the channel is destroyed.  However, the channel may not
139   // actually be destroyed by the time grpc_channel_destroy() returns,
140   // since there may be other existing refs to the channel.  If those
141   // refs are held by things that are visible to the wrapped language
142   // (such as outstanding calls on the channel), then the wrapped
143   // language can be responsible for making sure that grpc_shutdown()
144   // does not run until after those refs are released.  However, the
145   // channel may also have refs to itself held internally for various
146   // things that need to be cleaned up at channel destruction (e.g.,
147   // LB policies, subchannels, etc), and because these refs are not
148   // visible to the wrapped language, it cannot be responsible for
149   // deferring grpc_shutdown() until after they are released.  To
150   // accommodate that, we call grpc_init() here and then call
151   // grpc_shutdown() when the channel is actually destroyed, thus
152   // ensuring that shutdown is deferred until that point.
153   InitInternally();
154   RefCountedPtr<channelz::ChannelNode> node;
155   if (channelz_node() != nullptr) {
156     node = channelz_node()->RefAsSubclass<channelz::ChannelNode>();
157   }
158   *channel_stack_->on_destroy = [node = std::move(node)]() {
159     if (node != nullptr) {
160       node->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
161                           grpc_slice_from_static_string("Channel destroyed"));
162     }
163     ShutdownInternally();
164   };
165 }
166 
Orphaned()167 void LegacyChannel::Orphaned() {
168   grpc_transport_op* op = grpc_make_transport_op(nullptr);
169   op->disconnect_with_error = GRPC_ERROR_CREATE("Channel Destroyed");
170   grpc_channel_element* elem =
171       grpc_channel_stack_element(channel_stack_.get(), 0);
172   elem->filter->start_transport_op(elem, op);
173 }
174 
IsLame() const175 bool LegacyChannel::IsLame() const {
176   grpc_channel_element* elem =
177       grpc_channel_stack_last_element(channel_stack_.get());
178   return elem->filter == &LameClientFilter::kFilter;
179 }
180 
CreateCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,grpc_pollset_set * pollset_set_alternative,Slice path,absl::optional<Slice> authority,Timestamp deadline,bool registered_method)181 grpc_call* LegacyChannel::CreateCall(
182     grpc_call* parent_call, uint32_t propagation_mask,
183     grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
184     Slice path, absl::optional<Slice> authority, Timestamp deadline,
185     bool registered_method) {
186   CHECK(is_client_);
187   CHECK(!(cq != nullptr && pollset_set_alternative != nullptr));
188   grpc_call_create_args args;
189   args.channel = RefAsSubclass<LegacyChannel>();
190   args.server = nullptr;
191   args.parent = parent_call;
192   args.propagation_mask = propagation_mask;
193   args.cq = cq;
194   args.pollset_set_alternative = pollset_set_alternative;
195   args.server_transport_data = nullptr;
196   args.path = std::move(path);
197   args.authority = std::move(authority);
198   args.send_deadline = deadline;
199   args.registered_method = registered_method;
200   grpc_call* call;
201   GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
202   return call;
203 }
204 
CheckConnectivityState(bool try_to_connect)205 grpc_connectivity_state LegacyChannel::CheckConnectivityState(
206     bool try_to_connect) {
207   // Forward through to the underlying client channel.
208   ClientChannelFilter* client_channel = GetClientChannelFilter();
209   if (GPR_UNLIKELY(client_channel == nullptr)) {
210     if (IsLame()) return GRPC_CHANNEL_TRANSIENT_FAILURE;
211     LOG(ERROR) << "grpc_channel_check_connectivity_state called on something "
212                   "that is not a client channel";
213     return GRPC_CHANNEL_SHUTDOWN;
214   }
215   return client_channel->CheckConnectivityState(try_to_connect);
216 }
217 
SupportsConnectivityWatcher() const218 bool LegacyChannel::SupportsConnectivityWatcher() const {
219   return GetClientChannelFilter() != nullptr;
220 }
221 
222 // A fire-and-forget object to handle external connectivity state watches.
223 class LegacyChannel::StateWatcher final : public DualRefCounted<StateWatcher> {
224  public:
StateWatcher(WeakRefCountedPtr<LegacyChannel> channel,grpc_completion_queue * cq,void * tag,grpc_connectivity_state last_observed_state,Timestamp deadline)225   StateWatcher(WeakRefCountedPtr<LegacyChannel> channel,
226                grpc_completion_queue* cq, void* tag,
227                grpc_connectivity_state last_observed_state, Timestamp deadline)
228       : channel_(std::move(channel)),
229         cq_(cq),
230         tag_(tag),
231         state_(last_observed_state) {
232     CHECK(grpc_cq_begin_op(cq, tag));
233     GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
234     ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
235     if (client_channel == nullptr) {
236       // If the target URI used to create the channel was invalid, channel
237       // stack initialization failed, and that caused us to create a lame
238       // channel.  In that case, connectivity state will never change (it
239       // will always be TRANSIENT_FAILURE), so we don't actually start a
240       // watch, but we are hiding that fact from the application.
241       if (channel_->IsLame()) {
242         // A ref is held by the timer callback.
243         StartTimer(deadline);
244         // Ref from object creation needs to be freed here since lame channel
245         // does not have a watcher.
246         Unref();
247         return;
248       }
249       Crash(
250           "grpc_channel_watch_connectivity_state called on something that is "
251           "not a client channel");
252     }
253     // Ref from object creation is held by the watcher callback.
254     auto* watcher_timer_init_state = new WatcherTimerInitState(this, deadline);
255     client_channel->AddExternalConnectivityWatcher(
256         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
257         &on_complete_, watcher_timer_init_state->closure());
258   }
259 
260  private:
261   // A fire-and-forget object used to delay starting the timer until the
262   // ClientChannelFilter actually starts the watch.
263   class WatcherTimerInitState final {
264    public:
WatcherTimerInitState(StateWatcher * state_watcher,Timestamp deadline)265     WatcherTimerInitState(StateWatcher* state_watcher, Timestamp deadline)
266         : state_watcher_(state_watcher), deadline_(deadline) {
267       GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
268     }
269 
closure()270     grpc_closure* closure() { return &closure_; }
271 
272    private:
WatcherTimerInit(void * arg,grpc_error_handle)273     static void WatcherTimerInit(void* arg, grpc_error_handle /*error*/) {
274       auto* self = static_cast<WatcherTimerInitState*>(arg);
275       self->state_watcher_->StartTimer(self->deadline_);
276       delete self;
277     }
278 
279     StateWatcher* state_watcher_;
280     Timestamp deadline_;
281     grpc_closure closure_;
282   };
283 
StartTimer(Timestamp deadline)284   void StartTimer(Timestamp deadline) {
285     const Duration timeout = deadline - Timestamp::Now();
286     MutexLock lock(&mu_);
287     timer_handle_ =
288         channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
289           ApplicationCallbackExecCtx callback_exec_ctx;
290           ExecCtx exec_ctx;
291           self->TimeoutComplete();
292           // StateWatcher deletion might require an active ExecCtx.
293           self.reset();
294         });
295   }
296 
TimeoutComplete()297   void TimeoutComplete() {
298     timer_fired_ = true;
299     // If this is a client channel (not a lame channel), cancel the watch.
300     ClientChannelFilter* client_channel = channel_->GetClientChannelFilter();
301     if (client_channel != nullptr) {
302       client_channel->CancelExternalConnectivityWatcher(&on_complete_);
303     }
304   }
305 
WatchComplete(void * arg,grpc_error_handle error)306   static void WatchComplete(void* arg, grpc_error_handle error) {
307     RefCountedPtr<StateWatcher> self(static_cast<StateWatcher*>(arg));
308     if (GRPC_TRACE_FLAG_ENABLED(op_failure)) {
309       GRPC_LOG_IF_ERROR("watch_completion_error", error);
310     }
311     MutexLock lock(&self->mu_);
312     if (self->timer_handle_.has_value()) {
313       self->channel_->event_engine()->Cancel(*self->timer_handle_);
314     }
315   }
316 
317   // Invoked when both strong refs are released.
Orphaned()318   void Orphaned() override {
319     WeakRef().release();  // Take a weak ref until completion is finished.
320     grpc_error_handle error =
321         timer_fired_
322             ? GRPC_ERROR_CREATE("Timed out waiting for connection state change")
323             : absl::OkStatus();
324     grpc_cq_end_op(cq_, tag_, error, FinishedCompletion, this,
325                    &completion_storage_);
326   }
327 
328   // Called when the completion is returned to the CQ.
FinishedCompletion(void * arg,grpc_cq_completion *)329   static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
330     auto* self = static_cast<StateWatcher*>(arg);
331     self->WeakUnref();
332   }
333 
334   WeakRefCountedPtr<LegacyChannel> channel_;
335   grpc_completion_queue* cq_;
336   void* tag_;
337 
338   grpc_connectivity_state state_;
339   grpc_cq_completion completion_storage_;
340   grpc_closure on_complete_;
341 
342   // timer_handle_ might be accessed in parallel from multiple threads, e.g.
343   // timer callback fired immediately on an EventEngine thread before
344   // RunAfter() returns.
345   Mutex mu_;
346   absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
347       timer_handle_ ABSL_GUARDED_BY(mu_);
348   bool timer_fired_ = false;
349 };
350 
WatchConnectivityState(grpc_connectivity_state last_observed_state,Timestamp deadline,grpc_completion_queue * cq,void * tag)351 void LegacyChannel::WatchConnectivityState(
352     grpc_connectivity_state last_observed_state, Timestamp deadline,
353     grpc_completion_queue* cq, void* tag) {
354   new StateWatcher(WeakRefAsSubclass<LegacyChannel>(), cq, tag,
355                    last_observed_state, deadline);
356 }
357 
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)358 void LegacyChannel::AddConnectivityWatcher(
359     grpc_connectivity_state initial_state,
360     OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
361   auto* client_channel = GetClientChannelFilter();
362   CHECK_NE(client_channel, nullptr);
363   client_channel->AddConnectivityWatcher(initial_state, std::move(watcher));
364 }
365 
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)366 void LegacyChannel::RemoveConnectivityWatcher(
367     AsyncConnectivityStateWatcherInterface* watcher) {
368   auto* client_channel = GetClientChannelFilter();
369   CHECK_NE(client_channel, nullptr);
370   client_channel->RemoveConnectivityWatcher(watcher);
371 }
372 
GetInfo(const grpc_channel_info * channel_info)373 void LegacyChannel::GetInfo(const grpc_channel_info* channel_info) {
374   grpc_channel_element* elem =
375       grpc_channel_stack_element(channel_stack_.get(), 0);
376   elem->filter->get_channel_info(elem, channel_info);
377 }
378 
ResetConnectionBackoff()379 void LegacyChannel::ResetConnectionBackoff() {
380   grpc_transport_op* op = grpc_make_transport_op(nullptr);
381   op->reset_connect_backoff = true;
382   grpc_channel_element* elem =
383       grpc_channel_stack_element(channel_stack_.get(), 0);
384   elem->filter->start_transport_op(elem, op);
385 }
386 
387 namespace {
388 
389 struct ping_result {
390   grpc_closure closure;
391   void* tag;
392   grpc_completion_queue* cq;
393   grpc_cq_completion completion_storage;
394 };
ping_destroy(void * arg,grpc_cq_completion *)395 void ping_destroy(void* arg, grpc_cq_completion* /*storage*/) { gpr_free(arg); }
396 
ping_done(void * arg,grpc_error_handle error)397 void ping_done(void* arg, grpc_error_handle error) {
398   ping_result* pr = static_cast<ping_result*>(arg);
399   grpc_cq_end_op(pr->cq, pr->tag, error, ping_destroy, pr,
400                  &pr->completion_storage);
401 }
402 
403 }  // namespace
404 
Ping(grpc_completion_queue * cq,void * tag)405 void LegacyChannel::Ping(grpc_completion_queue* cq, void* tag) {
406   ping_result* pr = static_cast<ping_result*>(gpr_malloc(sizeof(*pr)));
407   pr->tag = tag;
408   pr->cq = cq;
409   GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
410   grpc_transport_op* op = grpc_make_transport_op(nullptr);
411   op->send_ping.on_ack = &pr->closure;
412   op->bind_pollset = grpc_cq_pollset(cq);
413   CHECK(grpc_cq_begin_op(cq, tag));
414   grpc_channel_element* top_elem =
415       grpc_channel_stack_element(channel_stack_.get(), 0);
416   top_elem->filter->start_transport_op(top_elem, op);
417 }
418 
GetClientChannelFilter() const419 ClientChannelFilter* LegacyChannel::GetClientChannelFilter() const {
420   grpc_channel_element* elem =
421       grpc_channel_stack_last_element(channel_stack_.get());
422   if (elem->filter != &ClientChannelFilter::kFilter) {
423     return nullptr;
424   }
425   return static_cast<ClientChannelFilter*>(elem->channel_data);
426 }
427 
428 }  // namespace grpc_core
429