• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/server/server_config_selector_filter.h"
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <functional>
20 #include <memory>
21 #include <utility>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/log/check.h"
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/channel/promise_based_filter.h"
30 #include "src/core/lib/event_engine/event_engine_context.h"
31 #include "src/core/lib/promise/arena_promise.h"
32 #include "src/core/lib/promise/context.h"
33 #include "src/core/lib/promise/promise.h"
34 #include "src/core/lib/resource_quota/arena.h"
35 #include "src/core/lib/transport/metadata_batch.h"
36 #include "src/core/lib/transport/transport.h"
37 #include "src/core/server/server_config_selector.h"
38 #include "src/core/service_config/service_config.h"
39 #include "src/core/service_config/service_config_call_data.h"
40 #include "src/core/util/latent_see.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 #include "src/core/util/status_helper.h"
43 #include "src/core/util/sync.h"
44 
45 namespace grpc_core {
46 
47 namespace {
48 
49 class ServerConfigSelectorFilter final
50     : public ImplementChannelFilter<ServerConfigSelectorFilter>,
51       public InternallyRefCounted<ServerConfigSelectorFilter> {
52  public:
53   explicit ServerConfigSelectorFilter(
54       RefCountedPtr<ServerConfigSelectorProvider>
55           server_config_selector_provider);
56 
TypeName()57   static absl::string_view TypeName() {
58     return "server_config_selector_filter";
59   }
60 
61   ServerConfigSelectorFilter(const ServerConfigSelectorFilter&) = delete;
62   ServerConfigSelectorFilter& operator=(const ServerConfigSelectorFilter&) =
63       delete;
64 
65   static absl::StatusOr<OrphanablePtr<ServerConfigSelectorFilter>> Create(
66       const ChannelArgs& args, ChannelFilter::Args);
67 
68   void Orphan() override;
69 
70   class Call {
71    public:
72     absl::Status OnClientInitialMetadata(ClientMetadata& md,
73                                          ServerConfigSelectorFilter* filter);
74     static const NoInterceptor OnServerInitialMetadata;
75     static const NoInterceptor OnServerTrailingMetadata;
76     static const NoInterceptor OnClientToServerMessage;
77     static const NoInterceptor OnClientToServerHalfClose;
78     static const NoInterceptor OnServerToClientMessage;
79     static const NoInterceptor OnFinalize;
80   };
81 
config_selector()82   absl::StatusOr<RefCountedPtr<ServerConfigSelector>> config_selector() {
83     MutexLock lock(&mu_);
84     return config_selector_.value();
85   }
86 
87  private:
88   class ServerConfigSelectorWatcher
89       : public ServerConfigSelectorProvider::ServerConfigSelectorWatcher {
90    public:
ServerConfigSelectorWatcher(RefCountedPtr<ServerConfigSelectorFilter> filter)91     explicit ServerConfigSelectorWatcher(
92         RefCountedPtr<ServerConfigSelectorFilter> filter)
93         : filter_(filter) {}
OnServerConfigSelectorUpdate(absl::StatusOr<RefCountedPtr<ServerConfigSelector>> update)94     void OnServerConfigSelectorUpdate(
95         absl::StatusOr<RefCountedPtr<ServerConfigSelector>> update) override {
96       MutexLock lock(&filter_->mu_);
97       filter_->config_selector_ = std::move(update);
98     }
99 
100    private:
101     RefCountedPtr<ServerConfigSelectorFilter> filter_;
102   };
103 
104   RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider_;
105   Mutex mu_;
106   absl::optional<absl::StatusOr<RefCountedPtr<ServerConfigSelector>>>
107       config_selector_ ABSL_GUARDED_BY(mu_);
108 };
109 
110 absl::StatusOr<OrphanablePtr<ServerConfigSelectorFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)111 ServerConfigSelectorFilter::Create(const ChannelArgs& args,
112                                    ChannelFilter::Args) {
113   ServerConfigSelectorProvider* server_config_selector_provider =
114       args.GetObject<ServerConfigSelectorProvider>();
115   if (server_config_selector_provider == nullptr) {
116     return absl::UnknownError("No ServerConfigSelectorProvider object found");
117   }
118   return MakeOrphanable<ServerConfigSelectorFilter>(
119       server_config_selector_provider->Ref());
120 }
121 
ServerConfigSelectorFilter(RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider)122 ServerConfigSelectorFilter::ServerConfigSelectorFilter(
123     RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider)
124     : server_config_selector_provider_(
125           std::move(server_config_selector_provider)) {
126   CHECK(server_config_selector_provider_ != nullptr);
127   auto server_config_selector_watcher =
128       std::make_unique<ServerConfigSelectorWatcher>(Ref());
129   auto config_selector = server_config_selector_provider_->Watch(
130       std::move(server_config_selector_watcher));
131   MutexLock lock(&mu_);
132   // It's possible for the watcher to have already updated config_selector_
133   if (!config_selector_.has_value()) {
134     config_selector_ = std::move(config_selector);
135   }
136 }
137 
Orphan()138 void ServerConfigSelectorFilter::Orphan() {
139   if (server_config_selector_provider_ != nullptr) {
140     server_config_selector_provider_->CancelWatch();
141   }
142   Unref();
143 }
144 
OnClientInitialMetadata(ClientMetadata & md,ServerConfigSelectorFilter * filter)145 absl::Status ServerConfigSelectorFilter::Call::OnClientInitialMetadata(
146     ClientMetadata& md, ServerConfigSelectorFilter* filter) {
147   GRPC_LATENT_SEE_INNER_SCOPE(
148       "ServerConfigSelectorFilter::Call::OnClientInitialMetadata");
149   auto sel = filter->config_selector();
150   if (!sel.ok()) return sel.status();
151   auto call_config = sel.value()->GetCallConfig(&md);
152   if (!call_config.ok()) {
153     return absl::UnavailableError(StatusToString(call_config.status()));
154   }
155   auto* service_config_call_data =
156       GetContext<Arena>()->New<ServiceConfigCallData>(GetContext<Arena>());
157   service_config_call_data->SetServiceConfig(
158       std::move(call_config->service_config), call_config->method_configs);
159   return absl::OkStatus();
160 }
161 
162 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerInitialMetadata;
163 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerTrailingMetadata;
164 const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerMessage;
165 const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerHalfClose;
166 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerToClientMessage;
167 const NoInterceptor ServerConfigSelectorFilter::Call::OnFinalize;
168 
169 }  // namespace
170 
171 const grpc_channel_filter kServerConfigSelectorFilter =
172     MakePromiseBasedFilter<ServerConfigSelectorFilter,
173                            FilterEndpoint::kServer>();
174 
175 }  // namespace grpc_core
176