1 //
2 // Copyright 2020 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/client_channel/dynamic_filters.h"
18
19 #include <grpc/support/port_platform.h>
20 #include <stddef.h>
21
22 #include <new>
23 #include <utility>
24
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/status/statusor.h"
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/channel/channel_stack.h"
30 #include "src/core/lib/channel/channel_stack_builder_impl.h"
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/surface/channel_stack_type.h"
33 #include "src/core/lib/surface/lame_client.h"
34 #include "src/core/util/alloc.h"
35 #include "src/core/util/status_helper.h"
36
37 // Conversion between call and call stack.
38 #define CALL_TO_CALL_STACK(call) \
39 (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
40 sizeof(DynamicFilters::Call)))
41 #define CALL_STACK_TO_CALL(callstack) \
42 (DynamicFilters::Call*)(((char*)(call_stack)) - \
43 GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
44 sizeof(DynamicFilters::Call)))
45
46 namespace grpc_core {
47
48 //
49 // DynamicFilters::Call
50 //
51
Call(Args args,grpc_error_handle * error)52 DynamicFilters::Call::Call(Args args, grpc_error_handle* error)
53 : channel_stack_(std::move(args.channel_stack)) {
54 grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
55 const grpc_call_element_args call_args = {
56 call_stack, // call_stack
57 nullptr, // server_transport_data
58 args.path, // path
59 args.start_time, // start_time
60 args.deadline, // deadline
61 args.arena, // arena
62 args.call_combiner // call_combiner
63 };
64 *error = grpc_call_stack_init(channel_stack_->channel_stack_.get(), 1,
65 Destroy, this, &call_args);
66 if (GPR_UNLIKELY(!error->ok())) {
67 LOG(ERROR) << "error: " << StatusToString(*error);
68 return;
69 }
70 grpc_call_stack_set_pollset_or_pollset_set(call_stack, args.pollent);
71 }
72
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)73 void DynamicFilters::Call::StartTransportStreamOpBatch(
74 grpc_transport_stream_op_batch* batch) {
75 grpc_call_stack* call_stack = CALL_TO_CALL_STACK(this);
76 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
77 GRPC_TRACE_LOG(channel, INFO)
78 << "OP[" << top_elem->filter->name << ":" << top_elem
79 << "]: " << grpc_transport_stream_op_batch_string(batch, false);
80 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
81 }
82
SetAfterCallStackDestroy(grpc_closure * closure)83 void DynamicFilters::Call::SetAfterCallStackDestroy(grpc_closure* closure) {
84 CHECK_EQ(after_call_stack_destroy_, nullptr);
85 CHECK_NE(closure, nullptr);
86 after_call_stack_destroy_ = closure;
87 }
88
Ref()89 RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref() {
90 IncrementRefCount();
91 return RefCountedPtr<DynamicFilters::Call>(this);
92 }
93
Ref(const DebugLocation & location,const char * reason)94 RefCountedPtr<DynamicFilters::Call> DynamicFilters::Call::Ref(
95 const DebugLocation& location, const char* reason) {
96 IncrementRefCount(location, reason);
97 return RefCountedPtr<DynamicFilters::Call>(this);
98 }
99
Unref()100 void DynamicFilters::Call::Unref() {
101 GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), "dynamic-filters-unref");
102 }
103
Unref(const DebugLocation &,const char * reason)104 void DynamicFilters::Call::Unref(const DebugLocation& /*location*/,
105 const char* reason) {
106 GRPC_CALL_STACK_UNREF(CALL_TO_CALL_STACK(this), reason);
107 }
108
Destroy(void * arg,grpc_error_handle)109 void DynamicFilters::Call::Destroy(void* arg, grpc_error_handle /*error*/) {
110 DynamicFilters::Call* self = static_cast<DynamicFilters::Call*>(arg);
111 // Keep some members before destroying the subchannel call.
112 grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
113 RefCountedPtr<DynamicFilters> channel_stack = std::move(self->channel_stack_);
114 // Destroy the subchannel call.
115 self->~Call();
116 // Destroy the call stack. This should be after destroying the call, because
117 // call->after_call_stack_destroy(), if not null, will free the call arena.
118 grpc_call_stack_destroy(CALL_TO_CALL_STACK(self), nullptr,
119 after_call_stack_destroy);
120 // Automatically reset channel_stack. This should be after destroying the call
121 // stack, because destroying call stack needs access to the channel stack.
122 }
123
IncrementRefCount()124 void DynamicFilters::Call::IncrementRefCount() {
125 GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), "");
126 }
127
IncrementRefCount(const DebugLocation &,const char * reason)128 void DynamicFilters::Call::IncrementRefCount(const DebugLocation& /*location*/,
129 const char* reason) {
130 GRPC_CALL_STACK_REF(CALL_TO_CALL_STACK(this), reason);
131 }
132
133 //
134 // DynamicFilters
135 //
136
137 namespace {
138
CreateChannelStack(const ChannelArgs & args,std::vector<const grpc_channel_filter * > filters,const Blackboard * old_blackboard,Blackboard * new_blackboard)139 absl::StatusOr<RefCountedPtr<grpc_channel_stack>> CreateChannelStack(
140 const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
141 const Blackboard* old_blackboard, Blackboard* new_blackboard) {
142 ChannelStackBuilderImpl builder("DynamicFilters", GRPC_CLIENT_DYNAMIC, args);
143 builder.SetBlackboards(old_blackboard, new_blackboard);
144 for (auto filter : filters) {
145 builder.AppendFilter(filter);
146 }
147 return builder.Build();
148 }
149
150 } // namespace
151
Create(const ChannelArgs & args,std::vector<const grpc_channel_filter * > filters,const Blackboard * old_blackboard,Blackboard * new_blackboard)152 RefCountedPtr<DynamicFilters> DynamicFilters::Create(
153 const ChannelArgs& args, std::vector<const grpc_channel_filter*> filters,
154 const Blackboard* old_blackboard, Blackboard* new_blackboard) {
155 // Attempt to create channel stack from requested filters.
156 auto p = CreateChannelStack(args, std::move(filters), old_blackboard,
157 new_blackboard);
158 if (!p.ok()) {
159 // Channel stack creation failed with requested filters.
160 // Create with lame filter instead.
161 auto error = p.status();
162 p = CreateChannelStack(args.Set(MakeLameClientErrorArg(&error)),
163 {&LameClientFilter::kFilter}, nullptr, nullptr);
164 }
165 return MakeRefCounted<DynamicFilters>(std::move(p.value()));
166 }
167
CreateCall(DynamicFilters::Call::Args args,grpc_error_handle * error)168 RefCountedPtr<DynamicFilters::Call> DynamicFilters::CreateCall(
169 DynamicFilters::Call::Args args, grpc_error_handle* error) {
170 size_t allocation_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Call)) +
171 channel_stack_->call_stack_size;
172 Call* call = static_cast<Call*>(args.arena->Alloc(allocation_size));
173 new (call) Call(std::move(args), error);
174 return RefCountedPtr<DynamicFilters::Call>(call);
175 }
176
177 } // namespace grpc_core
178