• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
16 #define GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
17 
18 #include <memory>
19 
20 #include "benchmark/benchmark.h"
21 #include "src/core/lib/event_engine/default_event_engine.h"
22 #include "src/core/lib/event_engine/event_engine_context.h"
23 #include "src/core/lib/iomgr/exec_ctx.h"
24 #include "src/core/lib/promise/all_ok.h"
25 #include "src/core/lib/promise/map.h"
26 #include "src/core/lib/resource_quota/resource_quota.h"
27 #include "src/core/lib/transport/call_spine.h"
28 #include "src/core/lib/transport/transport.h"
29 #include "src/core/util/notification.h"
30 
31 namespace grpc_core {
32 
33 struct BenchmarkCall {
34   CallInitiator initiator;
35   CallHandler handler;
36 };
37 
38 // Unary call with one spawn on each end of the spine.
39 template <typename Fixture>
BM_UnaryWithSpawnPerEnd(benchmark::State & state)40 void BM_UnaryWithSpawnPerEnd(benchmark::State& state) {
41   Fixture fixture;
42   for (auto _ : state) {
43     Notification handler_done;
44     Notification initiator_done;
45     {
46       ExecCtx exec_ctx;
47       BenchmarkCall call = fixture.MakeCall();
48       call.handler.SpawnInfallible("handler", [handler = call.handler, &fixture,
49                                                &handler_done]() mutable {
50         handler.PushServerInitialMetadata(fixture.MakeServerInitialMetadata());
51         return Map(
52             AllOk<StatusFlag>(
53                 Map(handler.PullClientInitialMetadata(),
54                     [](ValueOrFailure<ClientMetadataHandle> md) {
55                       return md.status();
56                     }),
57                 Map(handler.PullMessage(),
58                     [](ClientToServerNextMessage msg) { return msg.status(); }),
59                 handler.PushMessage(fixture.MakePayload())),
60             [&handler_done, &fixture, handler](StatusFlag status) mutable {
61               CHECK(status.ok());
62               handler.PushServerTrailingMetadata(
63                   fixture.MakeServerTrailingMetadata());
64               handler_done.Notify();
65             });
66       });
67       call.initiator.SpawnInfallible("initiator", [initiator = call.initiator,
68                                                    &fixture,
69                                                    &initiator_done]() mutable {
70         return Map(
71             AllOk<StatusFlag>(
72                 Map(initiator.PushMessage(fixture.MakePayload()),
73                     [](StatusFlag) { return Success{}; }),
74                 Map(initiator.PullServerInitialMetadata(),
75                     [](absl::optional<ServerMetadataHandle> md) {
76                       return Success{};
77                     }),
78                 Map(initiator.PullMessage(),
79                     [](ServerToClientNextMessage msg) { return msg.status(); }),
80                 Map(initiator.PullServerTrailingMetadata(),
81                     [](ServerMetadataHandle) { return Success(); })),
82             [&initiator_done](StatusFlag result) {
83               CHECK(result.ok());
84               initiator_done.Notify();
85             });
86       });
87     }
88     handler_done.WaitForNotification();
89     initiator_done.WaitForNotification();
90   }
91 }
92 
93 template <typename Fixture>
BM_ClientToServerStreaming(benchmark::State & state)94 void BM_ClientToServerStreaming(benchmark::State& state) {
95   Fixture fixture;
96   BenchmarkCall call = fixture.MakeCall();
97   Notification handler_metadata_done;
98   Notification initiator_metadata_done;
99   call.handler.SpawnInfallible("handler-initial-metadata", [&]() {
100     return Map(call.handler.PullClientInitialMetadata(),
101                [&](ValueOrFailure<ClientMetadataHandle> md) {
102                  CHECK(md.ok());
103                  call.handler.PushServerInitialMetadata(
104                      fixture.MakeServerInitialMetadata());
105                  handler_metadata_done.Notify();
106                });
107   });
108   call.initiator.SpawnInfallible("initiator-initial-metadata", [&]() {
109     return Map(call.initiator.PullServerInitialMetadata(),
110                [&](absl::optional<ServerMetadataHandle> md) {
111                  CHECK(md.has_value());
112                  initiator_metadata_done.Notify();
113                });
114   });
115   handler_metadata_done.WaitForNotification();
116   initiator_metadata_done.WaitForNotification();
117   for (auto _ : state) {
118     Notification handler_done;
119     Notification initiator_done;
120     call.handler.SpawnInfallible("handler", [&]() {
121       return Map(call.handler.PullMessage(),
122                  [&](ClientToServerNextMessage msg) {
123                    CHECK(msg.ok());
124                    handler_done.Notify();
125                  });
126     });
127     call.initiator.SpawnInfallible("initiator", [&]() {
128       return Map(call.initiator.PushMessage(fixture.MakePayload()),
129                  [&](StatusFlag result) {
130                    CHECK(result.ok());
131                    initiator_done.Notify();
132                  });
133     });
134     handler_done.WaitForNotification();
135     initiator_done.WaitForNotification();
136   }
137   call.initiator.SpawnInfallible(
138       "done", [initiator = call.initiator]() mutable { initiator.Cancel(); });
139   call.handler.SpawnInfallible("done", [handler = call.handler]() mutable {
140     handler.PushServerTrailingMetadata(
141         CancelledServerMetadataFromStatus(GRPC_STATUS_CANCELLED));
142   });
143 }
144 
145 // Base class for fixtures that wrap a single filter.
146 // Traits should have MakeClientInitialMetadata, MakeServerInitialMetadata,
147 // MakePayload, MakeServerTrailingMetadata, MakeChannelArgs and a type named
148 // Filter.
149 template <class Traits>
150 class FilterFixture {
151  public:
MakeCall()152   BenchmarkCall MakeCall() {
153     auto arena = arena_allocator_->MakeArena();
154     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
155         event_engine_.get());
156     auto p =
157         MakeCallPair(traits_.MakeClientInitialMetadata(), std::move(arena));
158     return {std::move(p.initiator), p.handler.StartCall()};
159   }
160 
MakeServerInitialMetadata()161   ServerMetadataHandle MakeServerInitialMetadata() {
162     return traits_.MakeServerInitialMetadata();
163   }
164 
MakePayload()165   MessageHandle MakePayload() { return traits_.MakePayload(); }
166 
MakeServerTrailingMetadata()167   ServerMetadataHandle MakeServerTrailingMetadata() {
168     return traits_.MakeServerTrailingMetadata();
169   }
170 
171  private:
172   Traits traits_;
173   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
174       grpc_event_engine::experimental::GetDefaultEventEngine();
175   RefCountedPtr<CallArenaAllocator> arena_allocator_ =
176       MakeRefCounted<CallArenaAllocator>(
177           ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
178               "test-allocator"),
179           1024);
180   const RefCountedPtr<CallFilters::Stack> stack_ = [this]() {
181     auto filter = Traits::Filter::Create(traits_.MakeChannelArgs(),
182                                          typename Traits::Filter::Args{});
183     CHECK(filter.ok());
184     CallFilters::StackBuilder builder;
185     builder.Add(filter->get());
186     builder.AddOwnedObject(std::move(*filter));
187     return builder.Build();
188   }();
189 };
190 
191 // Base class for fixtures that wrap an UnstartedCallDestination.
192 template <class Traits>
193 class UnstartedCallDestinationFixture {
194  public:
MakeCall()195   BenchmarkCall MakeCall() {
196     auto arena = arena_allocator_->MakeArena();
197     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
198         event_engine_.get());
199     auto p =
200         MakeCallPair(traits_->MakeClientInitialMetadata(), std::move(arena));
201     p.handler.SpawnInfallible("initiator_setup", [&]() {
202       top_destination_->StartCall(std::move(p.handler));
203     });
204     auto handler = bottom_destination_->TakeHandler();
205     absl::optional<CallHandler> started_handler;
206     Notification started;
207     handler.SpawnInfallible("handler_setup", [&]() {
208       started_handler = handler.StartCall();
209       started.Notify();
210     });
211     started.WaitForNotification();
212     CHECK(started_handler.has_value());
213     return {std::move(p.initiator), std::move(*started_handler)};
214   }
215 
~UnstartedCallDestinationFixture()216   ~UnstartedCallDestinationFixture() {
217     // TODO(ctiller): entire destructor can be deleted once ExecCtx is gone.
218     ExecCtx exec_ctx;
219     top_destination_.reset();
220     bottom_destination_.reset();
221     arena_allocator_.reset();
222     event_engine_.reset();
223     traits_.reset();
224   }
225 
MakeServerInitialMetadata()226   ServerMetadataHandle MakeServerInitialMetadata() {
227     return traits_->MakeServerInitialMetadata();
228   }
229 
MakePayload()230   MessageHandle MakePayload() { return traits_->MakePayload(); }
231 
MakeServerTrailingMetadata()232   ServerMetadataHandle MakeServerTrailingMetadata() {
233     return traits_->MakeServerTrailingMetadata();
234   }
235 
236  private:
237   class SinkDestination : public UnstartedCallDestination {
238    public:
StartCall(UnstartedCallHandler handler)239     void StartCall(UnstartedCallHandler handler) override {
240       MutexLock lock(&mu_);
241       handler_ = std::move(handler);
242     }
Orphaned()243     void Orphaned() override {}
244 
TakeHandler()245     UnstartedCallHandler TakeHandler() {
246       mu_.LockWhen(absl::Condition(
247           +[](SinkDestination* dest) ABSL_EXCLUSIVE_LOCKS_REQUIRED(dest->mu_) {
248             return dest->handler_.has_value();
249           },
250           this));
251       auto h = std::move(*handler_);
252       handler_.reset();
253       mu_.Unlock();
254       return h;
255     }
256 
257    private:
258     absl::Mutex mu_;
259     absl::optional<UnstartedCallHandler> handler_ ABSL_GUARDED_BY(mu_);
260   };
261 
262   // TODO(ctiller): no need for unique_ptr once ExecCtx is gone
263   std::unique_ptr<Traits> traits_ = std::make_unique<Traits>();
264   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
265       grpc_event_engine::experimental::GetDefaultEventEngine();
266   RefCountedPtr<CallArenaAllocator> arena_allocator_ =
267       MakeRefCounted<CallArenaAllocator>(
268           ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
269               "test-allocator"),
270           1024);
271   RefCountedPtr<SinkDestination> bottom_destination_ =
272       MakeRefCounted<SinkDestination>();
273   RefCountedPtr<UnstartedCallDestination> top_destination_ =
274       traits_->CreateCallDestination(bottom_destination_);
275 };
276 
// Base class for fixtures that wrap a client/server transport pair.
// Traits should have MakeClientInitialMetadata, MakeServerInitialMetadata,
// MakePayload and MakeServerTrailingMetadata.
// They should also have a MakeTransport method returning a BenchmarkTransport.
281 
282 struct BenchmarkTransport {
283   OrphanablePtr<ClientTransport> client;
284   OrphanablePtr<ServerTransport> server;
285 };
286 
287 template <class Traits>
288 class TransportFixture {
289  public:
TransportFixture()290   TransportFixture() { transport_.server->SetCallDestination(acceptor_); };
291 
MakeCall()292   BenchmarkCall MakeCall() {
293     auto arena = arena_allocator_->MakeArena();
294     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
295         event_engine_.get());
296     auto p =
297         MakeCallPair(traits_.MakeClientInitialMetadata(), std::move(arena));
298     transport_.client->StartCall(p.handler.StartCall());
299     auto handler = acceptor_->TakeHandler();
300     absl::optional<CallHandler> started_handler;
301     Notification started;
302     handler.SpawnInfallible("handler_setup", [&]() {
303       started_handler = handler.StartCall();
304       started.Notify();
305     });
306     started.WaitForNotification();
307     CHECK(started_handler.has_value());
308     return {std::move(p.initiator), std::move(*started_handler)};
309   }
310 
MakeServerInitialMetadata()311   ServerMetadataHandle MakeServerInitialMetadata() {
312     return traits_.MakeServerInitialMetadata();
313   }
314 
MakePayload()315   MessageHandle MakePayload() { return traits_.MakePayload(); }
316 
MakeServerTrailingMetadata()317   ServerMetadataHandle MakeServerTrailingMetadata() {
318     return traits_.MakeServerTrailingMetadata();
319   }
320 
321  private:
322   class Acceptor : public UnstartedCallDestination {
323    public:
StartCall(UnstartedCallHandler handler)324     void StartCall(UnstartedCallHandler handler) override {
325       MutexLock lock(&mu_);
326       handler_ = std::move(handler);
327     }
Orphaned()328     void Orphaned() override {}
329 
TakeHandler()330     UnstartedCallHandler TakeHandler() {
331       mu_.LockWhen(absl::Condition(
332           +[](Acceptor* dest) ABSL_EXCLUSIVE_LOCKS_REQUIRED(dest->mu_) {
333             return dest->handler_.has_value();
334           },
335           this));
336       auto h = std::move(*handler_);
337       handler_.reset();
338       mu_.Unlock();
339       return h;
340     }
341 
342     absl::Mutex mu_;
343     absl::optional<UnstartedCallHandler> handler_ ABSL_GUARDED_BY(mu_);
344   };
345 
346   Traits traits_;
347   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
348       grpc_event_engine::experimental::GetDefaultEventEngine();
349   RefCountedPtr<CallArenaAllocator> arena_allocator_ =
350       MakeRefCounted<CallArenaAllocator>(
351           ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
352               "test-allocator"),
353           1024);
354   RefCountedPtr<Acceptor> acceptor_ = MakeRefCounted<Acceptor>();
355   BenchmarkTransport transport_ = traits_.MakeTransport();
356 };
357 
358 }  // namespace grpc_core
359 
// Registers every call-spine benchmark defined in this header for `Fixture`.
// The BM_* templates are named without qualification, so this macro must be
// expanded inside namespace grpc_core. The expansion carries no trailing
// semicolon; the caller supplies it.
#define GRPC_CALL_SPINE_BENCHMARK(Fixture)       \
  BENCHMARK(BM_UnaryWithSpawnPerEnd<Fixture>);   \
  BENCHMARK(BM_ClientToServerStreaming<Fixture>)
365 
366 #endif  // GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
367