• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/transport/call_filters.h"
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "absl/log/check.h"
20 #include "absl/log/log.h"
21 #include "src/core/lib/transport/metadata.h"
22 #include "src/core/util/crash.h"
23 
24 namespace grpc_core {
25 // Call data for those calls that don't have any call data
26 // (we form pointers to this that aren't allowed to be nullptr)
27 char CallFilters::g_empty_call_data_;
28 
29 ///////////////////////////////////////////////////////////////////////////////
30 // CallFilters
31 
Start()32 void CallFilters::Start() {
33   CHECK_EQ(call_data_, nullptr);
34   size_t call_data_alignment = 1;
35   for (const auto& stack : stacks_) {
36     call_data_alignment =
37         std::max(call_data_alignment, stack.stack->data_.call_data_alignment);
38   }
39   size_t call_data_size = 0;
40   for (auto& stack : stacks_) {
41     stack.call_data_offset = call_data_size;
42     size_t stack_call_data_size = stack.stack->data_.call_data_size;
43     if (stack_call_data_size % call_data_alignment != 0) {
44       stack_call_data_size +=
45           call_data_alignment - stack_call_data_size % call_data_alignment;
46     }
47     call_data_size += stack_call_data_size;
48   }
49   if (call_data_size != 0) {
50     call_data_ = gpr_malloc_aligned(call_data_size, call_data_alignment);
51   } else {
52     call_data_ = &g_empty_call_data_;
53   }
54   for (const auto& stack : stacks_) {
55     for (const auto& constructor : stack.stack->data_.filter_constructor) {
56       constructor.call_init(
57           filters_detail::Offset(
58               call_data_, stack.call_data_offset + constructor.call_offset),
59           constructor.channel_data);
60     }
61   }
62   call_state_.Start();
63 }
64 
Finalize(const grpc_call_final_info * final_info)65 void CallFilters::Finalize(const grpc_call_final_info* final_info) {
66   for (auto& stack : stacks_) {
67     for (auto& finalizer : stack.stack->data_.finalizers) {
68       finalizer.final(
69           filters_detail::Offset(
70               call_data_, stack.call_data_offset + finalizer.call_offset),
71           finalizer.channel_data, final_info);
72     }
73   }
74 }
75 
CancelDueToFailedPipeOperation(SourceLocation but_where)76 void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
77   // We expect something cancelled before now
78   if (push_server_trailing_metadata_ == nullptr) return;
79   GRPC_TRACE_VLOG(promise_primitives, 2)
80           .AtLocation(but_where.file(), but_where.line())
81       << "Cancelling due to failed pipe operation: " << DebugString();
82   Cancel();
83 }
84 
PushServerTrailingMetadata(ServerMetadataHandle md)85 void CallFilters::PushServerTrailingMetadata(ServerMetadataHandle md) {
86   CHECK(md != nullptr);
87   GRPC_TRACE_LOG(call, INFO)
88       << GetContext<Activity>()->DebugTag() << " PushServerTrailingMetadata["
89       << this << "]: " << md->DebugString() << " into " << DebugString();
90   CHECK(md != nullptr);
91   if (call_state_.PushServerTrailingMetadata(
92           md->get(GrpcCallWasCancelled()).value_or(false))) {
93     push_server_trailing_metadata_ = std::move(md);
94   }
95 }
96 
Cancel()97 void CallFilters::Cancel() {
98   GRPC_TRACE_LOG(call, INFO) << GetContext<Activity>()->DebugTag() << " Cancel["
99                              << this << "]: into " << DebugString();
100   if (call_state_.PushServerTrailingMetadata(true)) {
101     push_server_trailing_metadata_ =
102         CancelledServerMetadataFromStatus(GRPC_STATUS_CANCELLED);
103   }
104 }
105 
DebugString() const106 std::string CallFilters::DebugString() const {
107   std::vector<std::string> components = {
108       absl::StrFormat("this:%p", this),
109       absl::StrCat("state:", call_state_.DebugString()),
110       absl::StrCat("server_trailing_metadata:",
111                    push_server_trailing_metadata_ == nullptr
112                        ? "not-set"
113                        : push_server_trailing_metadata_->DebugString())};
114   return absl::StrCat("CallFilters{", absl::StrJoin(components, ", "), "}");
115 };
116 
117 ///////////////////////////////////////////////////////////////////////////////
118 // CallFilters::Stack
119 
~Stack()120 CallFilters::Stack::~Stack() {
121   for (auto& destructor : data_.channel_data_destructors) {
122     destructor.destroy(destructor.channel_data);
123   }
124 }
125 
126 ///////////////////////////////////////////////////////////////////////////////
127 // CallFilters::StackBuilder
128 
~StackBuilder()129 CallFilters::StackBuilder::~StackBuilder() {
130   for (auto& destructor : data_.channel_data_destructors) {
131     destructor.destroy(destructor.channel_data);
132   }
133 }
134 
Build()135 RefCountedPtr<CallFilters::Stack> CallFilters::StackBuilder::Build() {
136   if (data_.call_data_size % data_.call_data_alignment != 0) {
137     data_.call_data_size += data_.call_data_alignment -
138                             data_.call_data_size % data_.call_data_alignment;
139   }
140   // server -> client needs to be reversed so that we can iterate all stacks
141   // in the same order
142   data_.server_initial_metadata.Reverse();
143   data_.server_to_client_messages.Reverse();
144   absl::c_reverse(data_.server_trailing_metadata);
145   return RefCountedPtr<Stack>(new Stack(std::move(data_)));
146 }
147 
148 }  // namespace grpc_core
149