1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/call/request_buffer.h"
16
17 #include "gtest/gtest.h"
18 #include "test/core/promise/poll_matcher.h"
19
20 using testing::Mock;
21 using testing::StrictMock;
22
23 namespace grpc_core {
24
25 namespace {
CrashOnParseError(absl::string_view error,const Slice & data)26 void CrashOnParseError(absl::string_view error, const Slice& data) {
27 LOG(FATAL) << "Failed to parse " << error << " from "
28 << data.as_string_view();
29 }
30
31 // A mock activity that can be activated and deactivated.
32 class MockActivity : public Activity, public Wakeable {
33 public:
34 MOCK_METHOD(void, WakeupRequested, ());
35
ForceImmediateRepoll(WakeupMask)36 void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()37 void Orphan() override {}
MakeOwningWaker()38 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()39 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)40 void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)41 void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)42 void Drop(WakeupMask /*mask*/) override {}
DebugTag() const43 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const44 std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
45 return DebugTag();
46 }
47
Activate()48 void Activate() {
49 if (scoped_activity_ == nullptr) {
50 scoped_activity_ = std::make_unique<ScopedActivity>(this);
51 }
52 }
53
Deactivate()54 void Deactivate() { scoped_activity_.reset(); }
55
56 private:
57 std::unique_ptr<ScopedActivity> scoped_activity_;
58 };
59
// Runs `statement` while expecting at least one WakeupRequested() call on
// `activity`, then verifies and clears the mock's expectations.
// Deliberately NOT wrapped in its own scope: callers may declare a variable in
// `statement` and keep using it after the macro.
#define EXPECT_WAKEUP(activity, statement)                                 \
  EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
  statement;                                                               \
  ::testing::Mock::VerifyAndClearExpectations(&(activity));
64
TestMetadata()65 ClientMetadataHandle TestMetadata() {
66 ClientMetadataHandle md = Arena::MakePooledForOverwrite<ClientMetadata>();
67 md->Append("key", Slice::FromStaticString("value"), CrashOnParseError);
68 return md;
69 }
70
TestMessage(int index=0)71 MessageHandle TestMessage(int index = 0) {
72 return Arena::MakePooled<Message>(
73 SliceBuffer(Slice::FromCopiedString(absl::StrCat("message ", index))), 0);
74 }
75
76 MATCHER(IsTestMetadata, "") {
77 if (arg == nullptr) return false;
78 std::string backing;
79 if (arg->GetStringValue("key", &backing) != "value") {
80 *result_listener << arg->DebugString();
81 return false;
82 }
83 return true;
84 }
85
86 MATCHER(IsTestMessage, "") {
87 if (arg == nullptr) return false;
88 if (arg->flags() != 0) {
89 *result_listener << "flags: " << arg->flags();
90 return false;
91 }
92 if (arg->payload()->JoinIntoString() != "message 0") {
93 *result_listener << "payload: " << arg->payload()->JoinIntoString();
94 return false;
95 }
96 return true;
97 }
98
99 MATCHER_P(IsTestMessage, index, "") {
100 if (arg == nullptr) return false;
101 if (arg->flags() != 0) {
102 *result_listener << "flags: " << arg->flags();
103 return false;
104 }
105 if (arg->payload()->JoinIntoString() != absl::StrCat("message ", index)) {
106 *result_listener << "payload: " << arg->payload()->JoinIntoString();
107 return false;
108 }
109 return true;
110 }
111
112 } // namespace
113
// Constructing and destroying an untouched buffer must be fine.
TEST(RequestBufferTest, NoOp) {
  RequestBuffer buffer;
}
115
// Metadata pushed before the reader is created is expected to be ready on the
// reader's first poll.
TEST(RequestBufferTest, PushThenPullClientInitialMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_poll = reader.PullClientInitialMetadata()();
  ASSERT_THAT(md_poll, IsReady());

  auto metadata = std::move(md_poll.value());
  ASSERT_TRUE(metadata.ok());
  EXPECT_THAT(*metadata, IsTestMetadata());
}
126
// Metadata pushed and followed by FinishSends() before any reader exists is
// still expected to be ready on the reader's first poll.
TEST(RequestBufferTest, PushThenFinishThenPullClientInitialMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  // Check the result instead of discarding it, as every other test in this
  // file does for FinishSends().
  EXPECT_EQ(buffer.FinishSends(), Success{});
  RequestBuffer::Reader reader(&buffer);
  auto poll = reader.PullClientInitialMetadata()();
  ASSERT_THAT(poll, IsReady());
  auto value = std::move(poll.value());
  ASSERT_TRUE(value.ok());
  EXPECT_THAT(*value, IsTestMetadata());
}
138
// A reader that polls before any metadata exists is expected to be pending,
// to be woken by the subsequent push, and to then observe the metadata.
TEST(RequestBufferTest, PullThenPushClientInitialMetadata) {
  StrictMock<MockActivity> activity;
  RequestBuffer buffer;
  RequestBuffer::Reader reader(&buffer);
  activity.Activate();
  auto poller = reader.PullClientInitialMetadata();
  auto poll = poller();
  EXPECT_THAT(poll, IsPending());
  // Use the shared TestMetadata() fixture rather than rebuilding the same
  // "key" -> "value" metadata by hand.
  EXPECT_WAKEUP(activity,
                EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()),
                          40));
  poll = poller();
  ASSERT_THAT(poll, IsReady());
  auto value = std::move(poll.value());
  ASSERT_TRUE(value.ok());
  EXPECT_THAT(*value, IsTestMetadata());
}
157
// Metadata and one message are pushed up front; a reader created afterwards
// is expected to pull both without ever being pending.
TEST(RequestBufferTest, PushThenPullMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
173
// Same as PushThenPullMessage, but the reader is committed before it pulls
// the initial metadata.
TEST(RequestBufferTest, PushThenPullMessageStreamBeforeInitialMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));

  RequestBuffer::Reader reader(&buffer);
  buffer.Commit(&reader);

  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
