1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/call_filters.h"
16
17 #include <vector>
18
19 #include "gmock/gmock.h"
20 #include "gtest/gtest.h"
21 #include "test/core/promise/poll_matcher.h"
22
23 using testing::Mock;
24 using testing::StrictMock;
25
26 namespace grpc_core {
27
28 namespace {
29 // A mock activity that can be activated and deactivated.
30 class MockActivity : public Activity, public Wakeable {
31 public:
32 MOCK_METHOD(void, WakeupRequested, ());
33
ForceImmediateRepoll(WakeupMask)34 void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()35 void Orphan() override {}
MakeOwningWaker()36 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()37 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)38 void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)39 void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)40 void Drop(WakeupMask /*mask*/) override {}
DebugTag() const41 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const42 std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
43 return DebugTag();
44 }
45
Activate()46 void Activate() {
47 if (scoped_activity_ == nullptr) {
48 scoped_activity_ = std::make_unique<ScopedActivity>(this);
49 }
50 }
51
Deactivate()52 void Deactivate() { scoped_activity_.reset(); }
53
54 private:
55 std::unique_ptr<ScopedActivity> scoped_activity_;
56 };
57
// Runs `statement` while expecting `activity` (a MockActivity) to receive at
// least one WakeupRequested() call, then verifies and clears the mock's
// expectations.
// NOTE: this expands to several statements; brace it when it is used as the
// body of an if/else/loop.
#define EXPECT_WAKEUP(activity, statement)                                   \
  EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
  statement;                                                                 \
  ::testing::Mock::VerifyAndClearExpectations(&(activity));
