• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/transport/promise_endpoint.h"
16 
17 // IWYU pragma: no_include <sys/socket.h>
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/port.h>  // IWYU pragma: keep
21 #include <grpc/event_engine/slice_buffer.h>
22 
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <tuple>
27 
28 #include "absl/functional/any_invocable.h"
29 #include "gmock/gmock.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/join.h"
33 #include "src/core/lib/promise/seq.h"
34 #include "src/core/lib/slice/slice.h"
35 #include "src/core/lib/slice/slice_buffer.h"
36 #include "src/core/lib/slice/slice_internal.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38 
39 using testing::AtMost;
40 using testing::MockFunction;
41 using testing::Return;
42 using testing::ReturnRef;
43 using testing::Sequence;
44 using testing::StrictMock;
45 using testing::WithArg;
46 using testing::WithArgs;
47 
48 namespace grpc_core {
49 namespace testing {
50 
51 class MockEndpoint
52     : public grpc_event_engine::experimental::EventEngine::Endpoint {
53  public:
54   MOCK_METHOD(
55       bool, Read,
56       (absl::AnyInvocable<void(absl::Status)> on_read,
57        grpc_event_engine::experimental::SliceBuffer* buffer,
58        const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
59            args),
60       (override));
61 
62   MOCK_METHOD(
63       bool, Write,
64       (absl::AnyInvocable<void(absl::Status)> on_writable,
65        grpc_event_engine::experimental::SliceBuffer* data,
66        const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
67            args),
68       (override));
69 
70   MOCK_METHOD(
71       const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
72       GetPeerAddress, (), (const, override));
73   MOCK_METHOD(
74       const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
75       GetLocalAddress, (), (const, override));
76 };
77 
78 class MockActivity : public Activity, public Wakeable {
79  public:
80   MOCK_METHOD(void, WakeupRequested, ());
81 
ForceImmediateRepoll(WakeupMask)82   void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()83   void Orphan() override {}
MakeOwningWaker()84   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()85   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)86   void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)87   void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)88   void Drop(WakeupMask /*mask*/) override {}
DebugTag() const89   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const90   std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
91     return DebugTag();
92   }
93 
Activate()94   void Activate() {
95     if (scoped_activity_ == nullptr) {
96       scoped_activity_ = std::make_unique<ScopedActivity>(this);
97     }
98   }
99 
Deactivate()100   void Deactivate() { scoped_activity_.reset(); }
101 
102  private:
103   std::unique_ptr<ScopedActivity> scoped_activity_;
104 };
105 
106 class PromiseEndpointTest : public ::testing::Test {
107  public:
PromiseEndpointTest()108   PromiseEndpointTest()
109       : mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
110         mock_endpoint_(*mock_endpoint_ptr_),
111         promise_endpoint_(std::make_unique<PromiseEndpoint>(
112             std::unique_ptr<
113                 grpc_event_engine::experimental::EventEngine::Endpoint>(
114                 mock_endpoint_ptr_),
115             SliceBuffer())) {}
116 
117  private:
118   MockEndpoint* mock_endpoint_ptr_;
119 
120  protected:
121   MockEndpoint& mock_endpoint_;
122   std::unique_ptr<PromiseEndpoint> promise_endpoint_;
123 
124   const absl::Status kDummyErrorStatus =
125       absl::ErrnoToStatus(5566, "just an error");
126   static constexpr size_t kDummyRequestSize = 5566u;
127 };
128 
// The mocked Read() fills the buffer and returns true, so the promise must be
// ready on its first poll, carry the bytes, and request no wakeup.
TEST_F(PromiseEndpointTest, OneReadSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // Hand the whole payload to the caller's buffer.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  auto read_promise = promise_endpoint_->Read(payload.size());
  auto result = read_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(result.value()->JoinIntoString(), payload);
  activity.Deactivate();
}
150 
// The mocked Read() invokes its callback with an error before returning
// false; the promise must then be ready with exactly that error.
TEST_F(PromiseEndpointTest, OneReadFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([this](StatusCallback on_read) {
        // Report the failure from inside the Read() call itself.
        on_read(kDummyErrorStatus);
        return false;
      }));
  auto read_promise = promise_endpoint_->Read(kDummyRequestSize);
  auto result = read_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
