1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/ext/filters/message_size/message_size_filter.h"
18
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpc/status.h>
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23
24 #include <functional>
25 #include <utility>
26
27 #include "absl/log/log.h"
28 #include "absl/strings/str_format.h"
29 #include "src/core/config/core_configuration.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/promise/activity.h"
34 #include "src/core/lib/promise/context.h"
35 #include "src/core/lib/promise/latch.h"
36 #include "src/core/lib/promise/race.h"
37 #include "src/core/lib/resource_quota/arena.h"
38 #include "src/core/lib/slice/slice.h"
39 #include "src/core/lib/slice/slice_buffer.h"
40 #include "src/core/lib/surface/channel_stack_type.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/transport.h"
43 #include "src/core/service_config/service_config_call_data.h"
44 #include "src/core/util/latent_see.h"
45
46 namespace grpc_core {
47
48 const NoInterceptor ClientMessageSizeFilter::Call::OnClientInitialMetadata;
49 const NoInterceptor ClientMessageSizeFilter::Call::OnServerInitialMetadata;
50 const NoInterceptor ClientMessageSizeFilter::Call::OnServerTrailingMetadata;
51 const NoInterceptor ClientMessageSizeFilter::Call::OnClientToServerHalfClose;
52 const NoInterceptor ClientMessageSizeFilter::Call::OnFinalize;
53 const NoInterceptor ServerMessageSizeFilter::Call::OnClientInitialMetadata;
54 const NoInterceptor ServerMessageSizeFilter::Call::OnServerInitialMetadata;
55 const NoInterceptor ServerMessageSizeFilter::Call::OnServerTrailingMetadata;
56 const NoInterceptor ServerMessageSizeFilter::Call::OnClientToServerHalfClose;
57 const NoInterceptor ServerMessageSizeFilter::Call::OnFinalize;
58
59 //
60 // MessageSizeParsedConfig
61 //
62
GetFromCallContext(Arena * arena,size_t service_config_parser_index)63 const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext(
64 Arena* arena, size_t service_config_parser_index) {
65 auto* svc_cfg_call_data = arena->GetContext<ServiceConfigCallData>();
66 if (svc_cfg_call_data == nullptr) return nullptr;
67 return static_cast<const MessageSizeParsedConfig*>(
68 svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index));
69 }
70
GetFromChannelArgs(const ChannelArgs & channel_args)71 MessageSizeParsedConfig MessageSizeParsedConfig::GetFromChannelArgs(
72 const ChannelArgs& channel_args) {
73 MessageSizeParsedConfig limits;
74 limits.max_send_size_ = GetMaxSendSizeFromChannelArgs(channel_args);
75 limits.max_recv_size_ = GetMaxRecvSizeFromChannelArgs(channel_args);
76 return limits;
77 }
78
GetMaxRecvSizeFromChannelArgs(const ChannelArgs & args)79 absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(
80 const ChannelArgs& args) {
81 if (args.WantMinimalStack()) return absl::nullopt;
82 int size = args.GetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)
83 .value_or(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH);
84 if (size < 0) return absl::nullopt;
85 return static_cast<uint32_t>(size);
86 }
87
GetMaxSendSizeFromChannelArgs(const ChannelArgs & args)88 absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(
89 const ChannelArgs& args) {
90 if (args.WantMinimalStack()) return absl::nullopt;
91 int size = args.GetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH)
92 .value_or(GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH);
93 if (size < 0) return absl::nullopt;
94 return static_cast<uint32_t>(size);
95 }
96
JsonLoader(const JsonArgs &)97 const JsonLoaderInterface* MessageSizeParsedConfig::JsonLoader(
98 const JsonArgs&) {
99 static const auto* loader =
100 JsonObjectLoader<MessageSizeParsedConfig>()
101 .OptionalField("maxRequestMessageBytes",
102 &MessageSizeParsedConfig::max_send_size_)
103 .OptionalField("maxResponseMessageBytes",
104 &MessageSizeParsedConfig::max_recv_size_)
105 .Finish();
106 return loader;
107 }
108
109 //
110 // MessageSizeParser
111 //
112
113 std::unique_ptr<ServiceConfigParser::ParsedConfig>
ParsePerMethodParams(const ChannelArgs &,const Json & json,ValidationErrors * errors)114 MessageSizeParser::ParsePerMethodParams(const ChannelArgs& /*args*/,
115 const Json& json,
116 ValidationErrors* errors) {
117 return LoadFromJson<std::unique_ptr<MessageSizeParsedConfig>>(
118 json, JsonArgs(), errors);
119 }
120
Register(CoreConfiguration::Builder * builder)121 void MessageSizeParser::Register(CoreConfiguration::Builder* builder) {
122 builder->service_config_parser()->RegisterParser(
123 std::make_unique<MessageSizeParser>());
124 }
125
ParserIndex()126 size_t MessageSizeParser::ParserIndex() {
127 return CoreConfiguration::Get().service_config_parser().GetParserIndex(
128 parser_name());
129 }
130
131 //
132 // MessageSizeFilter
133 //
134
135 const grpc_channel_filter ClientMessageSizeFilter::kFilter =
136 MakePromiseBasedFilter<ClientMessageSizeFilter, FilterEndpoint::kClient,
137 kFilterExaminesOutboundMessages |
138 kFilterExaminesInboundMessages>();
139
140 const grpc_channel_filter ServerMessageSizeFilter::kFilter =
141 MakePromiseBasedFilter<ServerMessageSizeFilter, FilterEndpoint::kServer,
142 kFilterExaminesOutboundMessages |
143 kFilterExaminesInboundMessages>();
144
145 absl::StatusOr<std::unique_ptr<ClientMessageSizeFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)146 ClientMessageSizeFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
147 return std::make_unique<ClientMessageSizeFilter>(args);
148 }
149
150 absl::StatusOr<std::unique_ptr<ServerMessageSizeFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)151 ServerMessageSizeFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
152 return std::make_unique<ServerMessageSizeFilter>(args);
153 }
154
155 namespace {
CheckPayload(const Message & msg,absl::optional<uint32_t> max_length,bool is_client,bool is_send)156 ServerMetadataHandle CheckPayload(const Message& msg,
157 absl::optional<uint32_t> max_length,
158 bool is_client, bool is_send) {
159 if (!max_length.has_value()) return nullptr;
160 GRPC_TRACE_LOG(call, INFO)
161 << GetContext<Activity>()->DebugTag() << "[message_size] "
162 << (is_send ? "send" : "recv") << " len:" << msg.payload()->Length()
163 << " max:" << *max_length;
164 if (msg.payload()->Length() <= *max_length) return nullptr;
165 return ServerMetadataFromStatus(
166 GRPC_STATUS_RESOURCE_EXHAUSTED,
167 absl::StrFormat("%s: %s message larger than max (%u vs. %d)",
168 is_client ? "CLIENT" : "SERVER",
169 is_send ? "Sent" : "Received", msg.payload()->Length(),
170 *max_length));
171 }
172 } // namespace
173
Call(ClientMessageSizeFilter * filter)174 ClientMessageSizeFilter::Call::Call(ClientMessageSizeFilter* filter)
175 : limits_(filter->parsed_config_) {
176 // Get max sizes from channel data, then merge in per-method config values.
177 // Note: Per-method config is only available on the client, so we
178 // apply the max request size to the send limit and the max response
179 // size to the receive limit.
180 const MessageSizeParsedConfig* config_from_call_context =
181 MessageSizeParsedConfig::GetFromCallContext(
182 GetContext<Arena>(), filter->service_config_parser_index_);
183 if (config_from_call_context != nullptr) {
184 absl::optional<uint32_t> max_send_size = limits_.max_send_size();
185 absl::optional<uint32_t> max_recv_size = limits_.max_recv_size();
186 if (config_from_call_context->max_send_size().has_value() &&
187 (!max_send_size.has_value() ||
188 *config_from_call_context->max_send_size() < *max_send_size)) {
189 max_send_size = config_from_call_context->max_send_size();
190 }
191 if (config_from_call_context->max_recv_size().has_value() &&
192 (!max_recv_size.has_value() ||
193 *config_from_call_context->max_recv_size() < *max_recv_size)) {
194 max_recv_size = config_from_call_context->max_recv_size();
195 }
196 limits_ = MessageSizeParsedConfig(max_send_size, max_recv_size);
197 }
198 }
199
OnClientToServerMessage(const Message & message,ServerMessageSizeFilter * filter)200 ServerMetadataHandle ServerMessageSizeFilter::Call::OnClientToServerMessage(
201 const Message& message, ServerMessageSizeFilter* filter) {
202 GRPC_LATENT_SEE_INNER_SCOPE(
203 "ServerMessageSizeFilter::Call::OnClientToServerMessage");
204 return CheckPayload(message, filter->parsed_config_.max_recv_size(),
205 /*is_client=*/false, false);
206 }
207
OnServerToClientMessage(const Message & message,ServerMessageSizeFilter * filter)208 ServerMetadataHandle ServerMessageSizeFilter::Call::OnServerToClientMessage(
209 const Message& message, ServerMessageSizeFilter* filter) {
210 GRPC_LATENT_SEE_INNER_SCOPE(
211 "ServerMessageSizeFilter::Call::OnServerToClientMessage");
212 return CheckPayload(message, filter->parsed_config_.max_send_size(),
213 /*is_client=*/false, true);
214 }
215
OnClientToServerMessage(const Message & message)216 ServerMetadataHandle ClientMessageSizeFilter::Call::OnClientToServerMessage(
217 const Message& message) {
218 GRPC_LATENT_SEE_INNER_SCOPE(
219 "ClientMessageSizeFilter::Call::OnClientToServerMessage");
220 return CheckPayload(message, limits_.max_send_size(), /*is_client=*/true,
221 true);
222 }
223
OnServerToClientMessage(const Message & message)224 ServerMetadataHandle ClientMessageSizeFilter::Call::OnServerToClientMessage(
225 const Message& message) {
226 GRPC_LATENT_SEE_INNER_SCOPE(
227 "ClientMessageSizeFilter::Call::OnServerToClientMessage");
228 return CheckPayload(message, limits_.max_recv_size(), /*is_client=*/true,
229 false);
230 }
231
232 namespace {
233 // Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the
234 // filter only if message size limits or service config is specified.
HasMessageSizeLimits(const ChannelArgs & channel_args)235 bool HasMessageSizeLimits(const ChannelArgs& channel_args) {
236 MessageSizeParsedConfig limits =
237 MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
238 return limits.max_send_size().has_value() ||
239 limits.max_recv_size().has_value() ||
240 channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
241 }
242
243 } // namespace
RegisterMessageSizeFilter(CoreConfiguration::Builder * builder)244 void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
245 MessageSizeParser::Register(builder);
246 builder->channel_init()
247 ->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_SUBCHANNEL)
248 .ExcludeFromMinimalStack();
249 builder->channel_init()
250 ->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
251 .ExcludeFromMinimalStack()
252 .If(HasMessageSizeLimits);
253 builder->channel_init()
254 ->RegisterFilter<ServerMessageSizeFilter>(GRPC_SERVER_CHANNEL)
255 .ExcludeFromMinimalStack()
256 .If(HasMessageSizeLimits);
257 }
258 } // namespace grpc_core
259