62
63 } // namespace
64
65 ////////////////////////////////////////////////////////////////////////////////
66 // Layout
67
68 namespace filters_detail {
69
// A default-constructed Layout has no operators and zero promise
// size/alignment.
TEST(LayoutTest, Empty) {
  Layout<ClientMetadataHandle> layout;
  ASSERT_EQ(layout.ops.size(), 0u);
  EXPECT_EQ(layout.promise_alignment, 0u);
  EXPECT_EQ(layout.promise_size, 0u);
}
76
// Adding one operator records it and reports the size/alignment passed
// alongside it.
TEST(LayoutTest, Add) {
  Layout<ClientMetadataHandle> layout;
  // The layout's own address is used as an arbitrary non-null pointer for the
  // operator's first field; 120 is the call offset checked below.
  layout.Add(
      1, 4,
      Operator<ClientMetadataHandle>{&layout, 120, nullptr, nullptr, nullptr});
  ASSERT_EQ(layout.ops.size(), 1u);
  const auto& added = layout.ops[0];
  EXPECT_EQ(layout.promise_size, 1u);
  EXPECT_EQ(layout.promise_alignment, 4u);
  EXPECT_EQ(added.call_offset, 120);
}
86
87 } // namespace filters_detail
88
89 ////////////////////////////////////////////////////////////////////////////////
90 // StackData
91
92 namespace filters_detail {
93
// A StackData with no filters has zero call data size and an alignment of 1.
TEST(StackDataTest, Empty) {
  StackData stack;
  EXPECT_EQ(stack.call_data_size, 0u);
  EXPECT_EQ(stack.call_data_alignment, 1u);
}
99
// A single filter whose Call is one char yields one byte of call data at
// offset 0 with 1-byte alignment, and registers a constructor entry only.
TEST(StackDataTest, OneByteAlignmentAndSize) {
  struct Filter1 {
    struct Call {
      char c;
    };
  };
  static_assert(sizeof(Filter1::Call) == 1, "Expect 1 byte size");
  static_assert(alignof(Filter1::Call) == 1, "Expect 1 byte alignment");
  StackData stack;
  Filter1 filter;
  stack.AddFilter(&filter);
  // Overall call data footprint.
  EXPECT_EQ(stack.call_data_size, 1);
  EXPECT_EQ(stack.call_data_alignment, 1);
  // Exactly one constructor entry and no destructor entry.
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  const auto& ctor = stack.filter_constructor[0];
  // The entry refers to the filter we added, placed at the start.
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, 0);
}
121
// A single filter whose Call holds one pointer yields pointer-sized,
// pointer-aligned call data at offset 0.
TEST(StackDataTest, PointerAlignmentAndSize) {
  struct Filter1 {
    struct Call {
      void* p;
    };
  };
  static_assert(sizeof(Filter1::Call) == sizeof(void*), "Expect pointer size");
  static_assert(alignof(Filter1::Call) == alignof(void*),
                "Expect pointer alignment");
  StackData stack;
  Filter1 filter;
  stack.AddFilter(&filter);
  // Overall call data footprint.
  EXPECT_EQ(stack.call_data_size, sizeof(void*));
  EXPECT_EQ(stack.call_data_alignment, alignof(void*));
  // Exactly one constructor entry and no destructor entry.
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  const auto& ctor = stack.filter_constructor[0];
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, 0);
}
144
// Adding a 1-byte filter followed by a pointer-sized filter: the second
// filter's call data is pushed out to the next pointer-aligned offset.
TEST(StackDataTest, PointerAndOneByteAlignmentAndSize) {
  struct Filter1 {
    struct Call {
      char c;
    };
  };
  static_assert(sizeof(Filter1::Call) == 1, "Expect 1 byte size");
  static_assert(alignof(Filter1::Call) == 1, "Expect 1 byte alignment");
  struct Filter2 {
    struct Call {
      void* p;
    };
  };
  static_assert(sizeof(Filter2::Call) == sizeof(void*), "Expect pointer size");
  static_assert(alignof(Filter2::Call) == alignof(void*),
                "Expect pointer alignment");
  StackData stack;
  Filter1 byte_filter;
  Filter2 pointer_filter;
  stack.AddFilter(&byte_filter);
  stack.AddFilter(&pointer_filter);
  EXPECT_EQ(stack.call_data_alignment, alignof(void*));
  // The 1-byte element is padded so the pointer that follows is aligned.
  EXPECT_EQ(stack.call_data_size, 2 * sizeof(void*));
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  ASSERT_EQ(stack.filter_constructor.size(), 2u);
  const auto& first = stack.filter_constructor[0];
  const auto& second = stack.filter_constructor[1];
  // Entries appear in insertion order and reference their filters.
  EXPECT_EQ(first.channel_data, &byte_filter);
  EXPECT_EQ(second.channel_data, &pointer_filter);
  // Offsets: byte at the start, pointer at the next aligned slot.
  EXPECT_EQ(first.call_offset, 0);
  EXPECT_EQ(second.call_offset, sizeof(void*));
}
180
// Same two filters as the forward test but added pointer-first: the byte
// lands directly after the pointer, so no padding is required.
TEST(StackDataTest, PointerAndOneByteAlignmentAndSizeBackwards) {
  struct Filter1 {
    struct Call {
      char c;
    };
  };
  static_assert(sizeof(Filter1::Call) == 1, "Expect 1 byte size");
  static_assert(alignof(Filter1::Call) == 1, "Expect 1 byte alignment");
  struct Filter2 {
    struct Call {
      void* p;
    };
  };
  static_assert(sizeof(Filter2::Call) == sizeof(void*), "Expect pointer size");
  static_assert(alignof(Filter2::Call) == alignof(void*),
                "Expect pointer alignment");
  StackData stack;
  Filter1 byte_filter;
  Filter2 pointer_filter;
  stack.AddFilter(&pointer_filter);
  stack.AddFilter(&byte_filter);
  EXPECT_EQ(stack.call_data_alignment, alignof(void*));
  // Total is simply the sum of both sizes since nothing needs padding.
  EXPECT_EQ(stack.call_data_size, sizeof(void*) + 1);
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  ASSERT_EQ(stack.filter_constructor.size(), 2u);
  const auto& first = stack.filter_constructor[0];
  const auto& second = stack.filter_constructor[1];
  // Entries appear in insertion order and reference their filters.
  EXPECT_EQ(first.channel_data, &pointer_filter);
  EXPECT_EQ(second.channel_data, &byte_filter);
  // Offsets: pointer at the start, byte immediately after it.
  EXPECT_EQ(first.call_offset, 0);
  EXPECT_EQ(second.call_offset, sizeof(void*));
}
216
// A filter whose Call type is empty contributes no bytes of call data.
TEST(StackDataTest, EmptyFilter) {
  struct Filter1 {
    struct Call {};
  };
  static_assert(std::is_empty<Filter1::Call>::value, "Expect empty filter");
  StackData stack;
  Filter1 filter;
  stack.AddFilter(&filter);
  EXPECT_EQ(stack.call_data_size, 0);
}
228
// Empty-Call filters sandwiched between two 1-byte filters take up no call
// data and register no constructors: only the two non-empty filters show up,
// at adjacent offsets.
TEST(StackDataTest, OneFilterThenManyEmptyThenOneNonEmpty) {
  struct Filter1 {
    struct Call {
      char c;
    };
  };
  struct Filter2 {
    struct Call {};
  };
  StackData stack;
  // Filters, in the order they are added below.
  Filter1 leading;
  Filter2 empties[4];
  Filter1 trailing;
  stack.AddFilter(&leading);
  for (Filter2& empty : empties) {
    stack.AddFilter(&empty);
  }
  stack.AddFilter(&trailing);
  // Only the two char-sized calls consume space.
  EXPECT_EQ(stack.call_data_size, 2);
  ASSERT_EQ(stack.filter_constructor.size(), 2u);
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  const auto& first = stack.filter_constructor[0];
  const auto& second = stack.filter_constructor[1];
  // Constructor entries belong to the non-empty filters.
  EXPECT_EQ(first.channel_data, &leading);
  EXPECT_EQ(second.channel_data, &trailing);
  // And they are packed back to back.
  EXPECT_EQ(first.call_offset, 0);
  EXPECT_EQ(second.call_offset, 1);
}
264
// A Call with a non-trivial member gets both a constructor and a destructor
// entry; running them on raw storage default-constructs and then tears down
// the Call.
TEST(StackDataTest, FilterInit) {
  struct Filter1 {
    struct Call {
      std::unique_ptr<int> p = std::make_unique<int>(42);
    };
  };
  StackData stack;
  Filter1 filter;
  stack.AddFilter(&filter);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  ASSERT_EQ(stack.filter_destructor.size(), 1u);
  const auto& ctor = stack.filter_constructor[0];
  const auto& dtor = stack.filter_destructor[0];
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, 0);
  EXPECT_EQ(dtor.call_offset, 0);
  // Construct the call in manually allocated storage, inspect it, destroy it.
  void* storage =
      gpr_malloc_aligned(stack.call_data_size, stack.call_data_alignment);
  ctor.call_init(storage, &filter);
  auto* call = static_cast<Filter1::Call*>(storage);
  EXPECT_EQ(*call->p, 42);
  dtor.call_destroy(storage);
  gpr_free_aligned(storage);
}
285
// A Call whose constructor takes the filter pointer receives the channel
// data passed to call_init.
TEST(StackDataTest, FilterInitWithArg) {
  struct Filter1 {
    struct Call {
      explicit Call(Filter1* f) : p(std::make_unique<Filter1*>(f)) {}
      std::unique_ptr<Filter1*> p;
    };
  };
  StackData stack;
  Filter1 filter;
  stack.AddFilter(&filter);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  ASSERT_EQ(stack.filter_destructor.size(), 1u);
  const auto& ctor = stack.filter_constructor[0];
  const auto& dtor = stack.filter_destructor[0];
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, 0);
  EXPECT_EQ(dtor.call_offset, 0);
  // Construct the call in manually allocated storage, inspect it, destroy it.
  void* storage =
      gpr_malloc_aligned(stack.call_data_size, stack.call_data_alignment);
  ctor.call_init(storage, &filter);
  auto* call = static_cast<Filter1::Call*>(storage);
  EXPECT_EQ(*call->p, &filter);
  dtor.call_destroy(storage);
  gpr_free_aligned(storage);
}
307
// A void-returning OnClientInitialMetadata hook on an empty Call: the
// registered op completes immediately (no poll/early_destroy) and hands back
// the metadata it mutated.
TEST(StackDataTest, InstantClientInitialMetadataReturningVoid) {
  struct Filter1 {
    struct Call {
      void OnClientInitialMetadata(ClientMetadata& md) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
      }
    };
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  EXPECT_EQ(d.call_data_size, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  // Empty Call => nothing to construct or destroy.
  ASSERT_EQ(d.filter_constructor.size(), 0u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  char call_data;
  auto r = op.promise_init(nullptr, &call_data, op.channel_data, std::move(md));
  // Use ASSERT_* before each dereference so a regression fails the test
  // instead of crashing on a pending result or a null pointer.
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
}
341
// Same as the void-returning case, but the hook also takes the filter
// pointer; the test checks that the filter passed to the stack is the one
// the hook receives.
TEST(StackDataTest, InstantClientInitialMetadataReturningVoidTakingChannelPtr) {
  struct Filter1 {
    struct Call {
      void OnClientInitialMetadata(ClientMetadata& md, Filter1* p) {
        p->v.push_back(42);
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
      }
    };
    std::vector<int> v;
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  EXPECT_EQ(d.call_data_size, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  // Empty Call => nothing to construct or destroy (the destructor check
  // mirrors the sibling test without the channel pointer).
  ASSERT_EQ(d.filter_constructor.size(), 0u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  char call_data;
  auto r = op.promise_init(nullptr, &call_data, op.channel_data, std::move(md));
  // Use ASSERT_* before each dereference so a regression fails the test
  // instead of crashing on a pending result or a null pointer.
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
  // The hook ran exactly once against f1.
  EXPECT_THAT(f1.v, ::testing::ElementsAre(42));
}
377
// A hook returning absl::Status synchronously: an OK status passes the
// metadata through, a non-OK status produces an error result carrying the
// matching grpc-status.
TEST(StackDataTest, InstantClientInitialMetadataReturningAbslStatus) {
  struct Filter1 {
    class Call {
     public:
      // Succeeds on the first invocation, reports cancellation afterwards.
      absl::Status OnClientInitialMetadata(ClientMetadata& md) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
        bool first = std::exchange(first_, false);
        return first ? absl::OkStatus() : absl::CancelledError();
      }

     private:
      bool first_ = true;
    };
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  ASSERT_EQ(d.filter_constructor.size(), 1u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  EXPECT_EQ(d.filter_constructor[0].channel_data, &f1);
  EXPECT_EQ(d.filter_constructor[0].call_offset, call_offset);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init. The call data is owned by a unique_ptr so that an
  // early return from a failed ASSERT_* below cannot leak it.
  std::unique_ptr<void, decltype(&gpr_free_aligned)> call_data(
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment),
      &gpr_free_aligned);
  d.filter_constructor[0].call_init(call_data.get(), &f1);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> ctx(arena.get());
  // A succeeding call
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                           std::move(md));
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
  // A failing call
  md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                      std::move(md));
  ASSERT_TRUE(r.ready());
  EXPECT_EQ(r.value().ok, nullptr);
  ASSERT_NE(r.value().error, nullptr);
  EXPECT_EQ(r.value().error->get(GrpcStatusMetadata()), GRPC_STATUS_CANCELLED);
}
432
// absl::Status-returning hook that also takes the filter pointer: checks the
// success and failure results, and that both invocations were made against
// the filter registered with the stack.
TEST(StackDataTest,
     InstantClientInitialMetadataReturningAbslStatusTakingChannelPtr) {
  struct Filter1 {
    class Call {
     public:
      // Succeeds on the first invocation, reports cancellation afterwards;
      // logs 11 or 22 into the filter to mark which invocation ran.
      absl::Status OnClientInitialMetadata(ClientMetadata& md, Filter1* p) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
        const bool first = std::exchange(first_, false);
        p->v.push_back(first ? 11 : 22);
        return first ? absl::OkStatus() : absl::CancelledError();
      }

     private:
      bool first_ = true;
    };
    std::vector<int> v;
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  ASSERT_EQ(d.filter_constructor.size(), 1u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  EXPECT_EQ(d.filter_constructor[0].channel_data, &f1);
  EXPECT_EQ(d.filter_constructor[0].call_offset, call_offset);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init. The call data is owned by a unique_ptr so that an
  // early return from a failed ASSERT_* below cannot leak it.
  std::unique_ptr<void, decltype(&gpr_free_aligned)> call_data(
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment),
      &gpr_free_aligned);
  d.filter_constructor[0].call_init(call_data.get(), &f1);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> ctx(arena.get());
  // A succeeding call
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                           std::move(md));
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
  // A failing call
  md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                      std::move(md));
  ASSERT_TRUE(r.ready());
  EXPECT_EQ(r.value().ok, nullptr);
  ASSERT_NE(r.value().error, nullptr);
  EXPECT_EQ(r.value().error->get(GrpcStatusMetadata()), GRPC_STATUS_CANCELLED);
  // Both invocations reached f1, in order.
  EXPECT_THAT(f1.v, ::testing::ElementsAre(11, 22));
}
491
// A hook returning ServerMetadataHandle synchronously: a null handle passes
// the metadata through, a non-null handle becomes the error result.
TEST(StackDataTest, InstantClientInitialMetadataReturningServerMetadata) {
  struct Filter1 {
    class Call {
     public:
      // Returns null (success) on the first invocation and cancelled
      // trailing metadata afterwards.
      ServerMetadataHandle OnClientInitialMetadata(ClientMetadata& md) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
        bool first = std::exchange(first_, false);
        return first ? nullptr
                     : ServerMetadataFromStatus(GRPC_STATUS_CANCELLED);
      }

     private:
      bool first_ = true;
    };
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  ASSERT_EQ(d.filter_constructor.size(), 1u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  EXPECT_EQ(d.filter_constructor[0].channel_data, &f1);
  EXPECT_EQ(d.filter_constructor[0].call_offset, call_offset);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init. The call data is owned by a unique_ptr so that an
  // early return from a failed ASSERT_* below cannot leak it.
  std::unique_ptr<void, decltype(&gpr_free_aligned)> call_data(
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment),
      &gpr_free_aligned);
  d.filter_constructor[0].call_init(call_data.get(), &f1);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> ctx(arena.get());
  // A succeeding call
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                           std::move(md));
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
  // A failing call
  md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                      std::move(md));
  ASSERT_TRUE(r.ready());
  EXPECT_EQ(r.value().ok, nullptr);
  ASSERT_NE(r.value().error, nullptr);
  EXPECT_EQ(r.value().error->get(GrpcStatusMetadata()), GRPC_STATUS_CANCELLED);
}
547
// ServerMetadataHandle-returning hook that also takes the filter pointer:
// checks the success and failure results, and that both invocations were
// made against the filter registered with the stack.
TEST(StackDataTest,
     InstantClientInitialMetadataReturningServerMetadataTakingChannelPtr) {
  struct Filter1 {
    class Call {
     public:
      // Returns null (success) on the first invocation and cancelled
      // trailing metadata afterwards; logs 11 or 22 into the filter to mark
      // which invocation ran.
      ServerMetadataHandle OnClientInitialMetadata(ClientMetadata& md,
                                                   Filter1* p) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
        const bool first = std::exchange(first_, false);
        p->v.push_back(first ? 11 : 22);
        return first ? nullptr
                     : ServerMetadataFromStatus(GRPC_STATUS_CANCELLED);
      }

     private:
      bool first_ = true;
    };
    std::vector<int> v;
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  d.AddClientInitialMetadataOp(&f1, call_offset);
  ASSERT_EQ(d.filter_constructor.size(), 1u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  EXPECT_EQ(d.filter_constructor[0].channel_data, &f1);
  EXPECT_EQ(d.filter_constructor[0].call_offset, call_offset);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 1u);
  const auto& op = d.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init. The call data is owned by a unique_ptr so that an
  // early return from a failed ASSERT_* below cannot leak it.
  std::unique_ptr<void, decltype(&gpr_free_aligned)> call_data(
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment),
      &gpr_free_aligned);
  d.filter_constructor[0].call_init(call_data.get(), &f1);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto md = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> ctx(arena.get());
  // A succeeding call
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                           std::move(md));
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
  // A failing call
  md = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  r = op.promise_init(nullptr, call_data.get(), op.channel_data,
                      std::move(md));
  ASSERT_TRUE(r.ready());
  EXPECT_EQ(r.value().ok, nullptr);
  ASSERT_NE(r.value().error, nullptr);
  EXPECT_EQ(r.value().error->get(GrpcStatusMetadata()), GRPC_STATUS_CANCELLED);
  // Both invocations reached f1, in order.
  EXPECT_THAT(f1.v, ::testing::ElementsAre(11, 22));
}
608
// A hook that returns a promise yielding absl::Status. The promise stays
// pending for its first two polls and resolves on the third, so the op must
// be driven through promise_init + poll; it is also torn down mid-flight via
// early_destroy.
TEST(StackDataTest, PromiseClientInitialMetadataReturningAbslStatus) {
  struct Filter1 {
    class Call {
     public:
      auto OnClientInitialMetadata(ClientMetadata& md) {
        // The countdown lives on the heap so that a promise which is never
        // destroyed shows up as a leak.
        return [this, polls_left = std::make_unique<int>(3),
                metadata = &md]() mutable -> Poll<absl::Status> {
          if (--*polls_left > 0) return Pending{};
          metadata->Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
          const bool was_first = std::exchange(first_, false);
          if (was_first) return absl::OkStatus();
          return absl::CancelledError();
        };
      }

     private:
      bool first_ = true;
    };
  };
  StackData stack;
  Filter1 filter;
  const size_t call_offset = stack.AddFilter(&filter);
  EXPECT_EQ(call_offset, 0);
  stack.AddClientInitialMetadataOp(&filter, call_offset);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  const auto& ctor = stack.filter_constructor[0];
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, call_offset);
  ASSERT_EQ(stack.client_initial_metadata.ops.size(), 1u);
  const auto& op = stack.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &filter);
  // Set up call data, an arena context and fresh metadata.
  void* call_data =
      gpr_malloc_aligned(stack.call_data_size, stack.call_data_alignment);
  ctor.call_init(call_data, &filter);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> arena_ctx(arena.get());
  // First run: resolves successfully on the third poll.
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  void* promise_data =
      gpr_malloc_aligned(stack.client_initial_metadata.promise_size,
                         stack.client_initial_metadata.promise_alignment);
  auto result = op.promise_init(promise_data, call_data, op.channel_data,
                                std::move(metadata));
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(
      result.value().ok->get_pointer(HttpPathMetadata())->as_string_view(),
      "hello");
  // Second run: resolves with a cancellation error on the third poll.
  metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  result = op.promise_init(promise_data, call_data, op.channel_data,
                           std::move(metadata));
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(result.value().ok, nullptr);
  EXPECT_EQ(result.value().error->get(GrpcStatusMetadata()),
            GRPC_STATUS_CANCELLED);
  // Third run: abandoned while still pending.
  metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  result = op.promise_init(promise_data, call_data, op.channel_data,
                           std::move(metadata));
  EXPECT_FALSE(result.ready());
  op.early_destroy(promise_data);
  // ASAN will trigger if things aren't cleaned up
  gpr_free_aligned(promise_data);
  gpr_free_aligned(call_data);
}
686
// Promise-returning hook that also takes the filter pointer. Covers a
// successful run, a failing run and an abandoned run, and checks that only
// the two completed runs were logged into the filter.
TEST(StackDataTest,
     PromiseClientInitialMetadataReturningAbslStatusTakingChannelPtr) {
  struct Filter1 {
    class Call {
     public:
      auto OnClientInitialMetadata(ClientMetadata& md, Filter1* p) {
        // The countdown lives on the heap so that a promise which is never
        // destroyed shows up as a leak.
        return [this, p, polls_left = std::make_unique<int>(3),
                metadata = &md]() mutable -> Poll<absl::Status> {
          if (--*polls_left > 0) return Pending{};
          metadata->Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
          const bool was_first = std::exchange(first_, false);
          p->v.push_back(was_first ? 11 : 22);
          if (was_first) return absl::OkStatus();
          return absl::CancelledError();
        };
      }

     private:
      bool first_ = true;
    };
    std::vector<int> v;
  };
  StackData stack;
  Filter1 filter;
  const size_t call_offset = stack.AddFilter(&filter);
  EXPECT_EQ(call_offset, 0);
  stack.AddClientInitialMetadataOp(&filter, call_offset);
  ASSERT_EQ(stack.filter_constructor.size(), 1u);
  ASSERT_EQ(stack.filter_destructor.size(), 0u);
  const auto& ctor = stack.filter_constructor[0];
  EXPECT_EQ(ctor.channel_data, &filter);
  EXPECT_EQ(ctor.call_offset, call_offset);
  ASSERT_EQ(stack.client_initial_metadata.ops.size(), 1u);
  const auto& op = stack.client_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &filter);
  // Set up call data, an arena context and fresh metadata.
  void* call_data =
      gpr_malloc_aligned(stack.call_data_size, stack.call_data_alignment);
  ctor.call_init(call_data, &filter);
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  promise_detail::Context<Arena> arena_ctx(arena.get());
  // First run: resolves successfully on the third poll.
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  void* promise_data =
      gpr_malloc_aligned(stack.client_initial_metadata.promise_size,
                         stack.client_initial_metadata.promise_alignment);
  auto result = op.promise_init(promise_data, call_data, op.channel_data,
                                std::move(metadata));
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(
      result.value().ok->get_pointer(HttpPathMetadata())->as_string_view(),
      "hello");
  // Second run: resolves with a cancellation error on the third poll.
  metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  result = op.promise_init(promise_data, call_data, op.channel_data,
                           std::move(metadata));
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_FALSE(result.ready());
  result = op.poll(promise_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(result.value().ok, nullptr);
  EXPECT_EQ(result.value().error->get(GrpcStatusMetadata()),
            GRPC_STATUS_CANCELLED);
  // Third run: abandoned while still pending.
  metadata = Arena::MakePooledForOverwrite<ClientMetadata>();
  EXPECT_EQ(metadata->get_pointer(HttpPathMetadata()), nullptr);
  result = op.promise_init(promise_data, call_data, op.channel_data,
                           std::move(metadata));
  EXPECT_FALSE(result.ready());
  op.early_destroy(promise_data);
  // ASAN will trigger if things aren't cleaned up
  gpr_free_aligned(promise_data);
  gpr_free_aligned(call_data);
  // The abandoned run never reached the point where it logs into the filter.
  EXPECT_THAT(filter.v, ::testing::ElementsAre(11, 22));
}
768
// A void-returning OnServerInitialMetadata hook on an empty Call: the
// registered op completes immediately (no poll/early_destroy) and hands back
// the metadata it mutated.
TEST(StackDataTest, InstantServerInitialMetadataReturningVoid) {
  struct Filter1 {
    struct Call {
      void OnServerInitialMetadata(ServerMetadata& md) {
        md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
      }
    };
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  EXPECT_EQ(d.call_data_size, 0);
  d.AddServerInitialMetadataOp(&f1, call_offset);
  // Empty Call => nothing to construct or destroy. Unsigned literals match
  // the container size type, as in the sibling tests.
  ASSERT_EQ(d.filter_constructor.size(), 0u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.server_initial_metadata.ops.size(), 1u);
  const auto& op = d.server_initial_metadata.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto md = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  char call_data;
  auto r = op.promise_init(nullptr, &call_data, op.channel_data, std::move(md));
  // Use ASSERT_* before each dereference so a regression fails the test
  // instead of crashing on a pending result or a null pointer.
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  const auto* path = r.value().ok->get_pointer(HttpPathMetadata());
  ASSERT_NE(path, nullptr);
  EXPECT_EQ(path->as_string_view(), "hello");
}
803
// A void-returning OnClientToServerMessage hook on an empty Call: the
// registered op completes immediately and returns the message with the flag
// bit the hook set.
TEST(StackDataTest, InstantClientToServerMessagesReturningVoid) {
  struct Filter1 {
    struct Call {
      void OnClientToServerMessage(Message& message) {
        message.mutable_flags() |= 1;
      }
    };
  };
  StackData d;
  Filter1 f1;
  const size_t call_offset = d.AddFilter(&f1);
  EXPECT_EQ(call_offset, 0);
  EXPECT_EQ(d.call_data_size, 0);
  d.AddClientToServerMessageOp(&f1, call_offset);
  // Empty Call => nothing to construct or destroy.
  ASSERT_EQ(d.filter_constructor.size(), 0u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.client_to_server_messages.ops.size(), 1u);
  const auto& op = d.client_to_server_messages.ops[0];
  EXPECT_EQ(op.call_offset, call_offset);
  EXPECT_EQ(op.channel_data, &f1);
  // Instant => no poll/early destroy
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Check promise init
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto message = Arena::MakePooled<Message>(SliceBuffer(), 0);
  char call_data;
  auto r =
      op.promise_init(nullptr, &call_data, op.channel_data, std::move(message));
  // Use ASSERT_* before each dereference so a regression fails the test
  // instead of crashing on a pending result or a null message.
  ASSERT_TRUE(r.ready());
  ASSERT_NE(r.value().ok, nullptr);
  EXPECT_EQ(r.value().ok->flags(), 1u);
}
836
// A void-returning Call::OnServerToClientMessage hook must be registered as
// an instant server-to-client message op: no poll or early-destroy hook, and
// promise_init alone yields a ready result carrying the modified message.
TEST(StackDataTest, InstantServerToClientMessagesReturningVoid) {
  struct Filter1 {
    struct Call {
      void OnServerToClientMessage(Message& msg) { msg.mutable_flags() |= 1; }
    };
  };
  StackData stack_data;
  Filter1 filter;
  const size_t offset = stack_data.AddFilter(&filter);
  EXPECT_EQ(offset, 0);
  EXPECT_EQ(stack_data.call_data_size, 0);
  stack_data.AddServerToClientMessageOp(&filter, offset);
  ASSERT_EQ(stack_data.filter_constructor.size(), 0u);
  ASSERT_EQ(stack_data.filter_destructor.size(), 0u);
  const auto& ops = stack_data.server_to_client_messages.ops;
  ASSERT_EQ(ops.size(), 1u);
  const auto& op = ops[0];
  EXPECT_EQ(op.call_offset, offset);
  EXPECT_EQ(op.channel_data, &filter);
  // An instant op has neither a poll nor an early-destroy hook.
  EXPECT_EQ(op.poll, nullptr);
  EXPECT_EQ(op.early_destroy, nullptr);
  // Run the op and verify the hook set the flag bit.
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto incoming = Arena::MakePooled<Message>(SliceBuffer(), 0);
  char call_data;
  auto result = op.promise_init(nullptr, &call_data, op.channel_data,
                                std::move(incoming));
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(result.value().ok->flags(), 1u);
}
869
// Registers a filter with a void-returning Call::OnServerTrailingMetadata
// hook and checks that the resulting server-trailing-metadata operator, when
// invoked directly, returns metadata with the path the hook set.
TEST(StackDataTest, ServerTrailingMetadataReturningVoid) {
  struct Filter1 {
    struct Call {
      void OnServerTrailingMetadata(ServerMetadata& trailers) {
        trailers.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
      }
    };
  };
  StackData stack_data;
  Filter1 filter;
  const size_t offset = stack_data.AddFilter(&filter);
  EXPECT_EQ(offset, 0);
  EXPECT_EQ(stack_data.call_data_size, 0);
  stack_data.AddServerTrailingMetadataOp(&filter, offset);
  ASSERT_EQ(stack_data.filter_constructor.size(), 0u);
  ASSERT_EQ(stack_data.filter_destructor.size(), 0u);
  ASSERT_EQ(stack_data.server_trailing_metadata.size(), 1u);
  const auto& op = stack_data.server_trailing_metadata[0];
  EXPECT_EQ(op.call_offset, offset);
  EXPECT_EQ(op.channel_data, &filter);
  // Invoke the operator on metadata that starts without a path.
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto trailers = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(trailers->get_pointer(HttpPathMetadata()), nullptr);
  char call_data;
  auto result = op.server_trailing_metadata(&call_data, op.channel_data,
                                            std::move(trailers));
  EXPECT_EQ(result->get_pointer(HttpPathMetadata())->as_string_view(),
            "hello");
}
898
// Same as the void-returning trailing-metadata case, but the hook also takes
// a pointer to its filter; the test checks that the pointer it receives is
// the registered filter by observing a value pushed into the filter's vector.
TEST(StackDataTest, ServerTrailingMetadataReturningVoidTakingChannelPtr) {
  struct Filter1 {
    struct Call {
      void OnServerTrailingMetadata(ServerMetadata& trailers, Filter1* owner) {
        owner->v.push_back(42);
        trailers.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
      }
    };
    std::vector<int> v;
  };
  StackData stack_data;
  Filter1 filter;
  const size_t offset = stack_data.AddFilter(&filter);
  EXPECT_EQ(offset, 0);
  EXPECT_EQ(stack_data.call_data_size, 0);
  stack_data.AddServerTrailingMetadataOp(&filter, offset);
  ASSERT_EQ(stack_data.filter_constructor.size(), 0u);
  ASSERT_EQ(stack_data.filter_destructor.size(), 0u);
  ASSERT_EQ(stack_data.server_trailing_metadata.size(), 1u);
  const auto& op = stack_data.server_trailing_metadata[0];
  EXPECT_EQ(op.call_offset, offset);
  EXPECT_EQ(op.channel_data, &filter);
  // Invoke the operator on metadata that starts without a path.
  auto arena = SimpleArenaAllocator()->MakeArena();
  auto trailers = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(trailers->get_pointer(HttpPathMetadata()), nullptr);
  char call_data;
  auto result = op.server_trailing_metadata(&call_data, op.channel_data,
                                            std::move(trailers));
  EXPECT_EQ(result->get_pointer(HttpPathMetadata())->as_string_view(),
            "hello");
  // The hook ran exactly once against the registered filter instance.
  EXPECT_THAT(filter.v, ::testing::ElementsAre(42));
}
930
931 } // namespace filters_detail
932
933 ///////////////////////////////////////////////////////////////////////////////
934 // StackBuilder
935
936 class CallFilters::StackTestSpouse {
937 public:
StackDataFrom(const Stack & stack)938 static const filters_detail::StackData& StackDataFrom(const Stack& stack) {
939 return stack.data_;
940 }
941 };
942
// Adding a bare server-trailing-metadata callback to a StackBuilder must
// produce a stack with exactly one trailing-metadata op and no ops of any
// other kind.
TEST(StackBuilderTest, AddOnServerTrailingMetadata) {
  CallFilters::StackBuilder builder;
  // The callback owns move-only state through its capture.
  builder.AddOnServerTrailingMetadata(
      [boxed = std::make_unique<int>(42)](ServerMetadata&) {
        EXPECT_EQ(*boxed, 42);
      });
  auto stack = builder.Build();
  const filters_detail::StackData& data =
      CallFilters::StackTestSpouse::StackDataFrom(*stack);
  ASSERT_EQ(data.server_trailing_metadata.size(), 1u);
  ASSERT_EQ(data.client_initial_metadata.ops.size(), 0u);
  ASSERT_EQ(data.client_to_server_messages.ops.size(), 0u);
  ASSERT_EQ(data.server_to_client_messages.ops.size(), 0u);
  ASSERT_EQ(data.server_initial_metadata.ops.size(), 0u);
  const auto& trailing_op = data.server_trailing_metadata[0];
  EXPECT_EQ(trailing_op.call_offset, 0);
  EXPECT_NE(trailing_op.channel_data, nullptr);
}
957
958 ///////////////////////////////////////////////////////////////////////////////
959 // OperationExecutor
960
961 namespace filters_detail {
962
// A default-constructed executor that was never started reports that it is
// not running.
TEST(OperationExecutorTest, NoOp) {
  OperationExecutor<ClientMetadataHandle> executor;
  EXPECT_FALSE(executor.IsRunning());
}
967
// Runs two instant (status-returning) client-initial-metadata hooks through
// one OperationExecutor. On the first run both hooks succeed and the path is
// rewritten "hello" -> "world"; on the second run the first hook returns a
// cancelled status and the executor reports an error result.
TEST(OperationExecutorTest, InstantTwo) {
  class Filter1 {
   public:
    class Call {
     public:
      absl::Status OnClientInitialMetadata(ClientMetadata& md) {
        const bool was_first = std::exchange(first_, false);
        if (!was_first) {
          EXPECT_EQ(md.get_pointer(HttpPathMetadata()), nullptr);
        }
        // No path yet => write "hello"; otherwise overwrite it with "world".
        if (md.get_pointer(HttpPathMetadata()) == nullptr) {
          md.Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
        } else {
          md.Set(HttpPathMetadata(), Slice::FromStaticString("world"));
        }
        if (was_first) return absl::OkStatus();
        return absl::CancelledError();
      }

     private:
      // True until the hook has been invoked once on this call.
      bool first_ = true;
    };
  };
  StackData d;
  Filter1 first_filter;
  Filter1 second_filter;
  const size_t first_offset = d.AddFilter(&first_filter);
  const size_t second_offset = d.AddFilter(&second_filter);
  d.AddClientInitialMetadataOp(&first_filter, first_offset);
  d.AddClientInitialMetadataOp(&second_filter, second_offset);
  ASSERT_EQ(d.filter_constructor.size(), 2u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 2u);
  // One aligned allocation holds the call data of both filters.
  void* const call_data =
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment);
  d.filter_constructor[0].call_init(call_data, &first_filter);
  d.filter_constructor[1].call_init(
      Offset(call_data, d.filter_constructor[1].call_offset), &second_filter);
  OperationExecutor<ClientMetadataHandle> executor;
  auto arena = SimpleArenaAllocator()->MakeArena();
  promise_detail::Context<Arena> ctx(arena.get());
  // First run: both hooks return OK.
  auto md = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto result =
      executor.Start(&d.client_initial_metadata, std::move(md), call_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(
      result.value().ok->get_pointer(HttpPathMetadata())->as_string_view(),
      "world");
  // Second run: the first hook cancels.
  md = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  result =
      executor.Start(&d.client_initial_metadata, std::move(md), call_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(result.value().ok, nullptr);
  EXPECT_EQ(result.value().error->get(GrpcStatusMetadata()),
            GRPC_STATUS_CANCELLED);
  gpr_free_aligned(call_data);
}
1025
// Like InstantTwo, but each hook returns a promise that is pending on its
// first two polls and resolves on the third. Checks how many Start/Step polls
// the executor needs before producing a result.
TEST(OperationExecutorTest, PromiseTwo) {
  class Filter1 {
   public:
    class Call {
     public:
      auto OnClientInitialMetadata(ClientMetadata& md) {
        return [batch = &md, self = this,
                polls_left = std::make_unique<int>(3)]() mutable
                   -> Poll<absl::Status> {
          // Stay pending until the countdown reaches zero.
          if (--*polls_left > 0) return Pending{};
          const bool was_first = std::exchange(self->first_, false);
          if (!was_first) {
            EXPECT_EQ(batch->get_pointer(HttpPathMetadata()), nullptr);
          }
          // No path yet => write "hello"; otherwise overwrite with "world".
          if (batch->get_pointer(HttpPathMetadata()) == nullptr) {
            batch->Set(HttpPathMetadata(), Slice::FromStaticString("hello"));
          } else {
            batch->Set(HttpPathMetadata(), Slice::FromStaticString("world"));
          }
          if (was_first) return absl::OkStatus();
          return absl::CancelledError();
        };
      }

     private:
      // True until a promise from this call has resolved once.
      bool first_ = true;
    };
  };
  StackData d;
  Filter1 first_filter;
  Filter1 second_filter;
  const size_t first_offset = d.AddFilter(&first_filter);
  const size_t second_offset = d.AddFilter(&second_filter);
  d.AddClientInitialMetadataOp(&first_filter, first_offset);
  d.AddClientInitialMetadataOp(&second_filter, second_offset);
  ASSERT_EQ(d.filter_constructor.size(), 2u);
  ASSERT_EQ(d.filter_destructor.size(), 0u);
  ASSERT_EQ(d.client_initial_metadata.ops.size(), 2u);
  // One aligned allocation holds the call data of both filters.
  void* const call_data =
      gpr_malloc_aligned(d.call_data_size, d.call_data_alignment);
  d.filter_constructor[0].call_init(call_data, &first_filter);
  d.filter_constructor[1].call_init(
      Offset(call_data, d.filter_constructor[1].call_offset), &second_filter);
  OperationExecutor<ClientMetadataHandle> executor;
  auto arena = SimpleArenaAllocator()->MakeArena();
  promise_detail::Context<Arena> ctx(arena.get());
  // First run: Start plus three Steps stay pending, the fourth Step is ready.
  auto md = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  auto result =
      executor.Start(&d.client_initial_metadata, std::move(md), call_data);
  EXPECT_FALSE(result.ready());
  for (int pending_step = 0; pending_step < 3; ++pending_step) {
    result = executor.Step(call_data);
    EXPECT_FALSE(result.ready());
  }
  result = executor.Step(call_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(
      result.value().ok->get_pointer(HttpPathMetadata())->as_string_view(),
      "world");
  // Second run: the first hook cancels on its third poll (Start + two Steps).
  md = Arena::MakePooledForOverwrite<ServerMetadata>();
  EXPECT_EQ(md->get_pointer(HttpPathMetadata()), nullptr);
  result =
      executor.Start(&d.client_initial_metadata, std::move(md), call_data);
  EXPECT_FALSE(result.ready());
  result = executor.Step(call_data);
  EXPECT_FALSE(result.ready());
  result = executor.Step(call_data);
  EXPECT_TRUE(result.ready());
  EXPECT_EQ(result.value().ok, nullptr);
  EXPECT_EQ(result.value().error->get(GrpcStatusMetadata()),
            GRPC_STATUS_CANCELLED);
  gpr_free_aligned(call_data);
}
1100
1101 } // namespace filters_detail
1102
1103 ///////////////////////////////////////////////////////////////////////////////
1104 // CallFilters
1105
// Smoke test: a filter whose Call declares every hook can be added to a
// StackBuilder, and Build() yields a non-null stack.
TEST(CallFiltersTest, CanBuildStack) {
  struct Filter {
    struct Call {
      // All hooks are intentionally empty; only registration is exercised.
      void OnClientInitialMetadata(ClientMetadata&) {}
      void OnServerInitialMetadata(ServerMetadata&) {}
      void OnClientToServerMessage(Message&) {}
      void OnClientToServerHalfClose() {}
      void OnServerToClientMessage(Message&) {}
      void OnServerTrailingMetadata(ServerMetadata&) {}
      void OnFinalize(const grpc_call_final_info*) {}
    };
  };
  Filter filter;
  CallFilters::StackBuilder stack_builder;
  stack_builder.Add(&filter);
  auto built_stack = stack_builder.Build();
  EXPECT_NE(built_stack, nullptr);
}
1124
// Drives one unary call through a single stack holding two filters and
// records every hook invocation. The final expectation pins the order:
// client-originated hooks run f1 then f2, server-originated hooks run f2 then
// f1, and OnFinalize runs f1 then f2.
TEST(CallFiltersTest, UnaryCall) {
  struct Filter {
    struct Call {
      void OnClientInitialMetadata(ClientMetadata&, Filter* f) {
        f->Record(":OnClientInitialMetadata");
      }
      void OnServerInitialMetadata(ServerMetadata&, Filter* f) {
        f->Record(":OnServerInitialMetadata");
      }
      void OnClientToServerMessage(Message&, Filter* f) {
        f->Record(":OnClientToServerMessage");
      }
      void OnClientToServerHalfClose(Filter* f) {
        f->Record(":OnClientToServerHalfClose");
      }
      void OnServerToClientMessage(Message&, Filter* f) {
        f->Record(":OnServerToClientMessage");
      }
      void OnServerTrailingMetadata(ServerMetadata&, Filter* f) {
        f->Record(":OnServerTrailingMetadata");
      }
      void OnFinalize(const grpc_call_final_info*, Filter* f) {
        f->Record(":OnFinalize");
      }
      // NOTE(review): presumably here so Call owns a resource and is not
      // trivially constructible/destructible -- confirm.
      std::unique_ptr<int> i = std::make_unique<int>(3);
    };

    // Appends "<label><event>" to the shared step log.
    void Record(const char* event) {
      steps.push_back(absl::StrCat(label, event));
    }

    const std::string label;
    std::vector<std::string>& steps;
  };
  std::vector<std::string> steps;
  Filter f1{"f1", steps};
  Filter f2{"f2", steps};
  CallFilters::StackBuilder builder;
  builder.Add(&f1);
  builder.Add(&f2);
  auto arena = SimpleArenaAllocator()->MakeArena();
  CallFilters filters(Arena::MakePooledForOverwrite<ClientMetadata>());
  filters.AddStack(builder.Build());
  filters.Start();
  promise_detail::Context<Arena> ctx(arena.get());
  StrictMock<MockActivity> activity;
  activity.Activate();
  // Client initial metadata is available immediately.
  auto client_md_pull = filters.PullClientInitialMetadata();
  EXPECT_THAT(client_md_pull(), IsReady());
  Mock::VerifyAndClearExpectations(&activity);
  // A pushed client->server message stays pending until it is pulled.
  auto c2s_push = filters.PushClientToServerMessage(
      Arena::MakePooled<Message>(SliceBuffer(), 0));
  EXPECT_THAT(c2s_push(), IsPending());
  auto c2s_pull = filters.PullClientToServerMessage();
  // Pulling completes the message and is expected to wake the activity.
  EXPECT_WAKEUP(activity, EXPECT_THAT(c2s_pull(), IsReady()));
  // The push now resolves successfully.
  EXPECT_THAT(c2s_push(), IsReady(Success{}));
  // Server initial metadata: push, then pull it back out.
  filters.PushServerInitialMetadata(
      Arena::MakePooledForOverwrite<ServerMetadata>());
  auto server_md_pull = filters.PullServerInitialMetadata();
  EXPECT_THAT(server_md_pull(), IsReady());
  Mock::VerifyAndClearExpectations(&activity);
  // A pushed server->client message stays pending until it is pulled.
  auto s2c_push = filters.PushServerToClientMessage(
      Arena::MakePooled<Message>(SliceBuffer(), 0));
  EXPECT_THAT(s2c_push(), IsPending());
  auto s2c_pull = filters.PullServerToClientMessage();
  // Pulling completes the message and is expected to wake the activity.
  EXPECT_WAKEUP(activity, EXPECT_THAT(s2c_pull(), IsReady()));
  // The push now resolves successfully.
  EXPECT_THAT(s2c_push(), IsReady(Success{}));
  // Server trailing metadata: push, then pull; the pull is ready at once.
  filters.PushServerTrailingMetadata(
      Arena::MakePooledForOverwrite<ServerMetadata>());
  auto trailers_pull = filters.PullServerTrailingMetadata();
  EXPECT_THAT(trailers_pull(), IsReady());
  filters.Finalize(nullptr);
  EXPECT_THAT(steps,
              ::testing::ElementsAre(
                  "f1:OnClientInitialMetadata", "f2:OnClientInitialMetadata",
                  "f1:OnClientToServerMessage", "f2:OnClientToServerMessage",
                  "f2:OnServerInitialMetadata", "f1:OnServerInitialMetadata",
                  "f2:OnServerToClientMessage", "f1:OnServerToClientMessage",
                  "f2:OnServerTrailingMetadata", "f1:OnServerTrailingMetadata",
                  "f1:OnFinalize", "f2:OnFinalize"));
}
1217
// Drives one unary call through two stacks, each holding a single filter,
// and records every hook invocation. The final expectation pins the order:
// client-originated hooks run f1 then f2, server-originated hooks run f2 then
// f1, and OnFinalize runs f1 then f2.
TEST(CallFiltersTest, UnaryCallWithMultiStack) {
  struct Filter {
    struct Call {
      void OnClientInitialMetadata(ClientMetadata&, Filter* f) {
        f->Record(":OnClientInitialMetadata");
      }
      void OnServerInitialMetadata(ServerMetadata&, Filter* f) {
        f->Record(":OnServerInitialMetadata");
      }
      void OnClientToServerMessage(Message&, Filter* f) {
        f->Record(":OnClientToServerMessage");
      }
      void OnClientToServerHalfClose(Filter* f) {
        f->Record(":OnClientToServerHalfClose");
      }
      void OnServerToClientMessage(Message&, Filter* f) {
        f->Record(":OnServerToClientMessage");
      }
      void OnServerTrailingMetadata(ServerMetadata&, Filter* f) {
        f->Record(":OnServerTrailingMetadata");
      }
      void OnFinalize(const grpc_call_final_info*, Filter* f) {
        f->Record(":OnFinalize");
      }
      // NOTE(review): presumably here so Call owns a resource and is not
      // trivially constructible/destructible -- confirm.
      std::unique_ptr<int> i = std::make_unique<int>(3);
    };

    // Appends "<label><event>" to the shared step log.
    void Record(const char* event) {
      steps.push_back(absl::StrCat(label, event));
    }

    const std::string label;
    std::vector<std::string>& steps;
  };
  std::vector<std::string> steps;
  Filter f1{"f1", steps};
  Filter f2{"f2", steps};
  // One builder (and therefore one stack) per filter.
  CallFilters::StackBuilder builder1;
  CallFilters::StackBuilder builder2;
  builder1.Add(&f1);
  builder2.Add(&f2);
  auto arena = SimpleArenaAllocator()->MakeArena();
  CallFilters filters(Arena::MakePooledForOverwrite<ClientMetadata>());
  filters.AddStack(builder1.Build());
  filters.AddStack(builder2.Build());
  filters.Start();
  promise_detail::Context<Arena> ctx(arena.get());
  StrictMock<MockActivity> activity;
  activity.Activate();
  // Client initial metadata is available immediately.
  auto client_md_pull = filters.PullClientInitialMetadata();
  EXPECT_THAT(client_md_pull(), IsReady());
  Mock::VerifyAndClearExpectations(&activity);
  // A pushed client->server message stays pending until it is pulled.
  auto c2s_push = filters.PushClientToServerMessage(
      Arena::MakePooled<Message>(SliceBuffer(), 0));
  EXPECT_THAT(c2s_push(), IsPending());
  auto c2s_pull = filters.PullClientToServerMessage();
  // Pulling completes the message and is expected to wake the activity.
  EXPECT_WAKEUP(activity, EXPECT_THAT(c2s_pull(), IsReady()));
  // The push now resolves successfully.
  EXPECT_THAT(c2s_push(), IsReady(Success{}));
  // Server initial metadata: push, then pull it back out.
  filters.PushServerInitialMetadata(
      Arena::MakePooledForOverwrite<ServerMetadata>());
  auto server_md_pull = filters.PullServerInitialMetadata();
  EXPECT_THAT(server_md_pull(), IsReady());
  Mock::VerifyAndClearExpectations(&activity);
  // A pushed server->client message stays pending until it is pulled.
  auto s2c_push = filters.PushServerToClientMessage(
      Arena::MakePooled<Message>(SliceBuffer(), 0));
  EXPECT_THAT(s2c_push(), IsPending());
  auto s2c_pull = filters.PullServerToClientMessage();
  // Pulling completes the message and is expected to wake the activity.
  EXPECT_WAKEUP(activity, EXPECT_THAT(s2c_pull(), IsReady()));
  // The push now resolves successfully.
  EXPECT_THAT(s2c_push(), IsReady(Success{}));
  // Server trailing metadata: push, then pull; the pull is ready at once.
  filters.PushServerTrailingMetadata(
      Arena::MakePooledForOverwrite<ServerMetadata>());
  auto trailers_pull = filters.PullServerTrailingMetadata();
  EXPECT_THAT(trailers_pull(), IsReady());
  filters.Finalize(nullptr);
  EXPECT_THAT(steps,
              ::testing::ElementsAre(
                  "f1:OnClientInitialMetadata", "f2:OnClientInitialMetadata",
                  "f1:OnClientToServerMessage", "f2:OnClientToServerMessage",
                  "f2:OnServerInitialMetadata", "f1:OnServerInitialMetadata",
                  "f2:OnServerToClientMessage", "f1:OnServerToClientMessage",
                  "f2:OnServerTrailingMetadata", "f1:OnServerTrailingMetadata",
                  "f1:OnFinalize", "f2:OnFinalize"));
}
1312
1313 } // namespace grpc_core
1314
main(int argc,char ** argv)1315 int main(int argc, char** argv) {
1316 ::testing::InitGoogleTest(&argc, argv);
1317 grpc_tracer_init();
1318 return RUN_ALL_TESTS();
1319 }
1320