1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
16 #define GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
17
18 #include <memory>
19
20 #include "benchmark/benchmark.h"
21 #include "src/core/lib/event_engine/default_event_engine.h"
22 #include "src/core/lib/event_engine/event_engine_context.h"
23 #include "src/core/lib/iomgr/exec_ctx.h"
24 #include "src/core/lib/promise/all_ok.h"
25 #include "src/core/lib/promise/map.h"
26 #include "src/core/lib/resource_quota/resource_quota.h"
27 #include "src/core/lib/transport/call_spine.h"
28 #include "src/core/lib/transport/transport.h"
29 #include "src/core/util/notification.h"
30
31 namespace grpc_core {
32
33 struct BenchmarkCall {
34 CallInitiator initiator;
35 CallHandler handler;
36 };
37
38 // Unary call with one spawn on each end of the spine.
39 template <typename Fixture>
BM_UnaryWithSpawnPerEnd(benchmark::State & state)40 void BM_UnaryWithSpawnPerEnd(benchmark::State& state) {
41 Fixture fixture;
42 for (auto _ : state) {
43 Notification handler_done;
44 Notification initiator_done;
45 {
46 ExecCtx exec_ctx;
47 BenchmarkCall call = fixture.MakeCall();
48 call.handler.SpawnInfallible("handler", [handler = call.handler, &fixture,
49 &handler_done]() mutable {
50 handler.PushServerInitialMetadata(fixture.MakeServerInitialMetadata());
51 return Map(
52 AllOk<StatusFlag>(
53 Map(handler.PullClientInitialMetadata(),
54 [](ValueOrFailure<ClientMetadataHandle> md) {
55 return md.status();
56 }),
57 Map(handler.PullMessage(),
58 [](ClientToServerNextMessage msg) { return msg.status(); }),
59 handler.PushMessage(fixture.MakePayload())),
60 [&handler_done, &fixture, handler](StatusFlag status) mutable {
61 CHECK(status.ok());
62 handler.PushServerTrailingMetadata(
63 fixture.MakeServerTrailingMetadata());
64 handler_done.Notify();
65 });
66 });
67 call.initiator.SpawnInfallible("initiator", [initiator = call.initiator,
68 &fixture,
69 &initiator_done]() mutable {
70 return Map(
71 AllOk<StatusFlag>(
72 Map(initiator.PushMessage(fixture.MakePayload()),
73 [](StatusFlag) { return Success{}; }),
74 Map(initiator.PullServerInitialMetadata(),
75 [](absl::optional<ServerMetadataHandle> md) {
76 return Success{};
77 }),
78 Map(initiator.PullMessage(),
79 [](ServerToClientNextMessage msg) { return msg.status(); }),
80 Map(initiator.PullServerTrailingMetadata(),
81 [](ServerMetadataHandle) { return Success(); })),
82 [&initiator_done](StatusFlag result) {
83 CHECK(result.ok());
84 initiator_done.Notify();
85 });
86 });
87 }
88 handler_done.WaitForNotification();
89 initiator_done.WaitForNotification();
90 }
91 }
92
93 template <typename Fixture>
BM_ClientToServerStreaming(benchmark::State & state)94 void BM_ClientToServerStreaming(benchmark::State& state) {
95 Fixture fixture;
96 BenchmarkCall call = fixture.MakeCall();
97 Notification handler_metadata_done;
98 Notification initiator_metadata_done;
99 call.handler.SpawnInfallible("handler-initial-metadata", [&]() {
100 return Map(call.handler.PullClientInitialMetadata(),
101 [&](ValueOrFailure<ClientMetadataHandle> md) {
102 CHECK(md.ok());
103 call.handler.PushServerInitialMetadata(
104 fixture.MakeServerInitialMetadata());
105 handler_metadata_done.Notify();
106 });
107 });
108 call.initiator.SpawnInfallible("initiator-initial-metadata", [&]() {
109 return Map(call.initiator.PullServerInitialMetadata(),
110 [&](absl::optional<ServerMetadataHandle> md) {
111 CHECK(md.has_value());
112 initiator_metadata_done.Notify();
113 });
114 });
115 handler_metadata_done.WaitForNotification();
116 initiator_metadata_done.WaitForNotification();
117 for (auto _ : state) {
118 Notification handler_done;
119 Notification initiator_done;
120 call.handler.SpawnInfallible("handler", [&]() {
121 return Map(call.handler.PullMessage(),
122 [&](ClientToServerNextMessage msg) {
123 CHECK(msg.ok());
124 handler_done.Notify();
125 });
126 });
127 call.initiator.SpawnInfallible("initiator", [&]() {
128 return Map(call.initiator.PushMessage(fixture.MakePayload()),
129 [&](StatusFlag result) {
130 CHECK(result.ok());
131 initiator_done.Notify();
132 });
133 });
134 handler_done.WaitForNotification();
135 initiator_done.WaitForNotification();
136 }
137 call.initiator.SpawnInfallible(
138 "done", [initiator = call.initiator]() mutable { initiator.Cancel(); });
139 call.handler.SpawnInfallible("done", [handler = call.handler]() mutable {
140 handler.PushServerTrailingMetadata(
141 CancelledServerMetadataFromStatus(GRPC_STATUS_CANCELLED));
142 });
143 }
144
145 // Base class for fixtures that wrap a single filter.
146 // Traits should have MakeClientInitialMetadata, MakeServerInitialMetadata,
147 // MakePayload, MakeServerTrailingMetadata, MakeChannelArgs and a type named
148 // Filter.
149 template <class Traits>
150 class FilterFixture {
151 public:
MakeCall()152 BenchmarkCall MakeCall() {
153 auto arena = arena_allocator_->MakeArena();
154 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
155 event_engine_.get());
156 auto p =
157 MakeCallPair(traits_.MakeClientInitialMetadata(), std::move(arena));
158 return {std::move(p.initiator), p.handler.StartCall()};
159 }
160
MakeServerInitialMetadata()161 ServerMetadataHandle MakeServerInitialMetadata() {
162 return traits_.MakeServerInitialMetadata();
163 }
164
MakePayload()165 MessageHandle MakePayload() { return traits_.MakePayload(); }
166
MakeServerTrailingMetadata()167 ServerMetadataHandle MakeServerTrailingMetadata() {
168 return traits_.MakeServerTrailingMetadata();
169 }
170
171 private:
172 Traits traits_;
173 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
174 grpc_event_engine::experimental::GetDefaultEventEngine();
175 RefCountedPtr<CallArenaAllocator> arena_allocator_ =
176 MakeRefCounted<CallArenaAllocator>(
177 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
178 "test-allocator"),
179 1024);
180 const RefCountedPtr<CallFilters::Stack> stack_ = [this]() {
181 auto filter = Traits::Filter::Create(traits_.MakeChannelArgs(),
182 typename Traits::Filter::Args{});
183 CHECK(filter.ok());
184 CallFilters::StackBuilder builder;
185 builder.Add(filter->get());
186 builder.AddOwnedObject(std::move(*filter));
187 return builder.Build();
188 }();
189 };
190
191 // Base class for fixtures that wrap an UnstartedCallDestination.
192 template <class Traits>
193 class UnstartedCallDestinationFixture {
194 public:
MakeCall()195 BenchmarkCall MakeCall() {
196 auto arena = arena_allocator_->MakeArena();
197 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
198 event_engine_.get());
199 auto p =
200 MakeCallPair(traits_->MakeClientInitialMetadata(), std::move(arena));
201 p.handler.SpawnInfallible("initiator_setup", [&]() {
202 top_destination_->StartCall(std::move(p.handler));
203 });
204 auto handler = bottom_destination_->TakeHandler();
205 absl::optional<CallHandler> started_handler;
206 Notification started;
207 handler.SpawnInfallible("handler_setup", [&]() {
208 started_handler = handler.StartCall();
209 started.Notify();
210 });
211 started.WaitForNotification();
212 CHECK(started_handler.has_value());
213 return {std::move(p.initiator), std::move(*started_handler)};
214 }
215
~UnstartedCallDestinationFixture()216 ~UnstartedCallDestinationFixture() {
217 // TODO(ctiller): entire destructor can be deleted once ExecCtx is gone.
218 ExecCtx exec_ctx;
219 top_destination_.reset();
220 bottom_destination_.reset();
221 arena_allocator_.reset();
222 event_engine_.reset();
223 traits_.reset();
224 }
225
MakeServerInitialMetadata()226 ServerMetadataHandle MakeServerInitialMetadata() {
227 return traits_->MakeServerInitialMetadata();
228 }
229
MakePayload()230 MessageHandle MakePayload() { return traits_->MakePayload(); }
231
MakeServerTrailingMetadata()232 ServerMetadataHandle MakeServerTrailingMetadata() {
233 return traits_->MakeServerTrailingMetadata();
234 }
235
236 private:
237 class SinkDestination : public UnstartedCallDestination {
238 public:
StartCall(UnstartedCallHandler handler)239 void StartCall(UnstartedCallHandler handler) override {
240 MutexLock lock(&mu_);
241 handler_ = std::move(handler);
242 }
Orphaned()243 void Orphaned() override {}
244
TakeHandler()245 UnstartedCallHandler TakeHandler() {
246 mu_.LockWhen(absl::Condition(
247 +[](SinkDestination* dest) ABSL_EXCLUSIVE_LOCKS_REQUIRED(dest->mu_) {
248 return dest->handler_.has_value();
249 },
250 this));
251 auto h = std::move(*handler_);
252 handler_.reset();
253 mu_.Unlock();
254 return h;
255 }
256
257 private:
258 absl::Mutex mu_;
259 absl::optional<UnstartedCallHandler> handler_ ABSL_GUARDED_BY(mu_);
260 };
261
262 // TODO(ctiller): no need for unique_ptr once ExecCtx is gone
263 std::unique_ptr<Traits> traits_ = std::make_unique<Traits>();
264 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
265 grpc_event_engine::experimental::GetDefaultEventEngine();
266 RefCountedPtr<CallArenaAllocator> arena_allocator_ =
267 MakeRefCounted<CallArenaAllocator>(
268 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
269 "test-allocator"),
270 1024);
271 RefCountedPtr<SinkDestination> bottom_destination_ =
272 MakeRefCounted<SinkDestination>();
273 RefCountedPtr<UnstartedCallDestination> top_destination_ =
274 traits_->CreateCallDestination(bottom_destination_);
275 };
276
// Base class for fixtures that wrap a client/server transport pair.
// Traits should have MakeClientInitialMetadata, MakeServerInitialMetadata,
// MakePayload and MakeServerTrailingMetadata.
// They should also have a MakeTransport method returning a BenchmarkTransport.
281
282 struct BenchmarkTransport {
283 OrphanablePtr<ClientTransport> client;
284 OrphanablePtr<ServerTransport> server;
285 };
286
287 template <class Traits>
288 class TransportFixture {
289 public:
TransportFixture()290 TransportFixture() { transport_.server->SetCallDestination(acceptor_); };
291
MakeCall()292 BenchmarkCall MakeCall() {
293 auto arena = arena_allocator_->MakeArena();
294 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
295 event_engine_.get());
296 auto p =
297 MakeCallPair(traits_.MakeClientInitialMetadata(), std::move(arena));
298 transport_.client->StartCall(p.handler.StartCall());
299 auto handler = acceptor_->TakeHandler();
300 absl::optional<CallHandler> started_handler;
301 Notification started;
302 handler.SpawnInfallible("handler_setup", [&]() {
303 started_handler = handler.StartCall();
304 started.Notify();
305 });
306 started.WaitForNotification();
307 CHECK(started_handler.has_value());
308 return {std::move(p.initiator), std::move(*started_handler)};
309 }
310
MakeServerInitialMetadata()311 ServerMetadataHandle MakeServerInitialMetadata() {
312 return traits_.MakeServerInitialMetadata();
313 }
314
MakePayload()315 MessageHandle MakePayload() { return traits_.MakePayload(); }
316
MakeServerTrailingMetadata()317 ServerMetadataHandle MakeServerTrailingMetadata() {
318 return traits_.MakeServerTrailingMetadata();
319 }
320
321 private:
322 class Acceptor : public UnstartedCallDestination {
323 public:
StartCall(UnstartedCallHandler handler)324 void StartCall(UnstartedCallHandler handler) override {
325 MutexLock lock(&mu_);
326 handler_ = std::move(handler);
327 }
Orphaned()328 void Orphaned() override {}
329
TakeHandler()330 UnstartedCallHandler TakeHandler() {
331 mu_.LockWhen(absl::Condition(
332 +[](Acceptor* dest) ABSL_EXCLUSIVE_LOCKS_REQUIRED(dest->mu_) {
333 return dest->handler_.has_value();
334 },
335 this));
336 auto h = std::move(*handler_);
337 handler_.reset();
338 mu_.Unlock();
339 return h;
340 }
341
342 absl::Mutex mu_;
343 absl::optional<UnstartedCallHandler> handler_ ABSL_GUARDED_BY(mu_);
344 };
345
346 Traits traits_;
347 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_ =
348 grpc_event_engine::experimental::GetDefaultEventEngine();
349 RefCountedPtr<CallArenaAllocator> arena_allocator_ =
350 MakeRefCounted<CallArenaAllocator>(
351 ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
352 "test-allocator"),
353 1024);
354 RefCountedPtr<Acceptor> acceptor_ = MakeRefCounted<Acceptor>();
355 BenchmarkTransport transport_ = traits_.MakeTransport();
356 };
357
358 } // namespace grpc_core
359
// Registers BM_UnaryWithSpawnPerEnd and BM_ClientToServerStreaming for the
// given Fixture type.
// Must be invoked inside the grpc_core namespace, because the benchmark
// templates are named unqualified here. The final BENCHMARK deliberately has
// no trailing semicolon: write one after the macro invocation.
#define GRPC_CALL_SPINE_BENCHMARK(Fixture)     \
  BENCHMARK(BM_UnaryWithSpawnPerEnd<Fixture>); \
  BENCHMARK(BM_ClientToServerStreaming<Fixture>)
365
366 #endif // GRPC_TEST_CORE_TRANSPORT_CALL_SPINE_BENCHMARKS_H
367