• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/call/request_buffer.h"
16 
17 #include "gtest/gtest.h"
18 #include "test/core/promise/poll_matcher.h"
19 
20 using testing::Mock;
21 using testing::StrictMock;
22 
23 namespace grpc_core {
24 
25 namespace {
CrashOnParseError(absl::string_view error,const Slice & data)26 void CrashOnParseError(absl::string_view error, const Slice& data) {
27   LOG(FATAL) << "Failed to parse " << error << " from "
28              << data.as_string_view();
29 }
30 
31 // A mock activity that can be activated and deactivated.
32 class MockActivity : public Activity, public Wakeable {
33  public:
34   MOCK_METHOD(void, WakeupRequested, ());
35 
ForceImmediateRepoll(WakeupMask)36   void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()37   void Orphan() override {}
MakeOwningWaker()38   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()39   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)40   void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)41   void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)42   void Drop(WakeupMask /*mask*/) override {}
DebugTag() const43   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const44   std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
45     return DebugTag();
46   }
47 
Activate()48   void Activate() {
49     if (scoped_activity_ == nullptr) {
50       scoped_activity_ = std::make_unique<ScopedActivity>(this);
51     }
52   }
53 
Deactivate()54   void Deactivate() { scoped_activity_.reset(); }
55 
56  private:
57   std::unique_ptr<ScopedActivity> scoped_activity_;
58 };
59 
// Runs `statement` while expecting at least one WakeupRequested() call on
// `activity`, then verifies and clears that expectation.
//
// The expansion is deliberately NOT wrapped in `do { ... } while (0)`:
// callers may pass a declaration as `statement` (e.g. `auto poll = pull()`)
// and use the declared variable after the macro. As a consequence this macro
// must not be used as the unbraced body of an if/for/while.
//
// All gMock names are fully qualified so the expansion does not depend on
// using-declarations at the call site.
#define EXPECT_WAKEUP(activity, statement)                                 \
  EXPECT_CALL((activity), WakeupRequested()).Times(::testing::AtLeast(1)); \
  statement;                                                               \
  ::testing::Mock::VerifyAndClearExpectations(&(activity));
