1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <benchmark/benchmark.h>
16 #include <grpc/grpc.h>
17
18 #include <memory>
19
20 #include "absl/strings/string_view.h"
21 #include "src/core/client_channel/subchannel_interface_internal.h"
22 #include "src/core/config/core_configuration.h"
23 #include "src/core/lib/address_utils/parse_address.h"
24 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
25 #include "src/core/lib/event_engine/default_event_engine.h"
26 #include "src/core/lib/transport/connectivity_state.h"
27 #include "src/core/load_balancing/health_check_client_internal.h"
28 #include "src/core/load_balancing/lb_policy.h"
29 #include "src/core/util/json/json_reader.h"
30 #include "test/core/test_util/build.h"
31
32 namespace grpc_core {
33 namespace {
34
IsSlowBuild()35 bool IsSlowBuild() {
36 return BuiltUnderMsan() || BuiltUnderUbsan() || BuiltUnderTsan();
37 }
38
39 class BenchmarkHelper : public std::enable_shared_from_this<BenchmarkHelper> {
40 public:
BenchmarkHelper(absl::string_view name,absl::string_view config)41 BenchmarkHelper(absl::string_view name, absl::string_view config)
42 : name_(name), config_json_(config) {
43 CHECK(lb_policy_ != nullptr) << "Failed to create LB policy: " << name;
44 auto parsed_json = JsonParse(std::string(config_json_));
45 CHECK_OK(parsed_json);
46 auto config_parsed =
47 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
48 *parsed_json);
49 CHECK_OK(config_parsed);
50 config_ = std::move(*config_parsed);
51 }
52
GetPicker()53 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> GetPicker() {
54 MutexLock lock(&mu_);
55 while (picker_ == nullptr) {
56 cv_.Wait(&mu_);
57 }
58 return picker_;
59 }
60
UpdateLbPolicy(size_t num_endpoints)61 void UpdateLbPolicy(size_t num_endpoints) {
62 {
63 MutexLock lock(&mu_);
64 picker_ = nullptr;
65 work_serializer_->Schedule(
66 [this, num_endpoints]() {
67 EndpointAddressesList addresses;
68 for (size_t i = 0; i < num_endpoints; i++) {
69 grpc_resolved_address addr;
70 int port = i % 65536;
71 int ip = i / 65536;
72 CHECK_LT(ip, 256);
73 CHECK(grpc_parse_uri(
74 URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port))
75 .value(),
76 &addr));
77 addresses.emplace_back(addr, ChannelArgs());
78 }
79 CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{
80 std::make_shared<EndpointAddressesListIterator>(
81 std::move(addresses)),
82 config_, "", ChannelArgs()}));
83 },
84 DEBUG_LOCATION);
85 }
86 work_serializer_->DrainQueue();
87 }
88
89 private:
90 class SubchannelFake final : public SubchannelInterface {
91 public:
SubchannelFake(BenchmarkHelper * helper)92 explicit SubchannelFake(BenchmarkHelper* helper) : helper_(helper) {}
93
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> unique_watcher)94 void WatchConnectivityState(
95 std::unique_ptr<ConnectivityStateWatcherInterface> unique_watcher)
96 override {
97 AddConnectivityWatcherInternal(
98 std::shared_ptr<ConnectivityStateWatcherInterface>(
99 std::move(unique_watcher)));
100 }
101
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)102 void CancelConnectivityStateWatch(
103 ConnectivityStateWatcherInterface* watcher) override {
104 MutexLock lock(&helper_->mu_);
105 helper_->connectivity_watchers_.erase(watcher);
106 }
107
RequestConnection()108 void RequestConnection() override { LOG(FATAL) << "unimplemented"; }
109
ResetBackoff()110 void ResetBackoff() override { LOG(FATAL) << "unimplemented"; }
111
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)112 void AddDataWatcher(
113 std::unique_ptr<DataWatcherInterface> watcher) override {
114 auto* watcher_internal =
115 DownCast<InternalSubchannelDataWatcherInterface*>(watcher.get());
116 if (watcher_internal->type() == HealthProducer::Type()) {
117 AddConnectivityWatcherInternal(
118 DownCast<HealthWatcher*>(watcher_internal)->TakeWatcher());
119 } else {
120 LOG(FATAL) << "unimplemented watcher type: "
121 << watcher_internal->type();
122 }
123 }
124
CancelDataWatcher(DataWatcherInterface * watcher)125 void CancelDataWatcher(DataWatcherInterface* watcher) override {}
126
address() const127 std::string address() const override { return "test"; }
128
129 private:
AddConnectivityWatcherInternal(std::shared_ptr<ConnectivityStateWatcherInterface> watcher)130 void AddConnectivityWatcherInternal(
131 std::shared_ptr<ConnectivityStateWatcherInterface> watcher) {
132 {
133 MutexLock lock(&helper_->mu_);
134 helper_->work_serializer_->Schedule(
135 [watcher]() {
136 watcher->OnConnectivityStateChange(GRPC_CHANNEL_READY,
137 absl::OkStatus());
138 },
139 DEBUG_LOCATION);
140 helper_->connectivity_watchers_.insert(std::move(watcher));
141 }
142 helper_->work_serializer_->DrainQueue();
143 }
144
145 BenchmarkHelper* helper_;
146 };
147
148 class LbHelper final : public LoadBalancingPolicy::ChannelControlHelper {
149 public:
LbHelper(BenchmarkHelper * helper)150 explicit LbHelper(BenchmarkHelper* helper) : helper_(helper) {}
151
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)152 RefCountedPtr<SubchannelInterface> CreateSubchannel(
153 const grpc_resolved_address& address,
154 const ChannelArgs& per_address_args, const ChannelArgs& args) override {
155 return MakeRefCounted<SubchannelFake>(helper_);
156 }
157
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)158 void UpdateState(
159 grpc_connectivity_state state, const absl::Status& status,
160 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
161 MutexLock lock(&helper_->mu_);
162 helper_->picker_ = std::move(picker);
163 helper_->cv_.SignalAll();
164 }
165
RequestReresolution()166 void RequestReresolution() override { LOG(FATAL) << "unimplemented"; }
167
GetTarget()168 absl::string_view GetTarget() override { return "foo"; }
169
GetAuthority()170 absl::string_view GetAuthority() override { return "foo"; }
171
GetChannelCredentials()172 RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
173 LOG(FATAL) << "unimplemented";
174 }
175
GetUnsafeChannelCredentials()176 RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
177 override {
178 LOG(FATAL) << "unimplemented";
179 }
180
GetEventEngine()181 grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
182 return helper_->event_engine_.get();
183 }
184
GetStatsPluginGroup()185 GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup()
186 override {
187 return helper_->stats_plugin_group_;
188 }
189
AddTraceEvent(TraceSeverity severity,absl::string_view message)190 void AddTraceEvent(TraceSeverity severity,
191 absl::string_view message) override {
192 LOG(FATAL) << "unimplemented";
193 }
194
195 BenchmarkHelper* helper_;
196 };
197
198 const absl::string_view name_;
199 const absl::string_view config_json_;
200 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
201 grpc_event_engine::experimental::GetDefaultEventEngine();
202 std::shared_ptr<WorkSerializer> work_serializer_ =
203 std::make_shared<WorkSerializer>(event_engine_);
204 OrphanablePtr<LoadBalancingPolicy> lb_policy_ =
205 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
206 name_, LoadBalancingPolicy::Args{work_serializer_,
207 std::make_unique<LbHelper>(this),
208 ChannelArgs()});
209 RefCountedPtr<LoadBalancingPolicy::Config> config_;
210 Mutex mu_;
211 CondVar cv_;
212 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
213 ABSL_GUARDED_BY(mu_);
214 absl::flat_hash_set<
215 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>>
216 connectivity_watchers_ ABSL_GUARDED_BY(mu_);
217 GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_ =
218 GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
219 experimental::StatsPluginChannelScope(
220 "foo", "foo",
221 grpc_event_engine::experimental::ChannelArgsEndpointConfig{
222 ChannelArgs{}}));
223 };
224
BM_Pick(benchmark::State & state,BenchmarkHelper & helper)225 void BM_Pick(benchmark::State& state, BenchmarkHelper& helper) {
226 helper.UpdateLbPolicy(state.range(0));
227 auto picker = helper.GetPicker();
228 for (auto _ : state) {
229 picker->Pick(LoadBalancingPolicy::PickArgs{
230 "/foo/bar",
231 nullptr,
232 nullptr,
233 });
234 }
235 }
236 #define PICKER_BENCHMARK(policy, config) \
237 BENCHMARK_CAPTURE(BM_Pick, policy, \
238 []() -> BenchmarkHelper& { \
239 static auto* helper = \
240 new BenchmarkHelper(#policy, config); \
241 return *helper; \
242 }()) \
243 ->RangeMultiplier(10) \
244 ->Range(1, IsSlowBuild() ? 1000 : 100000)
245
246 PICKER_BENCHMARK(pick_first, "[{\"pick_first\":{}}]");
247 PICKER_BENCHMARK(
248 weighted_round_robin,
249 "[{\"weighted_round_robin\":{\"enableOobLoadReport\":false}}]");
250
251 } // namespace
252 } // namespace grpc_core
253
254 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
255 // and others do not. This allows us to support both modes.
256 namespace benchmark {
RunTheBenchmarksNamespaced()257 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
258 } // namespace benchmark
259
main(int argc,char ** argv)260 int main(int argc, char** argv) {
261 ::benchmark::Initialize(&argc, argv);
262 grpc_init();
263 benchmark::RunTheBenchmarksNamespaced();
264 grpc_shutdown();
265 return 0;
266 }
267