169 
// Two sequenced endpoint reads each deliver four bytes; two consecutive
// four-byte Read() promises must return the first and second half in order.
TEST_F(PromiseEndpointTest, MultipleReadsSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
  const std::string first_half = payload.substr(0, 4);
  const std::string second_half = payload.substr(4);
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  Sequence order;
  EXPECT_CALL(mock_endpoint_, Read)
      .InSequence(order)
      .WillOnce(WithArgs<1>([&first_half](ee::SliceBuffer* sink) {
        // First endpoint read: bytes 0..3.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(first_half)));
        return true;
      }));
  EXPECT_CALL(mock_endpoint_, Read)
      .InSequence(order)
      .WillOnce(WithArgs<1>([&second_half](ee::SliceBuffer* sink) {
        // Second endpoint read: bytes 4..7.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(second_half)));
        return true;
      }));
  {
    auto read_promise = promise_endpoint_->Read(4u);
    auto result = read_promise();
    ASSERT_TRUE(result.ready());
    ASSERT_TRUE(result.value().ok());
    EXPECT_EQ(result.value()->JoinIntoString(), first_half);
  }
  {
    auto read_promise = promise_endpoint_->Read(4u);
    auto result = read_promise();
    ASSERT_TRUE(result.ready());
    ASSERT_TRUE(result.value().ok());
    EXPECT_EQ(result.value()->JoinIntoString(), second_half);
  }
  activity.Deactivate();
}
212 
// The mocked Read() stores the callback and returns false, so the first poll
// is pending. Firing the stored callback with OK must trigger exactly one
// wakeup and make the next poll ready with the payload.
TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArgs<0, 1>(
          [&saved_on_read, &payload](StatusCallback on_read,
                                     ee::SliceBuffer* sink) {
            saved_on_read = std::move(on_read);
            // Data lands in the buffer now; completion is signalled later.
            sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
            return false;
          }));
  auto read_promise = promise_endpoint_->Read(payload.size());
  EXPECT_TRUE(read_promise().pending());
  // Complete the read successfully.
  saved_on_read(absl::OkStatus());
  auto result = read_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(result.value()->JoinIntoString(), payload);
  activity.Deactivate();
}
242 
// The mocked Read() stores the callback and returns false. Firing the
// callback with an error afterwards must wake the activity once and make the
// promise resolve to that error.
TEST_F(PromiseEndpointTest, OnePendingReadFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([&saved_on_read](StatusCallback on_read) {
        // Keep the callback; the read is left outstanding.
        saved_on_read = std::move(on_read);
        return false;
      }));
  auto read_promise = promise_endpoint_->Read(kDummyRequestSize);
  EXPECT_TRUE(read_promise().pending());
  // Fail the outstanding read.
  saved_on_read(kDummyErrorStatus);
  auto result = read_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
265 
// ReadSlice() counterpart of the immediate-success case: the mocked Read()
// returns true, and the first poll must yield a slice equal to the payload.
TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // Hand the whole payload to the caller's buffer.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  auto slice_promise = promise_endpoint_->ReadSlice(payload.size());
  auto result = slice_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(result.value()->as_string_view(), payload);
  activity.Deactivate();
}
287 
// ReadSlice() counterpart of the immediate-failure case: the error passed to
// the read callback must be the status the promise resolves to.
TEST_F(PromiseEndpointTest, OneReadSliceFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([this](StatusCallback on_read) {
        // Report the failure from inside the Read() call itself.
        on_read(kDummyErrorStatus);
        return false;
      }));
  auto slice_promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
  auto result = slice_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
