1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/server/server_config_selector_filter.h"
16
17 #include <grpc/support/port_platform.h>
18
19 #include <functional>
20 #include <memory>
21 #include <utility>
22
23 #include "absl/base/thread_annotations.h"
24 #include "absl/log/check.h"
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/channel/channel_args.h"
29 #include "src/core/lib/channel/promise_based_filter.h"
30 #include "src/core/lib/event_engine/event_engine_context.h"
31 #include "src/core/lib/promise/arena_promise.h"
32 #include "src/core/lib/promise/context.h"
33 #include "src/core/lib/promise/promise.h"
34 #include "src/core/lib/resource_quota/arena.h"
35 #include "src/core/lib/transport/metadata_batch.h"
36 #include "src/core/lib/transport/transport.h"
37 #include "src/core/server/server_config_selector.h"
38 #include "src/core/service_config/service_config.h"
39 #include "src/core/service_config/service_config_call_data.h"
40 #include "src/core/util/latent_see.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 #include "src/core/util/status_helper.h"
43 #include "src/core/util/sync.h"
44
45 namespace grpc_core {
46
47 namespace {
48
49 class ServerConfigSelectorFilter final
50 : public ImplementChannelFilter<ServerConfigSelectorFilter>,
51 public InternallyRefCounted<ServerConfigSelectorFilter> {
52 public:
53 explicit ServerConfigSelectorFilter(
54 RefCountedPtr<ServerConfigSelectorProvider>
55 server_config_selector_provider);
56
TypeName()57 static absl::string_view TypeName() {
58 return "server_config_selector_filter";
59 }
60
61 ServerConfigSelectorFilter(const ServerConfigSelectorFilter&) = delete;
62 ServerConfigSelectorFilter& operator=(const ServerConfigSelectorFilter&) =
63 delete;
64
65 static absl::StatusOr<OrphanablePtr<ServerConfigSelectorFilter>> Create(
66 const ChannelArgs& args, ChannelFilter::Args);
67
68 void Orphan() override;
69
70 class Call {
71 public:
72 absl::Status OnClientInitialMetadata(ClientMetadata& md,
73 ServerConfigSelectorFilter* filter);
74 static const NoInterceptor OnServerInitialMetadata;
75 static const NoInterceptor OnServerTrailingMetadata;
76 static const NoInterceptor OnClientToServerMessage;
77 static const NoInterceptor OnClientToServerHalfClose;
78 static const NoInterceptor OnServerToClientMessage;
79 static const NoInterceptor OnFinalize;
80 };
81
config_selector()82 absl::StatusOr<RefCountedPtr<ServerConfigSelector>> config_selector() {
83 MutexLock lock(&mu_);
84 return config_selector_.value();
85 }
86
87 private:
88 class ServerConfigSelectorWatcher
89 : public ServerConfigSelectorProvider::ServerConfigSelectorWatcher {
90 public:
ServerConfigSelectorWatcher(RefCountedPtr<ServerConfigSelectorFilter> filter)91 explicit ServerConfigSelectorWatcher(
92 RefCountedPtr<ServerConfigSelectorFilter> filter)
93 : filter_(filter) {}
OnServerConfigSelectorUpdate(absl::StatusOr<RefCountedPtr<ServerConfigSelector>> update)94 void OnServerConfigSelectorUpdate(
95 absl::StatusOr<RefCountedPtr<ServerConfigSelector>> update) override {
96 MutexLock lock(&filter_->mu_);
97 filter_->config_selector_ = std::move(update);
98 }
99
100 private:
101 RefCountedPtr<ServerConfigSelectorFilter> filter_;
102 };
103
104 RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider_;
105 Mutex mu_;
106 absl::optional<absl::StatusOr<RefCountedPtr<ServerConfigSelector>>>
107 config_selector_ ABSL_GUARDED_BY(mu_);
108 };
109
110 absl::StatusOr<OrphanablePtr<ServerConfigSelectorFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args)111 ServerConfigSelectorFilter::Create(const ChannelArgs& args,
112 ChannelFilter::Args) {
113 ServerConfigSelectorProvider* server_config_selector_provider =
114 args.GetObject<ServerConfigSelectorProvider>();
115 if (server_config_selector_provider == nullptr) {
116 return absl::UnknownError("No ServerConfigSelectorProvider object found");
117 }
118 return MakeOrphanable<ServerConfigSelectorFilter>(
119 server_config_selector_provider->Ref());
120 }
121
ServerConfigSelectorFilter(RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider)122 ServerConfigSelectorFilter::ServerConfigSelectorFilter(
123 RefCountedPtr<ServerConfigSelectorProvider> server_config_selector_provider)
124 : server_config_selector_provider_(
125 std::move(server_config_selector_provider)) {
126 CHECK(server_config_selector_provider_ != nullptr);
127 auto server_config_selector_watcher =
128 std::make_unique<ServerConfigSelectorWatcher>(Ref());
129 auto config_selector = server_config_selector_provider_->Watch(
130 std::move(server_config_selector_watcher));
131 MutexLock lock(&mu_);
132 // It's possible for the watcher to have already updated config_selector_
133 if (!config_selector_.has_value()) {
134 config_selector_ = std::move(config_selector);
135 }
136 }
137
Orphan()138 void ServerConfigSelectorFilter::Orphan() {
139 if (server_config_selector_provider_ != nullptr) {
140 server_config_selector_provider_->CancelWatch();
141 }
142 Unref();
143 }
144
OnClientInitialMetadata(ClientMetadata & md,ServerConfigSelectorFilter * filter)145 absl::Status ServerConfigSelectorFilter::Call::OnClientInitialMetadata(
146 ClientMetadata& md, ServerConfigSelectorFilter* filter) {
147 GRPC_LATENT_SEE_INNER_SCOPE(
148 "ServerConfigSelectorFilter::Call::OnClientInitialMetadata");
149 auto sel = filter->config_selector();
150 if (!sel.ok()) return sel.status();
151 auto call_config = sel.value()->GetCallConfig(&md);
152 if (!call_config.ok()) {
153 return absl::UnavailableError(StatusToString(call_config.status()));
154 }
155 auto* service_config_call_data =
156 GetContext<Arena>()->New<ServiceConfigCallData>(GetContext<Arena>());
157 service_config_call_data->SetServiceConfig(
158 std::move(call_config->service_config), call_config->method_configs);
159 return absl::OkStatus();
160 }
161
162 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerInitialMetadata;
163 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerTrailingMetadata;
164 const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerMessage;
165 const NoInterceptor ServerConfigSelectorFilter::Call::OnClientToServerHalfClose;
166 const NoInterceptor ServerConfigSelectorFilter::Call::OnServerToClientMessage;
167 const NoInterceptor ServerConfigSelectorFilter::Call::OnFinalize;
168
169 } // namespace
170
171 const grpc_channel_filter kServerConfigSelectorFilter =
172 MakePromiseBasedFilter<ServerConfigSelectorFilter,
173 FilterEndpoint::kServer>();
174
175 } // namespace grpc_core
176