64 
TestMetadata()65 ClientMetadataHandle TestMetadata() {
66   ClientMetadataHandle md = Arena::MakePooledForOverwrite<ClientMetadata>();
67   md->Append("key", Slice::FromStaticString("value"), CrashOnParseError);
68   return md;
69 }
70 
TestMessage(int index=0)71 MessageHandle TestMessage(int index = 0) {
72   return Arena::MakePooled<Message>(
73       SliceBuffer(Slice::FromCopiedString(absl::StrCat("message ", index))), 0);
74 }
75 
76 MATCHER(IsTestMetadata, "") {
77   if (arg == nullptr) return false;
78   std::string backing;
79   if (arg->GetStringValue("key", &backing) != "value") {
80     *result_listener << arg->DebugString();
81     return false;
82   }
83   return true;
84 }
85 
86 MATCHER(IsTestMessage, "") {
87   if (arg == nullptr) return false;
88   if (arg->flags() != 0) {
89     *result_listener << "flags: " << arg->flags();
90     return false;
91   }
92   if (arg->payload()->JoinIntoString() != "message 0") {
93     *result_listener << "payload: " << arg->payload()->JoinIntoString();
94     return false;
95   }
96   return true;
97 }
98 
99 MATCHER_P(IsTestMessage, index, "") {
100   if (arg == nullptr) return false;
101   if (arg->flags() != 0) {
102     *result_listener << "flags: " << arg->flags();
103     return false;
104   }
105   if (arg->payload()->JoinIntoString() != absl::StrCat("message ", index)) {
106     *result_listener << "payload: " << arg->payload()->JoinIntoString();
107     return false;
108   }
109   return true;
110 }
111 
112 }  // namespace
113 
// Constructing and destroying an unused RequestBuffer must be safe.
TEST(RequestBufferTest, NoOp) {
  RequestBuffer request_buffer;
}
115 
// Metadata pushed before a reader exists is immediately ready for that reader.
TEST(RequestBufferTest, PushThenPullClientInitialMetadata) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  // Create the pull promise and poll it once in a single expression.
  auto metadata_poll = consumer.PullClientInitialMetadata()();
  ASSERT_THAT(metadata_poll, IsReady());
  auto metadata_result = std::move(metadata_poll.value());
  ASSERT_TRUE(metadata_result.ok());
  EXPECT_THAT(*metadata_result, IsTestMetadata());
}
126 
// Initial metadata remains readable after sends have been finished.
TEST(RequestBufferTest, PushThenFinishThenPullClientInitialMetadata) {
  RequestBuffer buffer;
  EXPECT_EQ(buffer.PushClientInitialMetadata(TestMetadata()), 40);
  // Check the FinishSends() result, as the other tests in this file do,
  // rather than silently discarding it.
  EXPECT_EQ(buffer.FinishSends(), Success{});
  RequestBuffer::Reader reader(&buffer);
  // Create the pull promise and poll it once in a single expression.
  auto poll = reader.PullClientInitialMetadata()();
  ASSERT_THAT(poll, IsReady());
  auto value = std::move(poll.value());
  ASSERT_TRUE(value.ok());
  EXPECT_THAT(*value, IsTestMetadata());
}
138 
// A reader that pulls before metadata is pushed is pending, gets woken by the
// push, and then observes the metadata on the next poll.
TEST(RequestBufferTest, PullThenPushClientInitialMetadata) {
  StrictMock<MockActivity> activity;
  RequestBuffer buffer;
  RequestBuffer::Reader reader(&buffer);
  activity.Activate();
  auto poller = reader.PullClientInitialMetadata();
  auto poll = poller();
  EXPECT_THAT(poll, IsPending());
  // Use the shared helper instead of hand-building identical metadata, so the
  // pushed value always agrees with what IsTestMetadata() checks.
  ClientMetadataHandle md = TestMetadata();
  EXPECT_WAKEUP(activity,
                EXPECT_EQ(buffer.PushClientInitialMetadata(std::move(md)), 40));
  poll = poller();
  ASSERT_THAT(poll, IsReady());
  auto value = std::move(poll.value());
  ASSERT_TRUE(value.ok());
  EXPECT_THAT(*value, IsTestMetadata());
}
157 
// Metadata and one message are pushed first; a reader created afterwards
// pulls both without ever being pending.
TEST(RequestBufferTest, PushThenPullMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  // 49 == 40 + 9; presumably the running buffered size after "message 0".
  EXPECT_THAT(message_push(), IsReady(49));
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
173 
// Same as PushThenPullMessage, but the reader is committed before it pulls
// the initial metadata.
TEST(RequestBufferTest, PushThenPullMessageStreamBeforeInitialMetadata) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  RequestBuffer::Reader consumer(&request_buffer);
  request_buffer.Commit(&consumer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
190 
// Same as PushThenPullMessage, but the reader is committed after pulling the
// initial metadata and before pulling the first message.
TEST(RequestBufferTest, PushThenPullMessageStreamBeforeFirstMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  request_buffer.Commit(&consumer);
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
207 
// The reader asks for a message before one exists: the pull is pending, the
// subsequent push wakes the activity, and the re-poll yields the message.
TEST(RequestBufferTest, PullThenPushMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  EXPECT_THAT(message_poll, IsPending());
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(mock_activity, EXPECT_THAT(message_push(), IsReady(49)));
  message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
227 
// Pull-then-push with the reader committed before PullMessage() is created.
// After the commit the push is expected to report 0 rather than 49.
TEST(RequestBufferTest, PullThenPushMessageSwitchBeforePullMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  request_buffer.Commit(&consumer);
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  EXPECT_THAT(message_poll, IsPending());
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(mock_activity, EXPECT_THAT(message_push(), IsReady(0)));
  message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
248 
// Pull-then-push with the reader committed while its message pull is already
// pending, just before the message is pushed. The push reports 0.
TEST(RequestBufferTest, PullThenPushMessageSwitchBeforePushMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  EXPECT_THAT(message_poll, IsPending());
  request_buffer.Commit(&consumer);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(mock_activity, EXPECT_THAT(message_push(), IsReady(0)));
  message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
269 
// Pull-then-push with the reader committed only after the push completed
// (the push therefore still reports 49) and before the pull is re-polled.
TEST(RequestBufferTest, PullThenPushMessageSwitchAfterPushMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_pull = consumer.PullMessage();
  auto message_poll = message_pull();
  EXPECT_THAT(message_poll, IsPending());
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_WAKEUP(mock_activity, EXPECT_THAT(message_push(), IsReady(49)));
  request_buffer.Commit(&consumer);
  message_poll = message_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
}
290 
// After the only message is pulled and sends are finished, the next pull
// resolves successfully with an empty optional (end of stream).
TEST(RequestBufferTest, PullEndOfStream) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
312 
// End-of-stream scenario with the reader committed after pulling metadata and
// before pulling the first message.
TEST(RequestBufferTest, PullEndOfStreamSwitchBeforePullMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  request_buffer.Commit(&consumer);
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
335 
// End-of-stream scenario with the reader committed before anything is pulled.
// The message push is pending until the reader pulls the initial metadata,
// which wakes the activity; the re-polled push then reports 0.
TEST(RequestBufferTest, PullEndOfStreamSwitchBeforePushMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  request_buffer.Commit(&consumer);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsPending());
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_WAKEUP(
      mock_activity,
      EXPECT_THAT(metadata_pull(), IsReady()));  // value tested elsewhere
  EXPECT_THAT(message_push(), IsReady(0));
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
362 
// Sends are finished before any reader exists; the reader still receives the
// queued message first and the end-of-stream marker second.
TEST(RequestBufferTest, PullEndOfStreamQueuedWithMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
384 
// The reader is committed before the message push. The push is pending until
// the metadata is pulled; sends are finished before any message is pulled, so
// the message and end-of-stream are both queued when the reader pulls.
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchBeforePushMessage) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  request_buffer.Commit(&consumer);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsPending());
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_WAKEUP(
      mock_activity,
      EXPECT_THAT(metadata_pull(), IsReady()));  // value tested elsewhere
  EXPECT_THAT(message_push(), IsReady(0));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
412 
// Message and end-of-stream are queued before the reader exists; the reader
// is committed after pulling metadata and before creating PullMessage().
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchBeforePullMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  request_buffer.Commit(&consumer);
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
436 
// Message and end-of-stream are queued before the reader exists; the reader
// is committed after PullMessage() is created but before its first poll.
TEST(RequestBufferTest,
     PullEndOfStreamQueuedWithMessageSwitchDuringPullMessage) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto first_pull = consumer.PullMessage();
  request_buffer.Commit(&consumer);
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_FALSE(message_poll.value().value().has_value());
}
460 
// Ten rounds of push-one / pull-one without committing the reader. The value
// reported by each push keeps growing by 9 from the initial 40.
TEST(RequestBufferTest, PushThenPullMessageRepeatedly) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  for (int round = 0; round < 10; ++round) {
    auto message_push = request_buffer.PushMessage(TestMessage(round));
    EXPECT_THAT(message_push(), IsReady(40 + 9 * (round + 1)));
    auto message_pull = consumer.PullMessage();
    auto message_poll = message_pull();
    ASSERT_THAT(message_poll, IsReady());
    ASSERT_TRUE(message_poll.value().ok());
    ASSERT_TRUE(message_poll.value().value().has_value());
    EXPECT_THAT(message_poll.value().value().value(), IsTestMessage(round));
  }
}
478 
// Three phases:
//   1. push ten messages before the reader is committed,
//   2. commit the reader and drain those ten messages in order,
//   3. alternate push/pull for ten more messages, each push reporting 0.
TEST(RequestBufferTest, PushSomeSwitchThenPushPullMessages) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  // Phase 1: pushes only.
  for (int n = 0; n < 10; ++n) {
    auto message_push = request_buffer.PushMessage(TestMessage(n));
    EXPECT_THAT(message_push(), IsReady(40 + 9 * (n + 1)));
  }
  request_buffer.Commit(&consumer);
  // Phase 2: drain what was pushed before the commit.
  for (int n = 0; n < 10; ++n) {
    auto message_pull = consumer.PullMessage();
    auto message_poll = message_pull();
    ASSERT_THAT(message_poll, IsReady());
    ASSERT_TRUE(message_poll.value().ok());
    ASSERT_TRUE(message_poll.value().value().has_value());
    EXPECT_THAT(message_poll.value().value().value(), IsTestMessage(n));
  }
  // Phase 3: interleaved push and pull after the commit.
  for (int n = 0; n < 10; ++n) {
    auto message_push = request_buffer.PushMessage(TestMessage(n));
    EXPECT_THAT(message_push(), IsReady(0));
    auto message_pull = consumer.PullMessage();
    auto message_poll = message_pull();
    ASSERT_THAT(message_poll, IsReady());
    ASSERT_TRUE(message_poll.value().ok());
    ASSERT_TRUE(message_poll.value().value().has_value());
    EXPECT_THAT(message_poll.value().value().value(), IsTestMessage(n));
  }
}
511 
// Two uncommitted readers on the same buffer both receive the metadata.
TEST(RequestBufferTest, HedgeReadMetadata) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader first_reader(&request_buffer);
  RequestBuffer::Reader second_reader(&request_buffer);
  auto first_pull = first_reader.PullClientInitialMetadata();
  auto second_pull = second_reader.PullClientInitialMetadata();
  auto first_poll = first_pull();
  auto second_poll = second_pull();
  ASSERT_THAT(first_poll, IsReady());
  ASSERT_THAT(second_poll, IsReady());
  auto first_result = std::move(first_poll.value());
  auto second_result = std::move(second_poll.value());
  ASSERT_TRUE(first_result.ok());
  ASSERT_TRUE(second_result.ok());
  EXPECT_THAT(*first_result, IsTestMetadata());
  EXPECT_THAT(*second_result, IsTestMetadata());
}
530 
// The first reader is committed before the second reader is created: the
// committed reader gets the metadata, the other reader's pull fails.
TEST(RequestBufferTest, HedgeReadMetadataSwitchBeforeFirstRead) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader first_reader(&request_buffer);
  request_buffer.Commit(&first_reader);
  RequestBuffer::Reader second_reader(&request_buffer);
  auto first_pull = first_reader.PullClientInitialMetadata();
  auto second_pull = second_reader.PullClientInitialMetadata();
  auto first_poll = first_pull();
  auto second_poll = second_pull();
  ASSERT_THAT(first_poll, IsReady());
  ASSERT_THAT(second_poll, IsReady());
  auto first_result = std::move(first_poll.value());
  auto second_result = std::move(second_poll.value());
  ASSERT_TRUE(first_result.ok());
  EXPECT_FALSE(second_result.ok());
  EXPECT_THAT(*first_result, IsTestMetadata());
}
549 
// A second reader created after the first one already pulled the metadata
// still receives the same metadata (no commit has happened).
TEST(RequestBufferTest, HedgeReadMetadataLate) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  // First reader: full pull cycle.
  RequestBuffer::Reader first_reader(&request_buffer);
  auto first_pull = first_reader.PullClientInitialMetadata();
  auto first_poll = first_pull();
  ASSERT_THAT(first_poll, IsReady());
  auto first_result = std::move(first_poll.value());
  ASSERT_TRUE(first_result.ok());
  EXPECT_THAT(*first_result, IsTestMetadata());
  // Late second reader: same expectations.
  RequestBuffer::Reader second_reader(&request_buffer);
  auto second_pull = second_reader.PullClientInitialMetadata();
  auto second_poll = second_pull();
  ASSERT_THAT(second_poll, IsReady());
  auto second_result = std::move(second_poll.value());
  ASSERT_TRUE(second_result.ok());
  EXPECT_THAT(*second_result, IsTestMetadata());
}
568 
// The first reader pulls the metadata and is then committed; a second reader
// created just before that commit gets a failed result when it pulls.
TEST(RequestBufferTest, HedgeReadMetadataLateSwitchAfterPullInitialMetadata) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader first_reader(&request_buffer);
  auto first_pull = first_reader.PullClientInitialMetadata();
  auto first_poll = first_pull();
  ASSERT_THAT(first_poll, IsReady());
  auto first_result = std::move(first_poll.value());
  ASSERT_TRUE(first_result.ok());
  EXPECT_THAT(*first_result, IsTestMetadata());
  RequestBuffer::Reader second_reader(&request_buffer);
  request_buffer.Commit(&first_reader);
  auto second_pull = second_reader.PullClientInitialMetadata();
  auto second_poll = second_pull();
  ASSERT_THAT(second_poll, IsReady());
  auto second_result = std::move(second_poll.value());
  EXPECT_FALSE(second_result.ok());
}
587 
// With a committed reader, a second push is pending while the first message
// has not been pulled. Pulling message 1 wakes the activity; the second push
// then completes and wakes the (pending) second pull.
TEST(RequestBufferTest, StreamingPushBeforeLastMessagePulled) {
  StrictMock<MockActivity> mock_activity;
  mock_activity.Activate();
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  request_buffer.Commit(&consumer);
  auto first_push = request_buffer.PushMessage(TestMessage(1));
  EXPECT_THAT(first_push(), IsReady(0));
  auto second_push = request_buffer.PushMessage(TestMessage(2));
  EXPECT_THAT(second_push(), IsPending());
  auto first_pull = consumer.PullMessage();
  // EXPECT_WAKEUP expands to plain statements, so first_poll declared inside
  // it remains in scope for the assertions below.
  EXPECT_WAKEUP(mock_activity, auto first_poll = first_pull());
  ASSERT_THAT(first_poll, IsReady());
  ASSERT_TRUE(first_poll.value().ok());
  ASSERT_TRUE(first_poll.value().value().has_value());
  EXPECT_THAT(first_poll.value().value().value(), IsTestMessage(1));
  auto second_pull = consumer.PullMessage();
  auto second_poll = second_pull();
  EXPECT_THAT(second_poll, IsPending());
  EXPECT_WAKEUP(mock_activity, EXPECT_THAT(second_push(), IsReady(0)));
  second_poll = second_pull();
  ASSERT_THAT(second_poll, IsReady());
  ASSERT_TRUE(second_poll.value().ok());
  ASSERT_TRUE(second_poll.value().value().has_value());
  EXPECT_THAT(second_poll.value().value().value(), IsTestMessage(2));
}
617 
// The reader is committed after sends were finished and the only message was
// pulled; the following pull still reports end of stream.
TEST(RequestBufferTest, SwitchAfterEndOfStream) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  request_buffer.Commit(&consumer);
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  EXPECT_FALSE(message_poll.value().value().has_value());
}
640 
// Without any commit: once sends are finished and the single message has been
// pulled, the next pull yields an empty optional.
TEST(RequestBufferTest, NothingAfterEndOfStream) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  EXPECT_THAT(metadata_pull(), IsReady());  // value tested elsewhere
  auto message_push = request_buffer.PushMessage(TestMessage());
  EXPECT_THAT(message_push(), IsReady(49));
  EXPECT_EQ(request_buffer.FinishSends(), Success{});
  auto first_pull = consumer.PullMessage();
  auto message_poll = first_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  ASSERT_TRUE(message_poll.value().value().has_value());
  EXPECT_THAT(message_poll.value().value().value(), IsTestMessage());
  // Second pull: ok status, but no message.
  auto second_pull = consumer.PullMessage();
  message_poll = second_pull();
  ASSERT_THAT(message_poll, IsReady());
  ASSERT_TRUE(message_poll.value().ok());
  EXPECT_FALSE(message_poll.value().value().has_value());
}
662 
// Cancelling an empty buffer makes the metadata push fail and makes a
// reader's metadata pull resolve immediately with a failure.
TEST(RequestBufferTest, CancelBeforeInitialMetadataPush) {
  RequestBuffer request_buffer;
  request_buffer.Cancel();
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()),
            Failure{});
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  auto metadata_poll = metadata_pull();
  ASSERT_THAT(metadata_poll, IsReady());
  ASSERT_FALSE(metadata_poll.value().ok());
}
673 
// Metadata is pushed successfully, then the buffer is cancelled: a reader
// created afterwards gets a failure instead of the metadata.
TEST(RequestBufferTest, CancelBeforeInitialMetadataPull) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  request_buffer.Cancel();
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  auto metadata_poll = metadata_pull();
  ASSERT_THAT(metadata_poll, IsReady());
  ASSERT_FALSE(metadata_poll.value().ok());
}
684 
// After cancellation a message push resolves immediately with a failure, and
// so does a metadata pull from a reader created afterwards.
TEST(RequestBufferTest, CancelBeforeMessagePush) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  request_buffer.Cancel();
  auto message_push = request_buffer.PushMessage(TestMessage());
  auto push_poll = message_push();
  ASSERT_THAT(push_poll, IsReady());
  ASSERT_FALSE(push_poll.value().ok());
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  auto metadata_poll = metadata_pull();
  ASSERT_THAT(metadata_poll, IsReady());
  ASSERT_FALSE(metadata_poll.value().ok());
}
699 
// The reader pulls the metadata successfully before cancellation; a message
// push issued after Cancel() then resolves immediately with a failure.
TEST(RequestBufferTest, CancelBeforeMessagePushButAfterInitialMetadataPull) {
  RequestBuffer request_buffer;
  EXPECT_EQ(request_buffer.PushClientInitialMetadata(TestMetadata()), 40);
  RequestBuffer::Reader consumer(&request_buffer);
  auto metadata_pull = consumer.PullClientInitialMetadata();
  auto metadata_poll = metadata_pull();
  ASSERT_THAT(metadata_poll, IsReady());
  ASSERT_TRUE(metadata_poll.value().ok());
  EXPECT_THAT(*metadata_poll.value(), IsTestMetadata());
  request_buffer.Cancel();
  auto message_push = request_buffer.PushMessage(TestMessage());
  auto push_poll = message_push();
  ASSERT_THAT(push_poll, IsReady());
  ASSERT_FALSE(push_poll.value().ok());
}
715 
716 }  // namespace grpc_core
717 
main(int argc,char ** argv)718 int main(int argc, char** argv) {
719   ::testing::InitGoogleTest(&argc, argv);
720   return RUN_ALL_TESTS();
721 }
722