306 
// Two sequenced endpoint reads each deliver four bytes; two consecutive
// four-byte ReadSlice() promises must return the halves in order.
TEST_F(PromiseEndpointTest, MultipleReadSlicesSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
  const std::string first_half = payload.substr(0, 4);
  const std::string second_half = payload.substr(4);
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  Sequence order;
  EXPECT_CALL(mock_endpoint_, Read)
      .InSequence(order)
      .WillOnce(WithArgs<1>([&first_half](ee::SliceBuffer* sink) {
        // First endpoint read: bytes 0..3.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(first_half)));
        return true;
      }));
  EXPECT_CALL(mock_endpoint_, Read)
      .InSequence(order)
      .WillOnce(WithArgs<1>([&second_half](ee::SliceBuffer* sink) {
        // Second endpoint read: bytes 4..7.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(second_half)));
        return true;
      }));
  {
    auto slice_promise = promise_endpoint_->ReadSlice(4u);
    auto result = slice_promise();
    ASSERT_TRUE(result.ready());
    ASSERT_TRUE(result.value().ok());
    EXPECT_EQ(result.value()->as_string_view(), first_half);
  }
  {
    auto slice_promise = promise_endpoint_->ReadSlice(4u);
    auto result = slice_promise();
    ASSERT_TRUE(result.ready());
    ASSERT_TRUE(result.value().ok());
    EXPECT_EQ(result.value()->as_string_view(), second_half);
  }
  activity.Deactivate();
}
349 
// ReadSlice() counterpart of the pending-then-success case: pending on the
// first poll, one wakeup when the stored callback fires with OK, then ready
// with the payload.
TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArgs<0, 1>(
          [&saved_on_read, &payload](StatusCallback on_read,
                                     ee::SliceBuffer* sink) {
            saved_on_read = std::move(on_read);
            // Data lands in the buffer now; completion is signalled later.
            sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
            return false;
          }));
  auto slice_promise = promise_endpoint_->ReadSlice(payload.size());
  EXPECT_TRUE(slice_promise().pending());
  // Complete the read successfully.
  saved_on_read(absl::OkStatus());
  auto result = slice_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(result.value()->as_string_view(), payload);
  activity.Deactivate();
}
379 
// ReadSlice() counterpart of the pending-then-failure case: pending on the
// first poll, then ready with the error passed to the stored callback.
TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([&saved_on_read](StatusCallback on_read) {
        // Keep the callback; the read is left outstanding.
        saved_on_read = std::move(on_read);
        return false;
      }));
  auto slice_promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
  EXPECT_TRUE(slice_promise().pending());
  // Fail the outstanding read.
  saved_on_read(kDummyErrorStatus);
  auto result = slice_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
402 
// ReadByte() with an endpoint read that completes inside the call: the first
// poll must be ready with the single byte that was delivered.
TEST_F(PromiseEndpointTest, OneReadByteSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01};
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // Hand the one-byte payload to the caller's buffer.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  auto byte_promise = promise_endpoint_->ReadByte();
  auto result = byte_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(*result.value(), payload.front());
  activity.Deactivate();
}
424 
// ReadByte() counterpart of the immediate-failure case: the error passed to
// the read callback must be the status the promise resolves to.
TEST_F(PromiseEndpointTest, OneReadByteFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([this](StatusCallback on_read) {
        // Report the failure from inside the Read() call itself.
        on_read(kDummyErrorStatus);
        return false;
      }));
  auto byte_promise = promise_endpoint_->ReadByte();
  auto result = byte_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
