1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* Microbenchmarks around CHTTP2 transport operations */
20
21 #include <benchmark/benchmark.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24 #include <grpc/support/string_util.h>
25 #include <grpcpp/support/channel_arguments.h>
26 #include <string.h>
27 #include <memory>
28 #include <queue>
29 #include <sstream>
30 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
31 #include "src/core/ext/transport/chttp2/transport/internal.h"
32 #include "src/core/lib/iomgr/closure.h"
33 #include "src/core/lib/iomgr/resource_quota.h"
34 #include "src/core/lib/slice/slice_internal.h"
35 #include "src/core/lib/transport/static_metadata.h"
36 #include "test/core/util/test_config.h"
37 #include "test/cpp/microbenchmarks/helpers.h"
38 #include "test/cpp/util/test_config.h"
39
40 ////////////////////////////////////////////////////////////////////////////////
41 // Helper classes
42 //
43
44 class DummyEndpoint : public grpc_endpoint {
45 public:
DummyEndpoint()46 DummyEndpoint() {
47 static const grpc_endpoint_vtable my_vtable = {read,
48 write,
49 add_to_pollset,
50 add_to_pollset_set,
51 delete_from_pollset_set,
52 shutdown,
53 destroy,
54 get_resource_user,
55 get_peer,
56 get_fd,
57 can_track_err};
58 grpc_endpoint::vtable = &my_vtable;
59 ru_ = grpc_resource_user_create(LibraryInitializer::get().rq(),
60 "dummy_endpoint");
61 }
62
PushInput(grpc_slice slice)63 void PushInput(grpc_slice slice) {
64 if (read_cb_ == nullptr) {
65 GPR_ASSERT(!have_slice_);
66 buffered_slice_ = slice;
67 have_slice_ = true;
68 return;
69 }
70 grpc_slice_buffer_add(slices_, slice);
71 grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_cb_, GRPC_ERROR_NONE);
72 read_cb_ = nullptr;
73 }
74
75 private:
76 grpc_resource_user* ru_;
77 grpc_closure* read_cb_ = nullptr;
78 grpc_slice_buffer* slices_ = nullptr;
79 bool have_slice_ = false;
80 grpc_slice buffered_slice_;
81
QueueRead(grpc_slice_buffer * slices,grpc_closure * cb)82 void QueueRead(grpc_slice_buffer* slices, grpc_closure* cb) {
83 GPR_ASSERT(read_cb_ == nullptr);
84 if (have_slice_) {
85 have_slice_ = false;
86 grpc_slice_buffer_add(slices, buffered_slice_);
87 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
88 return;
89 }
90 read_cb_ = cb;
91 slices_ = slices;
92 }
93
read(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool)94 static void read(grpc_endpoint* ep, grpc_slice_buffer* slices,
95 grpc_closure* cb, bool /*urgent*/) {
96 static_cast<DummyEndpoint*>(ep)->QueueRead(slices, cb);
97 }
98
write(grpc_endpoint *,grpc_slice_buffer *,grpc_closure * cb,void *)99 static void write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/,
100 grpc_closure* cb, void* /*arg*/) {
101 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
102 }
103
add_to_pollset(grpc_endpoint *,grpc_pollset *)104 static void add_to_pollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {
105 }
106
add_to_pollset_set(grpc_endpoint *,grpc_pollset_set *)107 static void add_to_pollset_set(grpc_endpoint* /*ep*/,
108 grpc_pollset_set* /*pollset*/) {}
109
delete_from_pollset_set(grpc_endpoint *,grpc_pollset_set *)110 static void delete_from_pollset_set(grpc_endpoint* /*ep*/,
111 grpc_pollset_set* /*pollset*/) {}
112
shutdown(grpc_endpoint * ep,grpc_error * why)113 static void shutdown(grpc_endpoint* ep, grpc_error* why) {
114 grpc_resource_user_shutdown(static_cast<DummyEndpoint*>(ep)->ru_);
115 grpc_core::ExecCtx::Run(DEBUG_LOCATION,
116 static_cast<DummyEndpoint*>(ep)->read_cb_, why);
117 }
118
destroy(grpc_endpoint * ep)119 static void destroy(grpc_endpoint* ep) {
120 grpc_resource_user_unref(static_cast<DummyEndpoint*>(ep)->ru_);
121 delete static_cast<DummyEndpoint*>(ep);
122 }
123
get_resource_user(grpc_endpoint * ep)124 static grpc_resource_user* get_resource_user(grpc_endpoint* ep) {
125 return static_cast<DummyEndpoint*>(ep)->ru_;
126 }
get_peer(grpc_endpoint *)127 static char* get_peer(grpc_endpoint* /*ep*/) { return gpr_strdup("test"); }
get_fd(grpc_endpoint *)128 static int get_fd(grpc_endpoint* /*ep*/) { return 0; }
can_track_err(grpc_endpoint *)129 static bool can_track_err(grpc_endpoint* /*ep*/) { return false; }
130 };
131
132 class Fixture {
133 public:
Fixture(const grpc::ChannelArguments & args,bool client)134 Fixture(const grpc::ChannelArguments& args, bool client) {
135 grpc_channel_args c_args = args.c_channel_args();
136 ep_ = new DummyEndpoint;
137 t_ = grpc_create_chttp2_transport(&c_args, ep_, client);
138 grpc_chttp2_transport_start_reading(t_, nullptr, nullptr);
139 FlushExecCtx();
140 }
141
FlushExecCtx()142 void FlushExecCtx() { grpc_core::ExecCtx::Get()->Flush(); }
143
~Fixture()144 ~Fixture() { grpc_transport_destroy(t_); }
145
chttp2_transport()146 grpc_chttp2_transport* chttp2_transport() {
147 return reinterpret_cast<grpc_chttp2_transport*>(t_);
148 }
transport()149 grpc_transport* transport() { return t_; }
150
PushInput(grpc_slice slice)151 void PushInput(grpc_slice slice) { ep_->PushInput(slice); }
152
153 private:
154 DummyEndpoint* ep_;
155 grpc_transport* t_;
156 };
157
158 class TestClosure : public grpc_closure {
159 public:
~TestClosure()160 virtual ~TestClosure() {}
161 };
162
163 template <class F>
MakeTestClosure(F f)164 std::unique_ptr<TestClosure> MakeTestClosure(F f) {
165 struct C : public TestClosure {
166 explicit C(const F& f) : f_(f) {
167 GRPC_CLOSURE_INIT(this, Execute, this, nullptr);
168 }
169 F f_;
170 static void Execute(void* arg, grpc_error* error) {
171 static_cast<C*>(arg)->f_(error);
172 }
173 };
174 return std::unique_ptr<TestClosure>(new C(f));
175 }
176
177 template <class F>
MakeOnceClosure(F f)178 grpc_closure* MakeOnceClosure(F f) {
179 struct C : public grpc_closure {
180 C(const F& f) : f_(f) {}
181 F f_;
182 static void Execute(void* arg, grpc_error* error) {
183 static_cast<C*>(arg)->f_(error);
184 delete static_cast<C*>(arg);
185 }
186 };
187 auto* c = new C{f};
188 return GRPC_CLOSURE_INIT(c, C::Execute, c, nullptr);
189 }
190
191 class Stream {
192 public:
Stream(Fixture * f)193 Stream(Fixture* f) : f_(f) {
194 stream_size_ = grpc_transport_stream_size(f->transport());
195 stream_ = gpr_malloc(stream_size_);
196 arena_ = grpc_core::Arena::Create(4096);
197 }
198
~Stream()199 ~Stream() {
200 gpr_event_wait(&done_, gpr_inf_future(GPR_CLOCK_REALTIME));
201 gpr_free(stream_);
202 arena_->Destroy();
203 }
204
Init(benchmark::State & state)205 void Init(benchmark::State& state) {
206 GRPC_STREAM_REF_INIT(&refcount_, 1, &Stream::FinishDestroy, this,
207 "test_stream");
208 gpr_event_init(&done_);
209 memset(stream_, 0, stream_size_);
210 if ((state.iterations() & 0xffff) == 0) {
211 arena_->Destroy();
212 arena_ = grpc_core::Arena::Create(4096);
213 }
214 grpc_transport_init_stream(f_->transport(),
215 static_cast<grpc_stream*>(stream_), &refcount_,
216 nullptr, arena_);
217 }
218
DestroyThen(grpc_closure * closure)219 void DestroyThen(grpc_closure* closure) {
220 destroy_closure_ = closure;
221 #ifndef NDEBUG
222 grpc_stream_unref(&refcount_, "DestroyThen");
223 #else
224 grpc_stream_unref(&refcount_);
225 #endif
226 }
227
Op(grpc_transport_stream_op_batch * op)228 void Op(grpc_transport_stream_op_batch* op) {
229 grpc_transport_perform_stream_op(f_->transport(),
230 static_cast<grpc_stream*>(stream_), op);
231 }
232
chttp2_stream()233 grpc_chttp2_stream* chttp2_stream() {
234 return static_cast<grpc_chttp2_stream*>(stream_);
235 }
236
237 private:
FinishDestroy(void * arg,grpc_error *)238 static void FinishDestroy(void* arg, grpc_error* /*error*/) {
239 auto stream = static_cast<Stream*>(arg);
240 grpc_transport_destroy_stream(stream->f_->transport(),
241 static_cast<grpc_stream*>(stream->stream_),
242 stream->destroy_closure_);
243 gpr_event_set(&stream->done_, (void*)(1));
244 }
245
246 Fixture* f_;
247 grpc_stream_refcount refcount_;
248 grpc_core::Arena* arena_;
249 size_t stream_size_;
250 void* stream_;
251 grpc_closure* destroy_closure_ = nullptr;
252 gpr_event done_;
253 };
254
255 ////////////////////////////////////////////////////////////////////////////////
256 // Benchmarks
257 //
// Heap-allocated completion events handed over by the benchmarks below once
// they have finished waiting on them. Being a namespace-scope container of
// unique_ptr, it frees the events only at static destruction.
// NOTE(review): presumably the events are parked here rather than freed
// immediately so that nothing can touch a freed event - confirm intent.
std::vector<std::unique_ptr<gpr_event>> done_events;
259
BM_StreamCreateDestroy(benchmark::State & state)260 static void BM_StreamCreateDestroy(benchmark::State& state) {
261 TrackCounters track_counters;
262 grpc_core::ExecCtx exec_ctx;
263 Fixture f(grpc::ChannelArguments(), true);
264 auto* s = new Stream(&f);
265 grpc_transport_stream_op_batch op;
266 grpc_transport_stream_op_batch_payload op_payload(nullptr);
267 op = {};
268 op.cancel_stream = true;
269 op.payload = &op_payload;
270 op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
271 std::unique_ptr<TestClosure> next =
272 MakeTestClosure([&, s](grpc_error* /*error*/) {
273 if (!state.KeepRunning()) {
274 delete s;
275 return;
276 }
277 s->Init(state);
278 s->Op(&op);
279 s->DestroyThen(next.get());
280 });
281 grpc_core::Closure::Run(DEBUG_LOCATION, next.get(), GRPC_ERROR_NONE);
282 f.FlushExecCtx();
283 track_counters.Finish(state);
284 }
285 BENCHMARK(BM_StreamCreateDestroy);
286
287 class RepresentativeClientInitialMetadata {
288 public:
GetElems()289 static std::vector<grpc_mdelem> GetElems() {
290 return {
291 GRPC_MDELEM_SCHEME_HTTP,
292 GRPC_MDELEM_METHOD_POST,
293 grpc_mdelem_from_slices(
294 GRPC_MDSTR_PATH,
295 grpc_slice_intern(grpc_slice_from_static_string("/foo/bar"))),
296 grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
297 grpc_slice_intern(grpc_slice_from_static_string(
298 "foo.test.google.fr:1234"))),
299 GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE_COMMA_GZIP,
300 GRPC_MDELEM_TE_TRAILERS,
301 GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC,
302 grpc_mdelem_from_slices(
303 GRPC_MDSTR_USER_AGENT,
304 grpc_slice_intern(grpc_slice_from_static_string(
305 "grpc-c/3.0.0-dev (linux; chttp2; green)")))};
306 }
307 };
308
309 template <class Metadata>
BM_StreamCreateSendInitialMetadataDestroy(benchmark::State & state)310 static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) {
311 TrackCounters track_counters;
312 grpc_core::ExecCtx exec_ctx;
313 Fixture f(grpc::ChannelArguments(), true);
314 auto* s = new Stream(&f);
315 grpc_transport_stream_op_batch op;
316 grpc_transport_stream_op_batch_payload op_payload(nullptr);
317 std::unique_ptr<TestClosure> start;
318 std::unique_ptr<TestClosure> done;
319
320 auto reset_op = [&]() {
321 op = {};
322 op.payload = &op_payload;
323 };
324
325 grpc_metadata_batch b;
326 grpc_metadata_batch_init(&b);
327 b.deadline = GRPC_MILLIS_INF_FUTURE;
328 std::vector<grpc_mdelem> elems = Metadata::GetElems();
329 std::vector<grpc_linked_mdelem> storage(elems.size());
330 for (size_t i = 0; i < elems.size(); i++) {
331 GPR_ASSERT(GRPC_LOG_IF_ERROR(
332 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i])));
333 }
334
335 f.FlushExecCtx();
336 gpr_event bm_done;
337 gpr_event_init(&bm_done);
338 start = MakeTestClosure([&, s](grpc_error* /*error*/) {
339 if (!state.KeepRunning()) {
340 delete s;
341 gpr_event_set(&bm_done, (void*)1);
342 return;
343 }
344 s->Init(state);
345 reset_op();
346 op.on_complete = done.get();
347 op.send_initial_metadata = true;
348 op.payload->send_initial_metadata.send_initial_metadata = &b;
349 s->Op(&op);
350 });
351 done = MakeTestClosure([&](grpc_error* /*error*/) {
352 reset_op();
353 op.cancel_stream = true;
354 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
355 s->Op(&op);
356 s->DestroyThen(start.get());
357 });
358 grpc_core::ExecCtx::Run(DEBUG_LOCATION, start.get(), GRPC_ERROR_NONE);
359 f.FlushExecCtx();
360 gpr_event_wait(&bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));
361 grpc_metadata_batch_destroy(&b);
362 track_counters.Finish(state);
363 }
364 BENCHMARK_TEMPLATE(BM_StreamCreateSendInitialMetadataDestroy,
365 RepresentativeClientInitialMetadata);
366
BM_TransportEmptyOp(benchmark::State & state)367 static void BM_TransportEmptyOp(benchmark::State& state) {
368 TrackCounters track_counters;
369 grpc_core::ExecCtx exec_ctx;
370 Fixture f(grpc::ChannelArguments(), true);
371 auto* s = new Stream(&f);
372 s->Init(state);
373 grpc_transport_stream_op_batch op;
374 grpc_transport_stream_op_batch_payload op_payload(nullptr);
375 auto reset_op = [&]() {
376 op = {};
377 op.payload = &op_payload;
378 };
379 std::unique_ptr<TestClosure> c = MakeTestClosure([&](grpc_error* /*error*/) {
380 if (!state.KeepRunning()) return;
381 reset_op();
382 op.on_complete = c.get();
383 s->Op(&op);
384 });
385 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c.get(), GRPC_ERROR_NONE);
386 f.FlushExecCtx();
387 reset_op();
388 op.cancel_stream = true;
389 op_payload.cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
390 gpr_event* stream_cancel_done = new gpr_event;
391 gpr_event_init(stream_cancel_done);
392 std::unique_ptr<TestClosure> stream_cancel_closure =
393 MakeTestClosure([&](grpc_error* error) {
394 GPR_ASSERT(error == GRPC_ERROR_NONE);
395 gpr_event_set(stream_cancel_done, (void*)(1));
396 });
397 op.on_complete = stream_cancel_closure.get();
398 s->Op(&op);
399 f.FlushExecCtx();
400 gpr_event_wait(stream_cancel_done, gpr_inf_future(GPR_CLOCK_REALTIME));
401 done_events.emplace_back(stream_cancel_done);
402 s->DestroyThen(MakeOnceClosure([s](grpc_error* /*error*/) { delete s; }));
403 f.FlushExecCtx();
404 track_counters.Finish(state);
405 }
406 BENCHMARK(BM_TransportEmptyOp);
407
BM_TransportStreamSend(benchmark::State & state)408 static void BM_TransportStreamSend(benchmark::State& state) {
409 TrackCounters track_counters;
410 grpc_core::ExecCtx exec_ctx;
411 Fixture f(grpc::ChannelArguments(), true);
412 auto* s = new Stream(&f);
413 s->Init(state);
414 grpc_transport_stream_op_batch op;
415 grpc_transport_stream_op_batch_payload op_payload(nullptr);
416 auto reset_op = [&]() {
417 op = {};
418 op.payload = &op_payload;
419 };
420 // Create the send_message payload slice.
421 // Note: We use grpc_slice_malloc_large() instead of grpc_slice_malloc()
422 // to force the slice to be refcounted, so that it remains alive when it
423 // is unreffed after each send_message op.
424 grpc_slice send_slice = grpc_slice_malloc_large(state.range(0));
425 memset(GRPC_SLICE_START_PTR(send_slice), 0, GRPC_SLICE_LENGTH(send_slice));
426 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> send_stream;
427 grpc_metadata_batch b;
428 grpc_metadata_batch_init(&b);
429 b.deadline = GRPC_MILLIS_INF_FUTURE;
430 std::vector<grpc_mdelem> elems =
431 RepresentativeClientInitialMetadata::GetElems();
432 std::vector<grpc_linked_mdelem> storage(elems.size());
433 for (size_t i = 0; i < elems.size(); i++) {
434 GPR_ASSERT(GRPC_LOG_IF_ERROR(
435 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i])));
436 }
437
438 gpr_event* bm_done = new gpr_event;
439 gpr_event_init(bm_done);
440
441 std::unique_ptr<TestClosure> c = MakeTestClosure([&](grpc_error* /*error*/) {
442 if (!state.KeepRunning()) {
443 gpr_event_set(bm_done, (void*)(1));
444 return;
445 }
446 grpc_slice_buffer send_buffer;
447 grpc_slice_buffer_init(&send_buffer);
448 grpc_slice_buffer_add(&send_buffer, grpc_slice_ref(send_slice));
449 send_stream.Init(&send_buffer, 0);
450 grpc_slice_buffer_destroy(&send_buffer);
451 // force outgoing window to be yuge
452 s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
453 f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
454 reset_op();
455 op.on_complete = c.get();
456 op.send_message = true;
457 op.payload->send_message.send_message.reset(send_stream.get());
458 s->Op(&op);
459 });
460
461 reset_op();
462 op.send_initial_metadata = true;
463 op.payload->send_initial_metadata.send_initial_metadata = &b;
464 op.on_complete = c.get();
465 s->Op(&op);
466
467 f.FlushExecCtx();
468 gpr_event_wait(bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));
469 done_events.emplace_back(bm_done);
470
471 reset_op();
472 op.cancel_stream = true;
473 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
474 gpr_event* stream_cancel_done = new gpr_event;
475 gpr_event_init(stream_cancel_done);
476 std::unique_ptr<TestClosure> stream_cancel_closure =
477 MakeTestClosure([&](grpc_error* error) {
478 GPR_ASSERT(error == GRPC_ERROR_NONE);
479 gpr_event_set(stream_cancel_done, (void*)(1));
480 });
481 op.on_complete = stream_cancel_closure.get();
482 s->Op(&op);
483 f.FlushExecCtx();
484 gpr_event_wait(stream_cancel_done, gpr_inf_future(GPR_CLOCK_REALTIME));
485 done_events.emplace_back(stream_cancel_done);
486 s->DestroyThen(MakeOnceClosure([s](grpc_error* /*error*/) { delete s; }));
487 f.FlushExecCtx();
488 track_counters.Finish(state);
489 grpc_metadata_batch_destroy(&b);
490 grpc_slice_unref(send_slice);
491 }
492 BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024);
493
// Wraps a string literal (or char array) in a slice that refers to the
// buffer in place, excluding the trailing NUL. `s` must be an array rather
// than a pointer, because its length is taken with sizeof.
#define SLICE_FROM_BUFFER(s) grpc_slice_from_static_buffer(s, sizeof(s) - 1)
495
CreateIncomingDataSlice(size_t length,size_t frame_size)496 static grpc_slice CreateIncomingDataSlice(size_t length, size_t frame_size) {
497 std::queue<char> unframed;
498
499 unframed.push(static_cast<uint8_t>(0));
500 unframed.push(static_cast<uint8_t>(length >> 24));
501 unframed.push(static_cast<uint8_t>(length >> 16));
502 unframed.push(static_cast<uint8_t>(length >> 8));
503 unframed.push(static_cast<uint8_t>(length));
504 for (size_t i = 0; i < length; i++) {
505 unframed.push('a');
506 }
507
508 std::vector<char> framed;
509 while (unframed.size() > frame_size) {
510 // frame size
511 framed.push_back(static_cast<uint8_t>(frame_size >> 16));
512 framed.push_back(static_cast<uint8_t>(frame_size >> 8));
513 framed.push_back(static_cast<uint8_t>(frame_size));
514 // data frame
515 framed.push_back(0);
516 // no flags
517 framed.push_back(0);
518 // stream id
519 framed.push_back(0);
520 framed.push_back(0);
521 framed.push_back(0);
522 framed.push_back(1);
523 // frame data
524 for (size_t i = 0; i < frame_size; i++) {
525 framed.push_back(unframed.front());
526 unframed.pop();
527 }
528 }
529
530 // frame size
531 framed.push_back(static_cast<uint8_t>(unframed.size() >> 16));
532 framed.push_back(static_cast<uint8_t>(unframed.size() >> 8));
533 framed.push_back(static_cast<uint8_t>(unframed.size()));
534 // data frame
535 framed.push_back(0);
536 // no flags
537 framed.push_back(0);
538 // stream id
539 framed.push_back(0);
540 framed.push_back(0);
541 framed.push_back(0);
542 framed.push_back(1);
543 while (!unframed.empty()) {
544 framed.push_back(unframed.front());
545 unframed.pop();
546 }
547
548 return grpc_slice_from_copied_buffer(framed.data(), framed.size());
549 }
550
BM_TransportStreamRecv(benchmark::State & state)551 static void BM_TransportStreamRecv(benchmark::State& state) {
552 TrackCounters track_counters;
553 grpc_core::ExecCtx exec_ctx;
554 Fixture f(grpc::ChannelArguments(), true);
555 auto* s = new Stream(&f);
556 s->Init(state);
557 grpc_transport_stream_op_batch_payload op_payload(nullptr);
558 grpc_transport_stream_op_batch op;
559 grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream;
560 grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);
561
562 auto reset_op = [&]() {
563 op = {};
564 op.payload = &op_payload;
565 };
566
567 grpc_metadata_batch b;
568 grpc_metadata_batch_init(&b);
569 grpc_metadata_batch b_recv;
570 grpc_metadata_batch_init(&b_recv);
571 b.deadline = GRPC_MILLIS_INF_FUTURE;
572 std::vector<grpc_mdelem> elems =
573 RepresentativeClientInitialMetadata::GetElems();
574 std::vector<grpc_linked_mdelem> storage(elems.size());
575 for (size_t i = 0; i < elems.size(); i++) {
576 GPR_ASSERT(GRPC_LOG_IF_ERROR(
577 "addmd", grpc_metadata_batch_add_tail(&b, &storage[i], elems[i])));
578 }
579
580 std::unique_ptr<TestClosure> do_nothing =
581 MakeTestClosure([](grpc_error* /*error*/) {});
582
583 uint32_t received;
584
585 std::unique_ptr<TestClosure> drain_start;
586 std::unique_ptr<TestClosure> drain;
587 std::unique_ptr<TestClosure> drain_continue;
588 grpc_slice recv_slice;
589
590 std::unique_ptr<TestClosure> c = MakeTestClosure([&](grpc_error* /*error*/) {
591 if (!state.KeepRunning()) return;
592 // force outgoing window to be yuge
593 s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
594 f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
595 received = 0;
596 reset_op();
597 op.on_complete = do_nothing.get();
598 op.recv_message = true;
599 op.payload->recv_message.recv_message = &recv_stream;
600 op.payload->recv_message.recv_message_ready = drain_start.get();
601 s->Op(&op);
602 f.PushInput(grpc_slice_ref(incoming_data));
603 });
604
605 drain_start = MakeTestClosure([&](grpc_error* /*error*/) {
606 if (recv_stream == nullptr) {
607 GPR_ASSERT(!state.KeepRunning());
608 return;
609 }
610 grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE);
611 });
612
613 drain = MakeTestClosure([&](grpc_error* /*error*/) {
614 do {
615 if (received == recv_stream->length()) {
616 recv_stream.reset();
617 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c.get(), GRPC_ERROR_NONE);
618 return;
619 }
620 } while (recv_stream->Next(recv_stream->length() - received,
621 drain_continue.get()) &&
622 GRPC_ERROR_NONE == recv_stream->Pull(&recv_slice) &&
623 (received += GRPC_SLICE_LENGTH(recv_slice),
624 grpc_slice_unref_internal(recv_slice), true));
625 });
626
627 drain_continue = MakeTestClosure([&](grpc_error* /*error*/) {
628 recv_stream->Pull(&recv_slice);
629 received += GRPC_SLICE_LENGTH(recv_slice);
630 grpc_slice_unref_internal(recv_slice);
631 grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE);
632 });
633
634 reset_op();
635 op.send_initial_metadata = true;
636 op.payload->send_initial_metadata.send_initial_metadata = &b;
637 op.recv_initial_metadata = true;
638 op.payload->recv_initial_metadata.recv_initial_metadata = &b_recv;
639 op.payload->recv_initial_metadata.recv_initial_metadata_ready =
640 do_nothing.get();
641 op.on_complete = c.get();
642 s->Op(&op);
643 f.PushInput(SLICE_FROM_BUFFER(
644 "\x00\x00\x00\x04\x00\x00\x00\x00\x00"
645 // Generated using:
646 // tools/codegen/core/gen_header_frame.py <
647 // test/cpp/microbenchmarks/representative_server_initial_metadata.headers
648 "\x00\x00X\x01\x04\x00\x00\x00\x01"
649 "\x10\x07:status\x03"
650 "200"
651 "\x10\x0c"
652 "content-type\x10"
653 "application/grpc"
654 "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"));
655
656 f.FlushExecCtx();
657 reset_op();
658 op.cancel_stream = true;
659 op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
660 gpr_event* stream_cancel_done = new gpr_event;
661 gpr_event_init(stream_cancel_done);
662 std::unique_ptr<TestClosure> stream_cancel_closure =
663 MakeTestClosure([&](grpc_error* error) {
664 GPR_ASSERT(error == GRPC_ERROR_NONE);
665 gpr_event_set(stream_cancel_done, (void*)(1));
666 });
667 op.on_complete = stream_cancel_closure.get();
668 s->Op(&op);
669 f.FlushExecCtx();
670 gpr_event_wait(stream_cancel_done, gpr_inf_future(GPR_CLOCK_REALTIME));
671 done_events.emplace_back(stream_cancel_done);
672 s->DestroyThen(MakeOnceClosure([s](grpc_error* /*error*/) { delete s; }));
673 grpc_metadata_batch_destroy(&b);
674 grpc_metadata_batch_destroy(&b_recv);
675 f.FlushExecCtx();
676 track_counters.Finish(state);
677 grpc_slice_unref(incoming_data);
678 }
679 BENCHMARK(BM_TransportStreamRecv)->Range(0, 128 * 1024 * 1024);
680
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes: the call below is
// deliberately unqualified and made from inside namespace benchmark, so name
// lookup finds the function whether it is declared in that namespace or at
// global scope.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark
686
// Entry point: sets up the gRPC test environment and library state, lets the
// benchmark library and then the gRPC test flag parser process the command
// line, and runs the selected benchmarks.
int main(int argc, char** argv) {
  grpc::testing::TestEnvironment env(argc, argv);
  // NOTE(review): presumably this is the instance that
  // LibraryInitializer::get() (used by DummyEndpoint) refers to, so it has to
  // stay alive while benchmarks run - confirm against helpers.h.
  LibraryInitializer libInit;
  // Both calls receive argc/argv by pointer and so may modify them.
  ::benchmark::Initialize(&argc, argv);
  ::grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}
695