• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <benchmark/benchmark.h>
16 #include <grpc/grpc.h>
17 
18 #include <memory>
19 
20 #include "absl/strings/string_view.h"
21 #include "src/core/client_channel/subchannel_interface_internal.h"
22 #include "src/core/config/core_configuration.h"
23 #include "src/core/lib/address_utils/parse_address.h"
24 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
25 #include "src/core/lib/event_engine/default_event_engine.h"
26 #include "src/core/lib/transport/connectivity_state.h"
27 #include "src/core/load_balancing/health_check_client_internal.h"
28 #include "src/core/load_balancing/lb_policy.h"
29 #include "src/core/util/json/json_reader.h"
30 #include "test/core/test_util/build.h"
31 
32 namespace grpc_core {
33 namespace {
34 
IsSlowBuild()35 bool IsSlowBuild() {
36   return BuiltUnderMsan() || BuiltUnderUbsan() || BuiltUnderTsan();
37 }
38 
39 class BenchmarkHelper : public std::enable_shared_from_this<BenchmarkHelper> {
40  public:
BenchmarkHelper(absl::string_view name,absl::string_view config)41   BenchmarkHelper(absl::string_view name, absl::string_view config)
42       : name_(name), config_json_(config) {
43     CHECK(lb_policy_ != nullptr) << "Failed to create LB policy: " << name;
44     auto parsed_json = JsonParse(std::string(config_json_));
45     CHECK_OK(parsed_json);
46     auto config_parsed =
47         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
48             *parsed_json);
49     CHECK_OK(config_parsed);
50     config_ = std::move(*config_parsed);
51   }
52 
GetPicker()53   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> GetPicker() {
54     MutexLock lock(&mu_);
55     while (picker_ == nullptr) {
56       cv_.Wait(&mu_);
57     }
58     return picker_;
59   }
60 
UpdateLbPolicy(size_t num_endpoints)61   void UpdateLbPolicy(size_t num_endpoints) {
62     {
63       MutexLock lock(&mu_);
64       picker_ = nullptr;
65       work_serializer_->Schedule(
66           [this, num_endpoints]() {
67             EndpointAddressesList addresses;
68             for (size_t i = 0; i < num_endpoints; i++) {
69               grpc_resolved_address addr;
70               int port = i % 65536;
71               int ip = i / 65536;
72               CHECK_LT(ip, 256);
73               CHECK(grpc_parse_uri(
74                   URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port))
75                       .value(),
76                   &addr));
77               addresses.emplace_back(addr, ChannelArgs());
78             }
79             CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{
80                 std::make_shared<EndpointAddressesListIterator>(
81                     std::move(addresses)),
82                 config_, "", ChannelArgs()}));
83           },
84           DEBUG_LOCATION);
85     }
86     work_serializer_->DrainQueue();
87   }
88 
89  private:
90   class SubchannelFake final : public SubchannelInterface {
91    public:
SubchannelFake(BenchmarkHelper * helper)92     explicit SubchannelFake(BenchmarkHelper* helper) : helper_(helper) {}
93 
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> unique_watcher)94     void WatchConnectivityState(
95         std::unique_ptr<ConnectivityStateWatcherInterface> unique_watcher)
96         override {
97       AddConnectivityWatcherInternal(
98           std::shared_ptr<ConnectivityStateWatcherInterface>(
99               std::move(unique_watcher)));
100     }
101 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)102     void CancelConnectivityStateWatch(
103         ConnectivityStateWatcherInterface* watcher) override {
104       MutexLock lock(&helper_->mu_);
105       helper_->connectivity_watchers_.erase(watcher);
106     }
107 
RequestConnection()108     void RequestConnection() override { LOG(FATAL) << "unimplemented"; }
109 
ResetBackoff()110     void ResetBackoff() override { LOG(FATAL) << "unimplemented"; }
111 
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)112     void AddDataWatcher(
113         std::unique_ptr<DataWatcherInterface> watcher) override {
114       auto* watcher_internal =
115           DownCast<InternalSubchannelDataWatcherInterface*>(watcher.get());
116       if (watcher_internal->type() == HealthProducer::Type()) {
117         AddConnectivityWatcherInternal(
118             DownCast<HealthWatcher*>(watcher_internal)->TakeWatcher());
119       } else {
120         LOG(FATAL) << "unimplemented watcher type: "
121                    << watcher_internal->type();
122       }
123     }
124 
CancelDataWatcher(DataWatcherInterface * watcher)125     void CancelDataWatcher(DataWatcherInterface* watcher) override {}
126 
address() const127     std::string address() const override { return "test"; }
128 
129    private:
AddConnectivityWatcherInternal(std::shared_ptr<ConnectivityStateWatcherInterface> watcher)130     void AddConnectivityWatcherInternal(
131         std::shared_ptr<ConnectivityStateWatcherInterface> watcher) {
132       {
133         MutexLock lock(&helper_->mu_);
134         helper_->work_serializer_->Schedule(
135             [watcher]() {
136               watcher->OnConnectivityStateChange(GRPC_CHANNEL_READY,
137                                                  absl::OkStatus());
138             },
139             DEBUG_LOCATION);
140         helper_->connectivity_watchers_.insert(std::move(watcher));
141       }
142       helper_->work_serializer_->DrainQueue();
143     }
144 
145     BenchmarkHelper* helper_;
146   };
147 
148   class LbHelper final : public LoadBalancingPolicy::ChannelControlHelper {
149    public:
LbHelper(BenchmarkHelper * helper)150     explicit LbHelper(BenchmarkHelper* helper) : helper_(helper) {}
151 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)152     RefCountedPtr<SubchannelInterface> CreateSubchannel(
153         const grpc_resolved_address& address,
154         const ChannelArgs& per_address_args, const ChannelArgs& args) override {
155       return MakeRefCounted<SubchannelFake>(helper_);
156     }
157 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)158     void UpdateState(
159         grpc_connectivity_state state, const absl::Status& status,
160         RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
161       MutexLock lock(&helper_->mu_);
162       helper_->picker_ = std::move(picker);
163       helper_->cv_.SignalAll();
164     }
165 
RequestReresolution()166     void RequestReresolution() override { LOG(FATAL) << "unimplemented"; }
167 
GetTarget()168     absl::string_view GetTarget() override { return "foo"; }
169 
GetAuthority()170     absl::string_view GetAuthority() override { return "foo"; }
171 
GetChannelCredentials()172     RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
173       LOG(FATAL) << "unimplemented";
174     }
175 
GetUnsafeChannelCredentials()176     RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
177         override {
178       LOG(FATAL) << "unimplemented";
179     }
180 
GetEventEngine()181     grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
182       return helper_->event_engine_.get();
183     }
184 
GetStatsPluginGroup()185     GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup()
186         override {
187       return helper_->stats_plugin_group_;
188     }
189 
AddTraceEvent(TraceSeverity severity,absl::string_view message)190     void AddTraceEvent(TraceSeverity severity,
191                        absl::string_view message) override {
192       LOG(FATAL) << "unimplemented";
193     }
194 
195     BenchmarkHelper* helper_;
196   };
197 
198   const absl::string_view name_;
199   const absl::string_view config_json_;
200   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
201       grpc_event_engine::experimental::GetDefaultEventEngine();
202   std::shared_ptr<WorkSerializer> work_serializer_ =
203       std::make_shared<WorkSerializer>(event_engine_);
204   OrphanablePtr<LoadBalancingPolicy> lb_policy_ =
205       CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
206           name_, LoadBalancingPolicy::Args{work_serializer_,
207                                            std::make_unique<LbHelper>(this),
208                                            ChannelArgs()});
209   RefCountedPtr<LoadBalancingPolicy::Config> config_;
210   Mutex mu_;
211   CondVar cv_;
212   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
213       ABSL_GUARDED_BY(mu_);
214   absl::flat_hash_set<
215       std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>>
216       connectivity_watchers_ ABSL_GUARDED_BY(mu_);
217   GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_ =
218       GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
219           experimental::StatsPluginChannelScope(
220               "foo", "foo",
221               grpc_event_engine::experimental::ChannelArgsEndpointConfig{
222                   ChannelArgs{}}));
223 };
224 
BM_Pick(benchmark::State & state,BenchmarkHelper & helper)225 void BM_Pick(benchmark::State& state, BenchmarkHelper& helper) {
226   helper.UpdateLbPolicy(state.range(0));
227   auto picker = helper.GetPicker();
228   for (auto _ : state) {
229     picker->Pick(LoadBalancingPolicy::PickArgs{
230         "/foo/bar",
231         nullptr,
232         nullptr,
233     });
234   }
235 }
236 #define PICKER_BENCHMARK(policy, config)                        \
237   BENCHMARK_CAPTURE(BM_Pick, policy,                            \
238                     []() -> BenchmarkHelper& {                  \
239                       static auto* helper =                     \
240                           new BenchmarkHelper(#policy, config); \
241                       return *helper;                           \
242                     }())                                        \
243       ->RangeMultiplier(10)                                     \
244       ->Range(1, IsSlowBuild() ? 1000 : 100000)
245 
246 PICKER_BENCHMARK(pick_first, "[{\"pick_first\":{}}]");
247 PICKER_BENCHMARK(
248     weighted_round_robin,
249     "[{\"weighted_round_robin\":{\"enableOobLoadReport\":false}}]");
250 
251 }  // namespace
252 }  // namespace grpc_core
253 
254 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
255 // and others do not. This allows us to support both modes.
256 namespace benchmark {
RunTheBenchmarksNamespaced()257 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
258 }  // namespace benchmark
259 
main(int argc,char ** argv)260 int main(int argc, char** argv) {
261   ::benchmark::Initialize(&argc, argv);
262   grpc_init();
263   benchmark::RunTheBenchmarksNamespaced();
264   grpc_shutdown();
265   return 0;
266 }
267