443 
// Only one endpoint Read() is expected, and it delivers eight bytes. Eight
// consecutive ReadByte() promises must then return those bytes in order.
TEST_F(PromiseEndpointTest, MultipleReadBytesSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  MockActivity activity;
  const std::string payload = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArgs<1>([&payload](ee::SliceBuffer* sink) {
        // Deliver every byte in a single endpoint read.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  for (const char expected_byte : payload) {
    auto byte_promise = promise_endpoint_->ReadByte();
    auto result = byte_promise();
    ASSERT_TRUE(result.ready());
    ASSERT_TRUE(result.value().ok());
    EXPECT_EQ(*result.value(), expected_byte);
  }
  activity.Deactivate();
}
467 
// ReadByte() counterpart of the pending-then-success case: pending on the
// first poll, one wakeup when the stored callback fires with OK, then ready
// with the delivered byte.
TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  const std::string payload = {0x01};
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArgs<0, 1>(
          [&saved_on_read, &payload](StatusCallback on_read,
                                     ee::SliceBuffer* sink) {
            saved_on_read = std::move(on_read);
            // Data lands in the buffer now; completion is signalled later.
            sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
            return false;
          }));
  auto byte_promise = promise_endpoint_->ReadByte();
  ASSERT_TRUE(byte_promise().pending());
  // Complete the read successfully.
  saved_on_read(absl::OkStatus());
  auto result = byte_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(*result.value(), payload.front());
  activity.Deactivate();
}
497 
// ReadByte() counterpart of the pending-then-failure case: pending on the
// first poll, then ready with the error passed to the stored callback.
TEST_F(PromiseEndpointTest, OnePendingReadByteFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArg<0>([&saved_on_read](StatusCallback on_read) {
        // Keep the callback; the read is left outstanding.
        saved_on_read = std::move(on_read);
        return false;
      }));
  auto byte_promise = promise_endpoint_->ReadByte();
  ASSERT_TRUE(byte_promise().pending());
  // Fail the outstanding read.
  saved_on_read(kDummyErrorStatus);
  auto result = byte_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_FALSE(result.value().ok());
  EXPECT_EQ(kDummyErrorStatus, result.value().status());
  activity.Deactivate();
}
520 
// The mocked Write() returns true, so the write promise must be ready with
// OK on its first poll and no wakeup may be requested.
TEST_F(PromiseEndpointTest, OneWriteSuccessful) {
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true));
  auto write_promise = promise_endpoint_->Write(
      SliceBuffer(Slice::FromCopiedString("hello world")));
  auto result = write_promise();
  ASSERT_TRUE(result.ready());
  EXPECT_EQ(absl::OkStatus(), result.value());
  activity.Deactivate();
}
533 
// Writing an empty SliceBuffer must never reach the endpoint's Write() and
// must still resolve to OK on the first poll.
TEST_F(PromiseEndpointTest, EmptyWriteIsNoOp) {
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(0);
  EXPECT_CALL(mock_endpoint_, Write).Times(0);
  auto write_promise = promise_endpoint_->Write(SliceBuffer());
  auto result = write_promise();
  ASSERT_TRUE(result.ready());
  EXPECT_EQ(absl::OkStatus(), result.value());
  activity.Deactivate();
}
545 
// The mocked Write() invokes its callback with an error before returning
// false; the write promise must be ready with exactly that error.
TEST_F(PromiseEndpointTest, OneWriteFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
  EXPECT_CALL(mock_endpoint_, Write)
      .WillOnce(WithArg<0>([this](StatusCallback on_writable) {
        // Report the failure from inside the Write() call itself.
        on_writable(kDummyErrorStatus);
        return false;
      }));
  auto write_promise = promise_endpoint_->Write(
      SliceBuffer(Slice::FromCopiedString("hello world")));
  auto result = write_promise();
  ASSERT_TRUE(result.ready());
  EXPECT_EQ(kDummyErrorStatus, result.value());
  activity.Deactivate();
}
563 
// The mocked Write() stores the callback and returns false, so the first poll
// is pending. Firing the callback with OK must wake the activity once and
// make the next poll ready with OK.
TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  StatusCallback saved_on_write;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Write)
      .WillOnce(WithArgs<0, 1>(
          [&saved_on_write](StatusCallback on_writable,
                            ee::SliceBuffer* data) {
            saved_on_write = std::move(on_writable);
            // Append a default-constructed slice to the outgoing buffer.
            data->Append(ee::Slice());
            // The write is left outstanding.
            return false;
          }));
  auto write_promise = promise_endpoint_->Write(
      SliceBuffer(Slice::FromCopiedString("hello world")));
  EXPECT_TRUE(write_promise().pending());
  // Complete the write successfully.
  saved_on_write(absl::OkStatus());
  auto result = write_promise();
  ASSERT_TRUE(result.ready());
  EXPECT_EQ(absl::OkStatus(), result.value());
  activity.Deactivate();
}
590 
// The mocked Write() stores the callback and returns false. Firing the
// callback with an error afterwards must wake the activity once and make the
// write promise resolve to that error.
TEST_F(PromiseEndpointTest, OnePendingWriteFailed) {
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  StatusCallback saved_on_write;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Write)
      .WillOnce(WithArgs<0>([&saved_on_write](StatusCallback on_writable) {
        // Keep the callback; the write is left outstanding.
        saved_on_write = std::move(on_writable);
        return false;
      }));
  auto write_promise = promise_endpoint_->Write(
      SliceBuffer(Slice::FromCopiedString("hello world")));
  EXPECT_TRUE(write_promise().pending());
  // Fail the outstanding write.
  saved_on_write(kDummyErrorStatus);
  auto result = write_promise();
  ASSERT_TRUE(result.ready());
  EXPECT_EQ(kDummyErrorStatus, result.value());
  activity.Deactivate();
}
612 
// PromiseEndpoint::GetPeerAddress() must return the address reported by the
// wrapped endpoint: same size and the same raw bytes.
TEST_F(PromiseEndpointTest, GetPeerAddress) {
  const char raw_test_address[] = {0x55, 0x66, 0x01, 0x55, 0x66, 0x01};
  grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
      reinterpret_cast<const sockaddr*>(raw_test_address),
      sizeof(raw_test_address));
  EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address));
  auto peer_address = promise_endpoint_->GetPeerAddress();
  // Compare the expected bytes against the *returned* address. Comparing
  // test_address with itself would pass no matter what the endpoint returned.
  EXPECT_EQ(0, std::memcmp(test_address.address(), peer_address.address(),
                           test_address.size()));
  EXPECT_EQ(test_address.size(), peer_address.size());
}
624 
// PromiseEndpoint::GetLocalAddress() must return the address reported by the
// wrapped endpoint: same size and the same raw bytes.
TEST_F(PromiseEndpointTest, GetLocalAddress) {
  using ResolvedAddress =
      grpc_event_engine::experimental::EventEngine::ResolvedAddress;
  const char raw_test_address[] = {0x52, 0x55, 0x66, 0x52, 0x55, 0x66};
  ResolvedAddress expected(reinterpret_cast<const sockaddr*>(raw_test_address),
                           sizeof(raw_test_address));
  EXPECT_CALL(mock_endpoint_, GetLocalAddress).WillOnce(ReturnRef(expected));
  auto actual = promise_endpoint_->GetLocalAddress();
  // Byte-wise comparison of the expected address with the returned one.
  EXPECT_EQ(0,
            std::memcmp(expected.address(), actual.address(), expected.size()));
  EXPECT_EQ(expected.size(), actual.size());
}
637 
// A read promise must stay usable after the PromiseEndpoint that created it
// is destroyed: the endpoint is reset while the read is pending, the stored
// callback is then fired with OK, and the promise must still yield the byte.
TEST_F(PromiseEndpointTest, DestroyedBeforeReadCompletes) {
  namespace ee = grpc_event_engine::experimental;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  MockActivity activity;
  const std::string payload = {0x01};
  StatusCallback saved_on_read;
  activity.Activate();
  EXPECT_CALL(activity, WakeupRequested).Times(1);
  EXPECT_CALL(mock_endpoint_, Read)
      .WillOnce(WithArgs<0, 1>(
          [&saved_on_read, &payload](StatusCallback on_read,
                                     ee::SliceBuffer* sink) {
            saved_on_read = std::move(on_read);
            // Data lands in the buffer now; completion is signalled later.
            sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
            return false;
          }));
  auto byte_promise = promise_endpoint_->ReadByte();
  ASSERT_TRUE(byte_promise().pending());
  // Drop the PromiseEndpoint while its read is still outstanding.
  promise_endpoint_.reset();
  // Complete the read successfully.
  saved_on_read(absl::OkStatus());
  auto result = byte_promise();
  ASSERT_TRUE(result.ready());
  ASSERT_TRUE(result.value().ok());
  EXPECT_EQ(*result.value(), payload.front());
  activity.Deactivate();
}
668 
669 class MultiplePromiseEndpointTest : public ::testing::Test {
670  public:
MultiplePromiseEndpointTest()671   MultiplePromiseEndpointTest()
672       : first_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
673         second_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
674         first_mock_endpoint_(*first_mock_endpoint_ptr_),
675         second_mock_endpoint_(*second_mock_endpoint_ptr_),
676         first_promise_endpoint_(
677             std::unique_ptr<
678                 grpc_event_engine::experimental::EventEngine::Endpoint>(
679                 first_mock_endpoint_ptr_),
680             SliceBuffer()),
681         second_promise_endpoint_(
682             std::unique_ptr<
683                 grpc_event_engine::experimental::EventEngine::Endpoint>(
684                 second_mock_endpoint_ptr_),
685             SliceBuffer()) {}
686 
687  private:
688   MockEndpoint* first_mock_endpoint_ptr_;
689   MockEndpoint* second_mock_endpoint_ptr_;
690 
691  protected:
692   MockEndpoint& first_mock_endpoint_;
693   MockEndpoint& second_mock_endpoint_;
694   PromiseEndpoint first_promise_endpoint_;
695   PromiseEndpoint second_promise_endpoint_;
696 
697   const absl::Status kDummyErrorStatus =
698       absl::ErrnoToStatus(5566, "just an error");
699   static constexpr size_t kDummyRequestSize = 5566u;
700 };
701 
// Joins one read from each endpoint inside an activity. Both mocked reads
// return true, so both results must be OK and the activity must report OK to
// its on_done callback.
TEST_F(MultiplePromiseEndpointTest, JoinReadsSuccessful) {
  namespace ee = grpc_event_engine::experimental;
  using ReadResult = absl::StatusOr<SliceBuffer>;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  EXPECT_CALL(first_mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // First endpoint delivers the payload.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  EXPECT_CALL(second_mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // Second endpoint delivers the same payload.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  auto activity = MakeActivity(
      [this, &payload] {
        return Seq(Join(first_promise_endpoint_.Read(payload.size()),
                        second_promise_endpoint_.Read(payload.size())),
                   [](std::tuple<ReadResult, ReadResult> results) {
                     // Each read is expected to have succeeded.
                     EXPECT_TRUE(std::get<0>(results).ok());
                     EXPECT_TRUE(std::get<1>(results).ok());
                     return absl::OkStatus();
                   });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
740 
// Joins one read from each endpoint inside an activity. The first read
// succeeds and the second fails with kDummyErrorStatus; the continuation
// checks both results and the activity must finish with that error.
TEST_F(MultiplePromiseEndpointTest, JoinOneReadSuccessfulOneReadFailed) {
  namespace ee = grpc_event_engine::experimental;
  using ReadResult = absl::StatusOr<SliceBuffer>;
  using StatusCallback = absl::AnyInvocable<void(absl::Status)>;
  const std::string payload = {0x01, 0x02, 0x03, 0x04};
  EXPECT_CALL(first_mock_endpoint_, Read)
      .WillOnce(WithArg<1>([&payload](ee::SliceBuffer* sink) {
        // First endpoint delivers the payload.
        sink->Append(ee::Slice(grpc_slice_from_cpp_string(payload)));
        return true;
      }));
  EXPECT_CALL(second_mock_endpoint_, Read)
      .WillOnce(WithArg<0>([this](StatusCallback on_read) {
        // Second endpoint reports a failure from inside Read().
        on_read(kDummyErrorStatus);
        return false;
      }));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(kDummyErrorStatus));
  auto activity = MakeActivity(
      [this, &payload] {
        return Seq(Join(first_promise_endpoint_.Read(payload.size()),
                        second_promise_endpoint_.Read(kDummyRequestSize)),
                   [this](std::tuple<ReadResult, ReadResult> results) {
                     // Result 0 is the successful read, result 1 the failed
                     // one carrying the dummy error.
                     EXPECT_TRUE(std::get<0>(results).ok());
                     EXPECT_FALSE(std::get<1>(results).ok());
                     EXPECT_EQ(std::get<1>(results).status(),
                               kDummyErrorStatus);
                     return kDummyErrorStatus;
                   });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
780 
// Joins one read from each promise endpoint where both mocked endpoints
// report `kDummyErrorStatus` through their read callbacks. Both joined
// results are expected to carry that error, and the activity is expected to
// finish with it as well.
TEST_F(MultiplePromiseEndpointTest, JoinReadsFailed) {
  // Mock action used by both endpoints: invoke the read callback with the
  // dummy error and return false.
  auto fail_read = [this](absl::AnyInvocable<void(absl::Status)> on_read) {
    on_read(this->kDummyErrorStatus);
    return false;
  };
  EXPECT_CALL(first_mock_endpoint_, Read).WillOnce(WithArgs<0>(fail_read));
  EXPECT_CALL(second_mock_endpoint_, Read).WillOnce(WithArgs<0>(fail_read));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(kDummyErrorStatus));
  using ReadResult = absl::StatusOr<SliceBuffer>;
  auto activity = MakeActivity(
      [this] {
        return Seq(
            Join(first_promise_endpoint_.Read(this->kDummyRequestSize),
                 second_promise_endpoint_.Read(this->kDummyRequestSize)),
            [this](std::tuple<ReadResult, ReadResult> results) {
              // Each read is expected to fail with the dummy error.
              const ReadResult& first = std::get<0>(results);
              const ReadResult& second = std::get<1>(results);
              EXPECT_FALSE(first.ok());
              EXPECT_FALSE(second.ok());
              EXPECT_EQ(first.status(), this->kDummyErrorStatus);
              EXPECT_EQ(second.status(), this->kDummyErrorStatus);
              return this->kDummyErrorStatus;
            });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
817 
// Joins one write on each promise endpoint. Both mocked EventEngine
// endpoints return true from `Write`, so both joined statuses are expected to
// be OK and the activity is expected to finish with `absl::OkStatus()`.
TEST_F(MultiplePromiseEndpointTest, JoinWritesSuccessful) {
  EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
  EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(Return(true));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  using WriteResults = std::tuple<absl::Status, absl::Status>;
  auto activity = MakeActivity(
      [this] {
        return Seq(Join(first_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world"))),
                        second_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world")))),
                   [](WriteResults results) {
                     // Neither write is expected to carry an error.
                     const absl::Status& first = std::get<0>(results);
                     const absl::Status& second = std::get<1>(results);
                     EXPECT_TRUE(first.ok());
                     EXPECT_TRUE(second.ok());
                     return absl::OkStatus();
                   });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
839 
// Joins one write on each promise endpoint where the first mocked endpoint
// returns true from `Write` and the second reports `kDummyErrorStatus`
// through its write callback. The joined result is expected to hold one OK
// status and one error, and the activity is expected to finish with
// `kDummyErrorStatus`.
TEST_F(MultiplePromiseEndpointTest, JoinOneWriteSuccessfulOneWriteFailed) {
  // Second endpoint: invoke the write callback with the dummy error and
  // return false.
  auto fail_write = [this](absl::AnyInvocable<void(absl::Status)> on_write) {
    on_write(this->kDummyErrorStatus);
    return false;
  };
  EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
  EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(WithArgs<0>(fail_write));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(kDummyErrorStatus));
  using WriteResults = std::tuple<absl::Status, absl::Status>;
  auto activity = MakeActivity(
      [this] {
        return Seq(Join(first_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world"))),
                        second_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world")))),
                   [this](WriteResults results) {
                     // The first write is expected to succeed; the second
                     // is expected to carry the dummy error.
                     const absl::Status& succeeded = std::get<0>(results);
                     const absl::Status& failed = std::get<1>(results);
                     EXPECT_TRUE(succeeded.ok());
                     EXPECT_FALSE(failed.ok());
                     EXPECT_EQ(failed, this->kDummyErrorStatus);
                     return this->kDummyErrorStatus;
                   });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
868 
// Joins one write on each promise endpoint where both mocked endpoints
// report `kDummyErrorStatus` through their write callbacks. Both joined
// statuses are expected to equal that error, and the activity is expected to
// finish with it as well.
TEST_F(MultiplePromiseEndpointTest, JoinWritesFailed) {
  // Mock action used by both endpoints: invoke the write callback with the
  // dummy error and return false.
  auto fail_write = [this](absl::AnyInvocable<void(absl::Status)> on_write) {
    on_write(this->kDummyErrorStatus);
    return false;
  };
  EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(WithArgs<0>(fail_write));
  EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(WithArgs<0>(fail_write));
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(kDummyErrorStatus));
  using WriteResults = std::tuple<absl::Status, absl::Status>;
  auto activity = MakeActivity(
      [this] {
        return Seq(Join(first_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world"))),
                        second_promise_endpoint_.Write(SliceBuffer(
                            Slice::FromCopiedString("hello world")))),
                   [this](WriteResults results) {
                     // Each write is expected to fail with the dummy error.
                     const absl::Status& first = std::get<0>(results);
                     const absl::Status& second = std::get<1>(results);
                     EXPECT_FALSE(first.ok());
                     EXPECT_FALSE(second.ok());
                     EXPECT_EQ(first, this->kDummyErrorStatus);
                     EXPECT_EQ(second, this->kDummyErrorStatus);
                     return this->kDummyErrorStatus;
                   });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
902 
903 }  // namespace testing
904 }  // namespace grpc_core
905 
main(int argc,char ** argv)906 int main(int argc, char** argv) {
907   ::testing::InitGoogleTest(&argc, argv);
908   return RUN_ALL_TESTS();
909 }
910