1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/call_filters.h"
16
17 #include <grpc/support/port_platform.h>
18
19 #include "absl/log/check.h"
20 #include "absl/log/log.h"
21 #include "src/core/lib/transport/metadata.h"
22 #include "src/core/util/crash.h"
23
24 namespace grpc_core {
// Placeholder call data used by calls whose filter stacks need no call data.
// Start() points call_data_ at this object instead of allocating, because
// the pointers formed from call_data_ are not allowed to be nullptr.
char CallFilters::g_empty_call_data_;
28
29 ///////////////////////////////////////////////////////////////////////////////
30 // CallFilters
31
Start()32 void CallFilters::Start() {
33 CHECK_EQ(call_data_, nullptr);
34 size_t call_data_alignment = 1;
35 for (const auto& stack : stacks_) {
36 call_data_alignment =
37 std::max(call_data_alignment, stack.stack->data_.call_data_alignment);
38 }
39 size_t call_data_size = 0;
40 for (auto& stack : stacks_) {
41 stack.call_data_offset = call_data_size;
42 size_t stack_call_data_size = stack.stack->data_.call_data_size;
43 if (stack_call_data_size % call_data_alignment != 0) {
44 stack_call_data_size +=
45 call_data_alignment - stack_call_data_size % call_data_alignment;
46 }
47 call_data_size += stack_call_data_size;
48 }
49 if (call_data_size != 0) {
50 call_data_ = gpr_malloc_aligned(call_data_size, call_data_alignment);
51 } else {
52 call_data_ = &g_empty_call_data_;
53 }
54 for (const auto& stack : stacks_) {
55 for (const auto& constructor : stack.stack->data_.filter_constructor) {
56 constructor.call_init(
57 filters_detail::Offset(
58 call_data_, stack.call_data_offset + constructor.call_offset),
59 constructor.channel_data);
60 }
61 }
62 call_state_.Start();
63 }
64
Finalize(const grpc_call_final_info * final_info)65 void CallFilters::Finalize(const grpc_call_final_info* final_info) {
66 for (auto& stack : stacks_) {
67 for (auto& finalizer : stack.stack->data_.finalizers) {
68 finalizer.final(
69 filters_detail::Offset(
70 call_data_, stack.call_data_offset + finalizer.call_offset),
71 finalizer.channel_data, final_info);
72 }
73 }
74 }
75
CancelDueToFailedPipeOperation(SourceLocation but_where)76 void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
77 // We expect something cancelled before now
78 if (push_server_trailing_metadata_ == nullptr) return;
79 GRPC_TRACE_VLOG(promise_primitives, 2)
80 .AtLocation(but_where.file(), but_where.line())
81 << "Cancelling due to failed pipe operation: " << DebugString();
82 Cancel();
83 }
84
PushServerTrailingMetadata(ServerMetadataHandle md)85 void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
86 CHECK(md != nullptr);
87 GRPC_TRACE_LOG(call, INFO)
88 << GetContext<Activity>()->DebugTag() << " PushServerTrailingMetadata["
89 << this << "]: " << md->DebugString() << " into " << DebugString();
90 CHECK(md != nullptr);
91 if (call_state_.PushServerTrailingMetadata(
92 md->get(GrpcCallWasCancelled()).value_or(false))) {
93 push_server_trailing_metadata_ = std::move(md);
94 }
95 }
96
Cancel()97 void CallFilters::Cancel() {
98 GRPC_TRACE_LOG(call, INFO) << GetContext<Activity>()->DebugTag() << " Cancel["
99 << this << "]: into " << DebugString();
100 if (call_state_.PushServerTrailingMetadata(true)) {
101 push_server_trailing_metadata_ =
102 CancelledServerMetadataFromStatus(GRPC_STATUS_CANCELLED);
103 }
104 }
105
DebugString() const106 std::string CallFilters::DebugString() const {
107 std::vector<std::string> components = {
108 absl::StrFormat("this:%p", this),
109 absl::StrCat("state:", call_state_.DebugString()),
110 absl::StrCat("server_trailing_metadata:",
111 push_server_trailing_metadata_ == nullptr
112 ? "not-set"
113 : push_server_trailing_metadata_->DebugString())};
114 return absl::StrCat("CallFilters{", absl::StrJoin(components, ", "), "}");
115 };
116
117 ///////////////////////////////////////////////////////////////////////////////
118 // CallFilters::Stack
119
~Stack()120 CallFilters::Stack::~Stack() {
121 for (auto& destructor : data_.channel_data_destructors) {
122 destructor.destroy(destructor.channel_data);
123 }
124 }
125
126 ///////////////////////////////////////////////////////////////////////////////
127 // CallFilters::StackBuilder
128
~StackBuilder()129 CallFilters::StackBuilder::~StackBuilder() {
130 for (auto& destructor : data_.channel_data_destructors) {
131 destructor.destroy(destructor.channel_data);
132 }
133 }
134
Build()135 RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
136 if (data_.call_data_size % data_.call_data_alignment != 0) {
137 data_.call_data_size += data_.call_data_alignment -
138 data_.call_data_size % data_.call_data_alignment;
139 }
140 // server -> client needs to be reversed so that we can iterate all stacks
141 // in the same order
142 data_.server_initial_metadata.Reverse();
143 data_.server_to_client_messages.Reverse();
144 absl::c_reverse(data_.server_trailing_metadata);
145 return RefCountedPtr<Stack>(new Stack(std::move(data_)));
146 }
147
148 } // namespace grpc_core
149