• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/client_channel/dynamic_filters.h"
18 
19 #include <grpc/support/port_platform.h>
20 #include <stddef.h>
21 
22 #include <new>
23 #include <utility>
24 
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/status/statusor.h"
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/channel/channel_stack_builder_impl.h"
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/lame_client.h"
34 #include "src/core/util/alloc.h"
35 #include "src/core/util/status_helper.h"
36 
// Conversion between call and call stack.
//
// A DynamicFilters::Call and its grpc_call_stack share one allocation: the
// call stack starts GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call)) bytes after
// the start of the Call object (see DynamicFilters::CreateCall()).  These
// macros convert a pointer to one into a pointer to the other.
#define CALL_TO_CALL_STACK(call)                                     \
  (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                                         sizeof(DynamicFilters::Call)))
// Note: the expansion must use the macro parameter itself; it previously
// referred to an identifier named `call_stack`, which only worked when the
// caller happened to have a variable with exactly that name in scope.
#define CALL_STACK_TO_CALL(callstack)                     \
  (DynamicFilters::Call*)(((char*)(callstack)) -          \
                          GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
                              sizeof(DynamicFilters::Call)))
45 
46 namespace grpc_core {
47 
48 //
49 // DynamicFilters::Call
50 //
51 
Call(Args args,grpc_error_handle * error)52 DynamicFilters::Call::Call(Args args, grpc_error_handle* error)
53     : channel_stack_(std::move(args.channel_stack)) {
54   grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
55   const grpc_call_element_args call_args = {
56       call_stack,         // call_stack
57       nullptr,            // server_transport_data
58       args.path,          // path
59       args.start_time,    // start_time
60       args.deadline,      // deadline
61       args.arena,         // arena
62       args.call_combiner  // call_combiner
63   };
64   *error = grpc_call_stack_init(channel_stack_->channel_stack_.get(), 1,
65                                 Destroy, this, &call_args);
66   if (GPR_UNLIKELY(!error->ok())) {
67     LOG(ERROR) << "error: " << StatusToString(*error);
68     return;
69   }
70   grpc_call_stack_set_pollset_or_pollset_set(call_stack, args.pollent);
71 }
72 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)73 void DynamicFilters::Call::StartTransportStreamOpBatch(
74     grpc_transport_stream_op_batch* batch) {
75   grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
76   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
77   GRPC_TRACE_LOG(channel, INFO)
78       << "OP[" << top_elem->filter->name << ":" << top_elem
79       << "]: " << grpc_transport_stream_op_batch_string(batch, false);
80   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
81 }
82 
SetAfterCallStackDestroy(grpc_closure * closure)83 void DynamicFilters::Call::SetAfterCallStackDestroy(grpc_closure* closure) {
84   CHECK_EQ(after_call_stack_destroy_, nullptr);
85   CHECK_NE(closure, nullptr);
86   after_call_stack_destroy_ = closure;
87 }
88 
Ref()89 RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref() {
90   IncrementRefCount();
91   return RefCountedPtr<DynamicFilters::Call>(this);
92 }
93 
Ref(const DebugLocation & location,const char * reason)94 RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref(
95     const DebugLocation& location, const char* reason) {
96   IncrementRefCount(location, reason);
97   return RefCountedPtr<DynamicFilters::Call>(this);
98 }
99 
Unref()100 void DynamicFilters::Call::Unref() {
101   GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), "dynamic-filters-unref");
102 }
103 
Unref(const DebugLocation &,const char * reason)104 void DynamicFilters::Call::Unref(const DebugLocation& /*location*/,
105                                  const char* reason) {
106   GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), reason);
107 }
108 
Destroy(void * arg,grpc_error_handle)109 void DynamicFilters::Call::Destroy(void* arg, grpc_error_handle /*error*/) {
110   DynamicFilters::Call* self = static_cast<DynamicFilters::Call*>(arg);
111   // Keep some members before destroying the subchannel call.
112   grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
113   RefCountedPtr<DynamicFilters> channel_stack = std::move(self->channel_stack_);
114   // Destroy the subchannel call.
115   self->~Call();
116   // Destroy the call stack. This should be after destroying the call, because
117   // call->after_call_stack_destroy(), if not null, will free the call arena.
118   grpc_call_stack_destroy(CALL_TO_CALL_STACK(self), nullptr,
119                           after_call_stack_destroy);
120   // Automatically reset channel_stack. This should be after destroying the call
121   // stack, because destroying call stack needs access to the channel stack.
122 }
123 
IncrementRefCount()124 void DynamicFilters::Call::IncrementRefCount() {
125   GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), "");
126 }
127 
IncrementRefCount(const DebugLocation &,const char * reason)128 void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
129                                              const char* reason) {
130   GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), reason);
131 }
132 
133 //
134 // DynamicFilters
135 //
136 
137 namespace {
138 
CreateChannelStack(const ChannelArgs & args,std::vector<const grpc_channel_filter * > filters,const Blackboard * old_blackboard,Blackboard * new_blackboard)139 absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
140     const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
141     const Blackboard* old_blackboard, Blackboard* new_blackboard) {
142   ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args);
143   builder.SetBlackboards(old_blackboard, new_blackboard);
144   for (auto filter : filters) {
145     builder.AppendFilter(filter);
146   }
147   return builder.Build();
148 }
149 
150 }  // namespace
151 
Create(const ChannelArgs & args,std::vector<const grpc_channel_filter * > filters,const Blackboard * old_blackboard,Blackboard * new_blackboard)152 RefCountedPtr<DynamicFilters> DynamicFilters::Create(
153     const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
154     const Blackboard* old_blackboard, Blackboard* new_blackboard) {
155   // Attempt to create channel stack from requested filters.
156   auto p = CreateChannelStack(args, std::move(filters), old_blackboard,
157                               new_blackboard);
158   if (!p.ok()) {
159     // Channel stack creation failed with requested filters.
160     // Create with lame filter instead.
161     auto error = p.status();
162     p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)),
163                            {&LameClientFilter::kFilter}, nullptr, nullptr);
164   }
165   return MakeRefCounted<DynamicFilters>(std::move(p.value()));
166 }
167 
CreateCall(DynamicFilters::Call::Args args,grpc_error_handle * error)168 RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(
169     DynamicFilters::Call::Args args, grpc_error_handle* error) {
170   size_t allocation_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call)) +
171                            channel_stack_->call_stack_size;
172   Call* call = static_cast<Call*>(args.arena->Alloc(allocation_size));
173   new (call) Call(std::move(args), error);
174   return RefCountedPtr<DynamicFilters::Call>(call);
175 }
176 
177 }  // namespace grpc_core
178