190
// Same as PushThenPullMessage, but the reader is committed after pulling the
// initial metadata and before pulling the first message.
TEST(RequestBufferTest, PushThenPullMessageStreamBeforeFirstMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  buffer.Commit(&reader);

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
207
// The reader polls for a message before one exists (pending); the later push
// is expected to wake the activity, after which the message is ready.
TEST(RequestBufferTest, PullThenPushMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  EXPECT_THAT(msg_poll, IsPending());

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(activity, EXPECT_THAT(push(), IsReady(49)));

  msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
227
// Pull-then-push with the reader committed before the message pull starts.
// The push is then expected to resolve with 0 rather than 49.
TEST(RequestBufferTest, PullThenPushMessageSwitchBeforePullMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  buffer.Commit(&reader);

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  EXPECT_THAT(msg_poll, IsPending());

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(activity, EXPECT_THAT(push(), IsReady(0)));

  msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
248
// Pull-then-push with the reader committed while the message pull is already
// pending, but before the message is pushed.
TEST(RequestBufferTest, PullThenPushMessageSwitchBeforePushMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  EXPECT_THAT(msg_poll, IsPending());

  buffer.Commit(&reader);

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(activity, EXPECT_THAT(push(), IsReady(0)));

  msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
269
// Pull-then-push with the reader committed only after the message has been
// pushed; the push is still expected to resolve with 49.
TEST(RequestBufferTest, PullThenPushMessageSwitchAfterPushMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  EXPECT_THAT(msg_poll, IsPending());

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(activity, EXPECT_THAT(push(), IsReady(49)));

  buffer.Commit(&reader);

  msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());
}
290
// After the only message has been pulled and FinishSends() succeeds, the next
// message pull is expected to be ready, ok, and carry no message.
TEST(RequestBufferTest, PullEndOfStream) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
312
// PullEndOfStream variant: the reader is committed after pulling metadata and
// before pulling the message.
TEST(RequestBufferTest, PullEndOfStreamSwitchBeforePullMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  buffer.Commit(&reader);

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
335
// The reader is committed before any message is pushed. The push is expected
// to stay pending until the reader pulls the metadata (which wakes the
// activity), and then to resolve with 0. Afterwards the message and the
// end-of-stream marker are pulled.
TEST(RequestBufferTest, PullEndOfStreamSwitchBeforePushMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  buffer.Commit(&reader);

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsPending());

  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_WAKEUP(activity,
                EXPECT_THAT(md_pull(), IsReady()));  // value tested elsewhere
  EXPECT_THAT(push(), IsReady(0));

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
362
// Metadata, one message and FinishSends() all happen before the reader
// exists; the reader is then expected to see the message followed by an
// empty (end-of-stream) result.
TEST(RequestBufferTest, PullEndOfStreamQueuedWithMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
384
// The reader is committed before the message push. The push is expected to be
// pending until the metadata is pulled, then to resolve with 0. FinishSends()
// is called before the reader pulls the message and then the end-of-stream.
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchBeforePushMessage) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  buffer.Commit(&reader);

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsPending());

  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_WAKEUP(activity,
                EXPECT_THAT(md_pull(), IsReady()));  // value tested elsewhere
  EXPECT_THAT(push(), IsReady(0));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
412
// Everything is queued (metadata, message, FinishSends) before the reader
// exists; the reader is committed after pulling metadata and before creating
// the message pull.
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchBeforePullMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  buffer.Commit(&reader);

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
436
// Everything is queued before the reader exists; the reader is committed
// after the message pull has been created but before it is first polled.
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchDuringPullMessage) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto msg_pull = reader.PullMessage();
  buffer.Commit(&reader);
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_FALSE(msg_poll.value().value().has_value());
}
460
// Alternates push and pull ten times without committing the reader. Each
// "message <n>" payload is 9 characters for single-digit n, and each push is
// expected to report 40 + 9 * (messages pushed so far) -- presumably the
// running buffered size.
TEST(RequestBufferTest, PushThenPullMessageRepeatedly) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  for (int n = 0; n != 10; ++n) {
    auto push = buffer.PushMessage(TestMessage(n));
    EXPECT_THAT(push(), IsReady(40 + 9 * (n + 1)));

    auto msg_pull = reader.PullMessage();
    auto msg_poll = msg_pull();
    ASSERT_THAT(msg_poll, IsReady());
    ASSERT_TRUE(msg_poll.value().ok());
    ASSERT_TRUE(msg_poll.value().value().has_value());
    EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage(n));
  }
}
478
// Three phases: (1) push ten messages before committing, (2) commit the
// reader and drain those ten messages in order, (3) push and pull ten more
// messages one at a time, where each push is expected to resolve with 0.
TEST(RequestBufferTest, PushSomeSwitchThenPushPullMessages) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  // Phase 1: push before commit.
  for (int n = 0; n != 10; ++n) {
    auto push = buffer.PushMessage(TestMessage(n));
    EXPECT_THAT(push(), IsReady(40 + 9 * (n + 1)));
  }

  buffer.Commit(&reader);

  // Phase 2: drain what was pushed before the commit.
  for (int n = 0; n != 10; ++n) {
    auto msg_pull = reader.PullMessage();
    auto msg_poll = msg_pull();
    ASSERT_THAT(msg_poll, IsReady());
    ASSERT_TRUE(msg_poll.value().ok());
    ASSERT_TRUE(msg_poll.value().value().has_value());
    EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage(n));
  }

  // Phase 3: interleaved push/pull after the commit.
  for (int n = 0; n != 10; ++n) {
    auto push = buffer.PushMessage(TestMessage(n));
    EXPECT_THAT(push(), IsReady(0));

    auto msg_pull = reader.PullMessage();
    auto msg_poll = msg_pull();
    ASSERT_THAT(msg_poll, IsReady());
    ASSERT_TRUE(msg_poll.value().ok());
    ASSERT_TRUE(msg_poll.value().value().has_value());
    EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage(n));
  }
}
511
// Two readers on the same uncommitted buffer are both expected to receive the
// initial metadata.
TEST(RequestBufferTest, HedgeReadMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader first_reader(&buffer);
  RequestBuffer::Reader second_reader(&buffer);
  auto first_pull = first_reader.PullClientInitialMetadata();
  auto second_pull = second_reader.PullClientInitialMetadata();
  auto first_poll = first_pull();
  auto second_poll = second_pull();
  ASSERT_THAT(first_poll, IsReady());
  ASSERT_THAT(second_poll, IsReady());

  auto first_md = std::move(first_poll.value());
  auto second_md = std::move(second_poll.value());
  ASSERT_TRUE(first_md.ok());
  ASSERT_TRUE(second_md.ok());
  EXPECT_THAT(*first_md, IsTestMetadata());
  EXPECT_THAT(*second_md, IsTestMetadata());
}
530
// The first reader is committed before the second reader is created. The
// committed reader is expected to get the metadata; the other reader's pull
// is expected to be ready but not ok.
TEST(RequestBufferTest, HedgeReadMetadataSwitchBeforeFirstRead) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader committed_reader(&buffer);
  buffer.Commit(&committed_reader);
  RequestBuffer::Reader other_reader(&buffer);

  auto committed_pull = committed_reader.PullClientInitialMetadata();
  auto other_pull = other_reader.PullClientInitialMetadata();
  auto committed_poll = committed_pull();
  auto other_poll = other_pull();
  ASSERT_THAT(committed_poll, IsReady());
  ASSERT_THAT(other_poll, IsReady());

  auto committed_md = std::move(committed_poll.value());
  auto other_md = std::move(other_poll.value());
  ASSERT_TRUE(committed_md.ok());
  EXPECT_FALSE(other_md.ok());
  EXPECT_THAT(*committed_md, IsTestMetadata());
}
549
// A second reader created after the first reader has already pulled the
// metadata is still expected to receive the metadata (no commit involved).
TEST(RequestBufferTest, HedgeReadMetadataLate) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader early_reader(&buffer);
  auto early_pull = early_reader.PullClientInitialMetadata();
  auto early_poll = early_pull();
  ASSERT_THAT(early_poll, IsReady());
  auto early_md = std::move(early_poll.value());
  ASSERT_TRUE(early_md.ok());
  EXPECT_THAT(*early_md, IsTestMetadata());

  RequestBuffer::Reader late_reader(&buffer);
  auto late_pull = late_reader.PullClientInitialMetadata();
  auto late_poll = late_pull();
  ASSERT_THAT(late_poll, IsReady());
  auto late_md = std::move(late_poll.value());
  ASSERT_TRUE(late_md.ok());
  EXPECT_THAT(*late_md, IsTestMetadata());
}
568
// The first reader pulls the metadata and is then committed after a second
// reader has been created; the second reader's metadata pull is expected to
// be ready but not ok.
TEST(RequestBufferTest, HedgeReadMetadataLateSwitchAfterPullInitialMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader winner(&buffer);
  auto winner_pull = winner.PullClientInitialMetadata();
  auto winner_poll = winner_pull();
  ASSERT_THAT(winner_poll, IsReady());
  auto winner_md = std::move(winner_poll.value());
  ASSERT_TRUE(winner_md.ok());
  EXPECT_THAT(*winner_md, IsTestMetadata());

  RequestBuffer::Reader loser(&buffer);
  buffer.Commit(&winner);

  auto loser_pull = loser.PullClientInitialMetadata();
  auto loser_poll = loser_pull();
  ASSERT_THAT(loser_poll, IsReady());
  auto loser_md = std::move(loser_poll.value());
  EXPECT_FALSE(loser_md.ok());
}
587
// With a committed reader, a second push issued before the first message has
// been pulled is expected to be pending. Pulling the first message wakes the
// activity; the second push then completes (waking the pending second pull).
TEST(RequestBufferTest, StreamingPushBeforeLastMessagePulled) {
  StrictMock<MockActivity> activity;
  activity.Activate();
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere
  buffer.Commit(&reader);

  auto first_push = buffer.PushMessage(TestMessage(1));
  EXPECT_THAT(first_push(), IsReady(0));
  auto second_push = buffer.PushMessage(TestMessage(2));
  EXPECT_THAT(second_push(), IsPending());

  auto first_pull = reader.PullMessage();
  // The macro declares `first_poll` in this scope; it is used right below.
  EXPECT_WAKEUP(activity, auto first_poll = first_pull());
  ASSERT_THAT(first_poll, IsReady());
  ASSERT_TRUE(first_poll.value().ok());
  ASSERT_TRUE(first_poll.value().value().has_value());
  EXPECT_THAT(first_poll.value().value().value(), IsTestMessage(1));

  auto second_pull = reader.PullMessage();
  auto second_poll = second_pull();
  EXPECT_THAT(second_poll, IsPending());
  EXPECT_WAKEUP(activity, EXPECT_THAT(second_push(), IsReady(0)));

  second_poll = second_pull();
  ASSERT_THAT(second_poll, IsReady());
  ASSERT_TRUE(second_poll.value().ok());
  ASSERT_TRUE(second_poll.value().value().has_value());
  EXPECT_THAT(second_poll.value().value().value(), IsTestMessage(2));
}
617
// The reader is committed only after FinishSends() and after the single
// message has been pulled; the following pull is expected to be ready, ok,
// and empty.
TEST(RequestBufferTest, SwitchAfterEndOfStream) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  buffer.Commit(&reader);

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  EXPECT_FALSE(msg_poll.value().value().has_value());
}
640
// Without any commit: after FinishSends() and after the single message has
// been pulled, the following pull is expected to be ready, ok, and empty.
TEST(RequestBufferTest, NothingAfterEndOfStream) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  EXPECT_THAT(md_pull(), IsReady());  // value tested elsewhere

  auto push = buffer.PushMessage(TestMessage());
  EXPECT_THAT(push(), IsReady(49));
  EXPECT_EQ(buffer.FinishSends(), Success{});

  auto msg_pull = reader.PullMessage();
  auto msg_poll = msg_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  ASSERT_TRUE(msg_poll.value().value().has_value());
  EXPECT_THAT(msg_poll.value().value().value(), IsTestMessage());

  auto eos_pull = reader.PullMessage();
  msg_poll = eos_pull();
  ASSERT_THAT(msg_poll, IsReady());
  ASSERT_TRUE(msg_poll.value().ok());
  EXPECT_FALSE(msg_poll.value().value().has_value());
}
662
// Cancelling first makes the metadata push report Failure, and a reader's
// metadata pull is expected to be ready but not ok.
TEST(RequestBufferTest, CancelBeforeInitialMetadataPush) {
  RequestBuffer buffer;
  buffer.Cancel();
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), Failure{});

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  auto md_poll = md_pull();
  ASSERT_THAT(md_poll, IsReady());
  ASSERT_FALSE(md_poll.value().ok());
}
673
// Cancelling after a successful metadata push but before any pull: the
// reader's metadata pull is expected to be ready but not ok.
TEST(RequestBufferTest, CancelBeforeInitialMetadataPull) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  buffer.Cancel();

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  auto md_poll = md_pull();
  ASSERT_THAT(md_poll, IsReady());
  ASSERT_FALSE(md_poll.value().ok());
}
684
// After cancellation, a message push is expected to resolve immediately with
// a non-ok result, and so is a subsequent metadata pull.
TEST(RequestBufferTest, CancelBeforeMessagePush) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  buffer.Cancel();

  auto push = buffer.PushMessage(TestMessage());
  auto push_poll = push();
  ASSERT_THAT(push_poll, IsReady());
  ASSERT_FALSE(push_poll.value().ok());

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  auto md_poll = md_pull();
  ASSERT_THAT(md_poll, IsReady());
  ASSERT_FALSE(md_poll.value().ok());
}
699
// The metadata is pulled successfully first; cancelling afterwards is
// expected to make a later message push resolve with a non-ok result.
TEST(RequestBufferTest, CancelBeforeMessagePushButAfterInitialMetadataPull) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);

  RequestBuffer::Reader reader(&buffer);
  auto md_pull = reader.PullClientInitialMetadata();
  auto md_poll = md_pull();
  ASSERT_THAT(md_poll, IsReady());
  ASSERT_TRUE(md_poll.value().ok());
  EXPECT_THAT(*md_poll.value(), IsTestMetadata());

  buffer.Cancel();

  auto push = buffer.PushMessage(TestMessage());
  auto push_poll = push();
  ASSERT_THAT(push_poll, IsReady());
  ASSERT_FALSE(push_poll.value().ok());
}
715
716 } // namespace grpc_core
717
main(int argc,char ** argv)718 int main(int argc, char** argv) {
719 ::testing::InitGoogleTest(&argc, argv);
720 return RUN_ALL_TESTS();
721 }
722