• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/ext/filters/message_size/message_size_filter.h"
18 
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpc/status.h>
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 
24 #include <functional>
25 #include <utility>
26 
27 #include "absl/log/log.h"
28 #include "absl/strings/str_format.h"
29 #include "src/core/config/core_configuration.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/channel/channel_stack.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/promise/activity.h"
34 #include "src/core/lib/promise/context.h"
35 #include "src/core/lib/promise/latch.h"
36 #include "src/core/lib/promise/race.h"
37 #include "src/core/lib/resource_quota/arena.h"
38 #include "src/core/lib/slice/slice.h"
39 #include "src/core/lib/slice/slice_buffer.h"
40 #include "src/core/lib/surface/channel_stack_type.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/transport.h"
43 #include "src/core/service_config/service_config_call_data.h"
44 #include "src/core/util/latent_see.h"
45 
46 namespace grpc_core {
47 
48 const NoInterceptor ClientMessageSizeFilter::Call::OnClientInitialMetadata;
49 const NoInterceptor ClientMessageSizeFilter::Call::OnServerInitialMetadata;
50 const NoInterceptor ClientMessageSizeFilter::Call::OnServerTrailingMetadata;
51 const NoInterceptor ClientMessageSizeFilter::Call::OnClientToServerHalfClose;
52 const NoInterceptor ClientMessageSizeFilter::Call::OnFinalize;
53 const NoInterceptor ServerMessageSizeFilter::Call::OnClientInitialMetadata;
54 const NoInterceptor ServerMessageSizeFilter::Call::OnServerInitialMetadata;
55 const NoInterceptor ServerMessageSizeFilter::Call::OnServerTrailingMetadata;
56 const NoInterceptor ServerMessageSizeFilter::Call::OnClientToServerHalfClose;
57 const NoInterceptor ServerMessageSizeFilter::Call::OnFinalize;
58 
59 //
60 // MessageSizeParsedConfig
61 //
62 
GetFromCallContext(Arena * arena,size_t service_config_parser_index)63 const MessageSizeParsedConfig* MessageSizeParsedConfig::GetFromCallContext(
64     Arena* arena, size_t service_config_parser_index) {
65   auto* svc_cfg_call_data = arena->GetContext<ServiceConfigCallData>();
66   if (svc_cfg_call_data == nullptr) return nullptr;
67   return static_cast<const MessageSizeParsedConfig*>(
68       svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index));
69 }
70 
GetFromChannelArgs(const ChannelArgs & channel_args)71 MessageSizeParsedConfig MessageSizeParsedConfig::GetFromChannelArgs(
72     const ChannelArgs& channel_args) {
73   MessageSizeParsedConfig limits;
74   limits.max_send_size_ = GetMaxSendSizeFromChannelArgs(channel_args);
75   limits.max_recv_size_ = GetMaxRecvSizeFromChannelArgs(channel_args);
76   return limits;
77 }
78 
GetMaxRecvSizeFromChannelArgs(const ChannelArgs & args)79 absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(
80     const ChannelArgs& args) {
81   if (args.WantMinimalStack()) return absl::nullopt;
82   int size = args.GetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH)
83                  .value_or(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH);
84   if (size < 0) return absl::nullopt;
85   return static_cast<uint32_t>(size);
86 }
87 
GetMaxSendSizeFromChannelArgs(const ChannelArgs & args)88 absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(
89     const ChannelArgs& args) {
90   if (args.WantMinimalStack()) return absl::nullopt;
91   int size = args.GetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH)
92                  .value_or(GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH);
93   if (size < 0) return absl::nullopt;
94   return static_cast<uint32_t>(size);
95 }
96 
JsonLoader(const JsonArgs &)97 const JsonLoaderInterface* MessageSizeParsedConfig::JsonLoader(
98     const JsonArgs&) {
99   static const auto* loader =
100       JsonObjectLoader<MessageSizeParsedConfig>()
101           .OptionalField("maxRequestMessageBytes",
102                          &MessageSizeParsedConfig::max_send_size_)
103           .OptionalField("maxResponseMessageBytes",
104                          &MessageSizeParsedConfig::max_recv_size_)
105           .Finish();
106   return loader;
107 }
108 
109 //
110 // MessageSizeParser
111 //
112 
113 std::unique_ptr<ServiceConfigParser::ParsedConfig>
ParsePerMethodParams(const ChannelArgs &,const Json & json,ValidationErrors * errors)114 MessageSizeParser::ParsePerMethodParams(const ChannelArgs& /*args*/,
115                                         const Json& json,
116                                         ValidationErrors* errors) {
117   return LoadFromJson<std::unique_ptr<MessageSizeParsedConfig>>(
118       json, JsonArgs(), errors);
119 }
120 
Register(CoreConfiguration::Builder * builder)121 void MessageSizeParser::Register(CoreConfiguration::Builder* builder) {
122   builder->service_config_parser()->RegisterParser(
123       std::make_unique<MessageSizeParser>());
124 }
125 
ParserIndex()126 size_t MessageSizeParser::ParserIndex() {
127   return CoreConfiguration::Get().service_config_parser().GetParserIndex(
128       parser_name());
129 }
130 
131 //
132 // MessageSizeFilter
133 //
134 
135 const grpc_channel_filter ClientMessageSizeFilter::kFilter =
136     MakePromiseBasedFilter<ClientMessageSizeFilter, FilterEndpoint::kClient,
137                            kFilterExaminesOutboundMessages |
138                                kFilterExaminesInboundMessages>();
139 
140 const grpc_channel_filter ServerMessageSizeFilter::kFilter =
141     MakePromiseBasedFilter<ServerMessageSizeFilter, FilterEndpoint::kServer,
142                            kFilterExaminesOutboundMessages |
143                                kFilterExaminesInboundMessages>();
144 
145 absl::StatusOr<std::unique_ptr<ClientMessageSizeFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)146 ClientMessageSizeFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
147   return std::make_unique<ClientMessageSizeFilter>(args);
148 }
149 
150 absl::StatusOr<std::unique_ptr<ServerMessageSizeFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)151 ServerMessageSizeFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
152   return std::make_unique<ServerMessageSizeFilter>(args);
153 }
154 
155 namespace {
CheckPayload(const Message & msg,absl::optional<uint32_t> max_length,bool is_client,bool is_send)156 ServerMetadataHandle CheckPayload(const Message& msg,
157                                   absl::optional<uint32_t> max_length,
158                                   bool is_client, bool is_send) {
159   if (!max_length.has_value()) return nullptr;
160   GRPC_TRACE_LOG(call, INFO)
161       << GetContext<Activity>()->DebugTag() << "[message_size] "
162       << (is_send ? "send" : "recv") << " len:" << msg.payload()->Length()
163       << " max:" << *max_length;
164   if (msg.payload()->Length() <= *max_length) return nullptr;
165   return ServerMetadataFromStatus(
166       GRPC_STATUS_RESOURCE_EXHAUSTED,
167       absl::StrFormat("%s: %s message larger than max (%u vs. %d)",
168                       is_client ? "CLIENT" : "SERVER",
169                       is_send ? "Sent" : "Received", msg.payload()->Length(),
170                       *max_length));
171 }
172 }  // namespace
173 
Call(ClientMessageSizeFilter * filter)174 ClientMessageSizeFilter::Call::Call(ClientMessageSizeFilter* filter)
175     : limits_(filter->parsed_config_) {
176   // Get max sizes from channel data, then merge in per-method config values.
177   // Note: Per-method config is only available on the client, so we
178   // apply the max request size to the send limit and the max response
179   // size to the receive limit.
180   const MessageSizeParsedConfig* config_from_call_context =
181       MessageSizeParsedConfig::GetFromCallContext(
182           GetContext<Arena>(), filter->service_config_parser_index_);
183   if (config_from_call_context != nullptr) {
184     absl::optional<uint32_t> max_send_size = limits_.max_send_size();
185     absl::optional<uint32_t> max_recv_size = limits_.max_recv_size();
186     if (config_from_call_context->max_send_size().has_value() &&
187         (!max_send_size.has_value() ||
188          *config_from_call_context->max_send_size() < *max_send_size)) {
189       max_send_size = config_from_call_context->max_send_size();
190     }
191     if (config_from_call_context->max_recv_size().has_value() &&
192         (!max_recv_size.has_value() ||
193          *config_from_call_context->max_recv_size() < *max_recv_size)) {
194       max_recv_size = config_from_call_context->max_recv_size();
195     }
196     limits_ = MessageSizeParsedConfig(max_send_size, max_recv_size);
197   }
198 }
199 
OnClientToServerMessage(const Message & message,ServerMessageSizeFilter * filter)200 ServerMetadataHandle ServerMessageSizeFilter::Call::OnClientToServerMessage(
201     const Message& message, ServerMessageSizeFilter* filter) {
202   GRPC_LATENT_SEE_INNER_SCOPE(
203       "ServerMessageSizeFilter::Call::OnClientToServerMessage");
204   return CheckPayload(message, filter->parsed_config_.max_recv_size(),
205                       /*is_client=*/false, false);
206 }
207 
OnServerToClientMessage(const Message & message,ServerMessageSizeFilter * filter)208 ServerMetadataHandle ServerMessageSizeFilter::Call::OnServerToClientMessage(
209     const Message& message, ServerMessageSizeFilter* filter) {
210   GRPC_LATENT_SEE_INNER_SCOPE(
211       "ServerMessageSizeFilter::Call::OnServerToClientMessage");
212   return CheckPayload(message, filter->parsed_config_.max_send_size(),
213                       /*is_client=*/false, true);
214 }
215 
OnClientToServerMessage(const Message & message)216 ServerMetadataHandle ClientMessageSizeFilter::Call::OnClientToServerMessage(
217     const Message& message) {
218   GRPC_LATENT_SEE_INNER_SCOPE(
219       "ClientMessageSizeFilter::Call::OnClientToServerMessage");
220   return CheckPayload(message, limits_.max_send_size(), /*is_client=*/true,
221                       true);
222 }
223 
OnServerToClientMessage(const Message & message)224 ServerMetadataHandle ClientMessageSizeFilter::Call::OnServerToClientMessage(
225     const Message& message) {
226   GRPC_LATENT_SEE_INNER_SCOPE(
227       "ClientMessageSizeFilter::Call::OnServerToClientMessage");
228   return CheckPayload(message, limits_.max_recv_size(), /*is_client=*/true,
229                       false);
230 }
231 
232 namespace {
233 // Used for GRPC_CLIENT_DIRECT_CHANNEL and GRPC_SERVER_CHANNEL. Adds the
234 // filter only if message size limits or service config is specified.
HasMessageSizeLimits(const ChannelArgs & channel_args)235 bool HasMessageSizeLimits(const ChannelArgs& channel_args) {
236   MessageSizeParsedConfig limits =
237       MessageSizeParsedConfig::GetFromChannelArgs(channel_args);
238   return limits.max_send_size().has_value() ||
239          limits.max_recv_size().has_value() ||
240          channel_args.GetString(GRPC_ARG_SERVICE_CONFIG).has_value();
241 }
242 
243 }  // namespace
RegisterMessageSizeFilter(CoreConfiguration::Builder * builder)244 void RegisterMessageSizeFilter(CoreConfiguration::Builder* builder) {
245   MessageSizeParser::Register(builder);
246   builder->channel_init()
247       ->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_SUBCHANNEL)
248       .ExcludeFromMinimalStack();
249   builder->channel_init()
250       ->RegisterFilter<ClientMessageSizeFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
251       .ExcludeFromMinimalStack()
252       .If(HasMessageSizeLimits);
253   builder->channel_init()
254       ->RegisterFilter<ServerMessageSizeFilter>(GRPC_SERVER_CHANNEL)
255       .ExcludeFromMinimalStack()
256       .If(HasMessageSizeLimits);
257 }
258 }  // namespace grpc_core
259