1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/promise_endpoint.h"
16
17 // IWYU pragma: no_include <sys/socket.h>
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/port.h> // IWYU pragma: keep
21 #include <grpc/event_engine/slice_buffer.h>
22
23 #include <cstring>
24 #include <memory>
25 #include <string>
26 #include <tuple>
27
28 #include "absl/functional/any_invocable.h"
29 #include "gmock/gmock.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/join.h"
33 #include "src/core/lib/promise/seq.h"
34 #include "src/core/lib/slice/slice.h"
35 #include "src/core/lib/slice/slice_buffer.h"
36 #include "src/core/lib/slice/slice_internal.h"
37 #include "test/core/promise/test_wakeup_schedulers.h"
38
39 using testing::AtMost;
40 using testing::MockFunction;
41 using testing::Return;
42 using testing::ReturnRef;
43 using testing::Sequence;
44 using testing::StrictMock;
45 using testing::WithArg;
46 using testing::WithArgs;
47
48 namespace grpc_core {
49 namespace testing {
50
51 class MockEndpoint
52 : public grpc_event_engine::experimental::EventEngine::Endpoint {
53 public:
54 MOCK_METHOD(
55 bool, Read,
56 (absl::AnyInvocable<void(absl::Status)> on_read,
57 grpc_event_engine::experimental::SliceBuffer* buffer,
58 const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
59 args),
60 (override));
61
62 MOCK_METHOD(
63 bool, Write,
64 (absl::AnyInvocable<void(absl::Status)> on_writable,
65 grpc_event_engine::experimental::SliceBuffer* data,
66 const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
67 args),
68 (override));
69
70 MOCK_METHOD(
71 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
72 GetPeerAddress, (), (const, override));
73 MOCK_METHOD(
74 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
75 GetLocalAddress, (), (const, override));
76 };
77
78 class MockActivity : public Activity, public Wakeable {
79 public:
80 MOCK_METHOD(void, WakeupRequested, ());
81
ForceImmediateRepoll(WakeupMask)82 void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()83 void Orphan() override {}
MakeOwningWaker()84 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()85 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)86 void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)87 void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)88 void Drop(WakeupMask /*mask*/) override {}
DebugTag() const89 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const90 std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
91 return DebugTag();
92 }
93
Activate()94 void Activate() {
95 if (scoped_activity_ == nullptr) {
96 scoped_activity_ = std::make_unique<ScopedActivity>(this);
97 }
98 }
99
Deactivate()100 void Deactivate() { scoped_activity_.reset(); }
101
102 private:
103 std::unique_ptr<ScopedActivity> scoped_activity_;
104 };
105
106 class PromiseEndpointTest : public ::testing::Test {
107 public:
PromiseEndpointTest()108 PromiseEndpointTest()
109 : mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
110 mock_endpoint_(*mock_endpoint_ptr_),
111 promise_endpoint_(std::make_unique<PromiseEndpoint>(
112 std::unique_ptr<
113 grpc_event_engine::experimental::EventEngine::Endpoint>(
114 mock_endpoint_ptr_),
115 SliceBuffer())) {}
116
117 private:
118 MockEndpoint* mock_endpoint_ptr_;
119
120 protected:
121 MockEndpoint& mock_endpoint_;
122 std::unique_ptr<PromiseEndpoint> promise_endpoint_;
123
124 const absl::Status kDummyErrorStatus =
125 absl::ErrnoToStatus(5566, "just an error");
126 static constexpr size_t kDummyRequestSize = 5566u;
127 };
128
TEST_F(PromiseEndpointTest,OneReadSuccessful)129 TEST_F(PromiseEndpointTest, OneReadSuccessful) {
130 MockActivity activity;
131 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
132 activity.Activate();
133 EXPECT_CALL(activity, WakeupRequested).Times(0);
134 EXPECT_CALL(mock_endpoint_, Read)
135 .WillOnce(WithArgs<1>(
136 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
137 // Schedule mock_endpoint to read buffer.
138 grpc_event_engine::experimental::Slice slice(
139 grpc_slice_from_cpp_string(kBuffer));
140 buffer->Append(std::move(slice));
141 return true;
142 }));
143 auto promise = promise_endpoint_->Read(kBuffer.size());
144 auto poll = promise();
145 ASSERT_TRUE(poll.ready());
146 ASSERT_TRUE(poll.value().ok());
147 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
148 activity.Deactivate();
149 }
150
TEST_F(PromiseEndpointTest,OneReadFailed)151 TEST_F(PromiseEndpointTest, OneReadFailed) {
152 MockActivity activity;
153 activity.Activate();
154 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
155 EXPECT_CALL(mock_endpoint_, Read)
156 .WillOnce(WithArgs<0>(
157 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
158 // Mock EventEngine endpoint read fails.
159 read_callback(this->kDummyErrorStatus);
160 return false;
161 }));
162 auto promise = promise_endpoint_->Read(kDummyRequestSize);
163 auto poll = promise();
164 ASSERT_TRUE(poll.ready());
165 ASSERT_FALSE(poll.value().ok());
166 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
167 activity.Deactivate();
168 }
169
TEST_F(PromiseEndpointTest,MultipleReadsSuccessful)170 TEST_F(PromiseEndpointTest, MultipleReadsSuccessful) {
171 MockActivity activity;
172 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
173 activity.Activate();
174 EXPECT_CALL(activity, WakeupRequested).Times(0);
175 Sequence s;
176 EXPECT_CALL(mock_endpoint_, Read)
177 .InSequence(s)
178 .WillOnce(WithArg<1>(
179 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
180 // Schedule mock_endpoint to read buffer.
181 grpc_event_engine::experimental::Slice slice(
182 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
183 buffer->Append(std::move(slice));
184 return true;
185 }));
186 EXPECT_CALL(mock_endpoint_, Read)
187 .InSequence(s)
188 .WillOnce(WithArg<1>(
189 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
190 // Schedule mock_endpoint to read buffer.
191 grpc_event_engine::experimental::Slice slice(
192 grpc_slice_from_cpp_string(kBuffer.substr(4)));
193 buffer->Append(std::move(slice));
194 return true;
195 }));
196 {
197 auto promise = promise_endpoint_->Read(4u);
198 auto poll = promise();
199 ASSERT_TRUE(poll.ready());
200 ASSERT_TRUE(poll.value().ok());
201 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(0, 4));
202 }
203 {
204 auto promise = promise_endpoint_->Read(4u);
205 auto poll = promise();
206 ASSERT_TRUE(poll.ready());
207 ASSERT_TRUE(poll.value().ok());
208 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(4));
209 }
210 activity.Deactivate();
211 }
212
TEST_F(PromiseEndpointTest,OnePendingReadSuccessful)213 TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) {
214 MockActivity activity;
215 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
216 absl::AnyInvocable<void(absl::Status)> read_callback;
217 activity.Activate();
218 EXPECT_CALL(activity, WakeupRequested).Times(1);
219 EXPECT_CALL(mock_endpoint_, Read)
220 .WillOnce(WithArgs<0, 1>(
221 [&read_callback, &kBuffer](
222 absl::AnyInvocable<void(absl::Status)> on_read,
223 grpc_event_engine::experimental::SliceBuffer* buffer) {
224 read_callback = std::move(on_read);
225 // Schedule mock_endpoint to read buffer.
226 grpc_event_engine::experimental::Slice slice(
227 grpc_slice_from_cpp_string(kBuffer));
228 buffer->Append(std::move(slice));
229 // Return false to mock EventEngine read not finish..
230 return false;
231 }));
232 auto promise = promise_endpoint_->Read(kBuffer.size());
233 EXPECT_TRUE(promise().pending());
234 // Mock EventEngine read succeeds, and promise resolves.
235 read_callback(absl::OkStatus());
236 auto poll = promise();
237 ASSERT_TRUE(poll.ready());
238 ASSERT_TRUE(poll.value().ok());
239 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
240 activity.Deactivate();
241 }
242
TEST_F(PromiseEndpointTest,OnePendingReadFailed)243 TEST_F(PromiseEndpointTest, OnePendingReadFailed) {
244 MockActivity activity;
245 absl::AnyInvocable<void(absl::Status)> read_callback;
246 activity.Activate();
247 EXPECT_CALL(activity, WakeupRequested).Times(1);
248 EXPECT_CALL(mock_endpoint_, Read)
249 .WillOnce(WithArgs<0>(
250 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
251 read_callback = std::move(on_read);
252 // Return false to mock EventEngine read not finish.
253 return false;
254 }));
255 auto promise = promise_endpoint_->Read(kDummyRequestSize);
256 EXPECT_TRUE(promise().pending());
257 // Mock EventEngine read fails, and promise returns error.
258 read_callback(kDummyErrorStatus);
259 auto poll = promise();
260 ASSERT_TRUE(poll.ready());
261 ASSERT_FALSE(poll.value().ok());
262 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
263 activity.Deactivate();
264 }
265
TEST_F(PromiseEndpointTest,OneReadSliceSuccessful)266 TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) {
267 MockActivity activity;
268 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
269 activity.Activate();
270 EXPECT_CALL(activity, WakeupRequested).Times(0);
271 EXPECT_CALL(mock_endpoint_, Read)
272 .WillOnce(WithArgs<1>(
273 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
274 // Schedule mock_endpoint to read buffer.
275 grpc_event_engine::experimental::Slice slice(
276 grpc_slice_from_cpp_string(kBuffer));
277 buffer->Append(std::move(slice));
278 return true;
279 }));
280 auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
281 auto poll = promise();
282 ASSERT_TRUE(poll.ready());
283 ASSERT_TRUE(poll.value().ok());
284 EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
285 activity.Deactivate();
286 }
287
TEST_F(PromiseEndpointTest,OneReadSliceFailed)288 TEST_F(PromiseEndpointTest, OneReadSliceFailed) {
289 MockActivity activity;
290 activity.Activate();
291 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
292 EXPECT_CALL(mock_endpoint_, Read)
293 .WillOnce(WithArgs<0>(
294 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
295 // Mock EventEngine endpoint read fails.
296 read_callback(this->kDummyErrorStatus);
297 return false;
298 }));
299 auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
300 auto poll = promise();
301 ASSERT_TRUE(poll.ready());
302 ASSERT_FALSE(poll.value().ok());
303 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
304 activity.Deactivate();
305 }
306
TEST_F(PromiseEndpointTest,MultipleReadSlicesSuccessful)307 TEST_F(PromiseEndpointTest, MultipleReadSlicesSuccessful) {
308 MockActivity activity;
309 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
310 activity.Activate();
311 EXPECT_CALL(activity, WakeupRequested).Times(0);
312 Sequence s;
313 EXPECT_CALL(mock_endpoint_, Read)
314 .InSequence(s)
315 .WillOnce(WithArg<1>(
316 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
317 // Schedule mock_endpoint to read buffer.
318 grpc_event_engine::experimental::Slice slice(
319 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
320 buffer->Append(std::move(slice));
321 return true;
322 }));
323 EXPECT_CALL(mock_endpoint_, Read)
324 .InSequence(s)
325 .WillOnce(WithArg<1>(
326 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
327 // Schedule mock_endpoint to read buffer.
328 grpc_event_engine::experimental::Slice slice(
329 grpc_slice_from_cpp_string(kBuffer.substr(4)));
330 buffer->Append(std::move(slice));
331 return true;
332 }));
333 {
334 auto promise = promise_endpoint_->ReadSlice(4u);
335 auto poll = promise();
336 ASSERT_TRUE(poll.ready());
337 ASSERT_TRUE(poll.value().ok());
338 EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(0, 4));
339 }
340 {
341 auto promise = promise_endpoint_->ReadSlice(4u);
342 auto poll = promise();
343 ASSERT_TRUE(poll.ready());
344 ASSERT_TRUE(poll.value().ok());
345 EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(4));
346 }
347 activity.Deactivate();
348 }
349
TEST_F(PromiseEndpointTest,OnePendingReadSliceSuccessful)350 TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) {
351 MockActivity activity;
352 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
353 absl::AnyInvocable<void(absl::Status)> read_callback;
354 activity.Activate();
355 EXPECT_CALL(activity, WakeupRequested).Times(1);
356 EXPECT_CALL(mock_endpoint_, Read)
357 .WillOnce(WithArgs<0, 1>(
358 [&read_callback, &kBuffer](
359 absl::AnyInvocable<void(absl::Status)> on_read,
360 grpc_event_engine::experimental::SliceBuffer* buffer) {
361 read_callback = std::move(on_read);
362 // Schedule mock_endpoint to read buffer.
363 grpc_event_engine::experimental::Slice slice(
364 grpc_slice_from_cpp_string(kBuffer));
365 buffer->Append(std::move(slice));
366 // Return false to mock EventEngine read not finish..
367 return false;
368 }));
369 auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
370 EXPECT_TRUE(promise().pending());
371 // Mock EventEngine read succeeds, and promise resolves.
372 read_callback(absl::OkStatus());
373 auto poll = promise();
374 ASSERT_TRUE(poll.ready());
375 ASSERT_TRUE(poll.value().ok());
376 EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
377 activity.Deactivate();
378 }
379
TEST_F(PromiseEndpointTest,OnePendingReadSliceFailed)380 TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) {
381 MockActivity activity;
382 absl::AnyInvocable<void(absl::Status)> read_callback;
383 activity.Activate();
384 EXPECT_CALL(activity, WakeupRequested).Times(1);
385 EXPECT_CALL(mock_endpoint_, Read)
386 .WillOnce(WithArgs<0>(
387 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
388 read_callback = std::move(on_read);
389 // Return false to mock EventEngine read not finish.
390 return false;
391 }));
392 auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
393 EXPECT_TRUE(promise().pending());
394 // Mock EventEngine read fails, and promise returns error.
395 read_callback(kDummyErrorStatus);
396 auto poll = promise();
397 ASSERT_TRUE(poll.ready());
398 ASSERT_FALSE(poll.value().ok());
399 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
400 activity.Deactivate();
401 }
402
TEST_F(PromiseEndpointTest,OneReadByteSuccessful)403 TEST_F(PromiseEndpointTest, OneReadByteSuccessful) {
404 MockActivity activity;
405 const std::string kBuffer = {0x01};
406 activity.Activate();
407 EXPECT_CALL(activity, WakeupRequested).Times(0);
408 EXPECT_CALL(mock_endpoint_, Read)
409 .WillOnce(WithArgs<1>(
410 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
411 // Schedule mock_endpoint to read buffer.
412 grpc_event_engine::experimental::Slice slice(
413 grpc_slice_from_cpp_string(kBuffer));
414 buffer->Append(std::move(slice));
415 return true;
416 }));
417 auto promise = promise_endpoint_->ReadByte();
418 auto poll = promise();
419 ASSERT_TRUE(poll.ready());
420 ASSERT_TRUE(poll.value().ok());
421 EXPECT_EQ(*poll.value(), kBuffer[0]);
422 activity.Deactivate();
423 }
424
TEST_F(PromiseEndpointTest,OneReadByteFailed)425 TEST_F(PromiseEndpointTest, OneReadByteFailed) {
426 MockActivity activity;
427 activity.Activate();
428 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
429 EXPECT_CALL(mock_endpoint_, Read)
430 .WillOnce(WithArgs<0>(
431 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
432 // Mock EventEngine endpoint read fails.
433 read_callback(this->kDummyErrorStatus);
434 return false;
435 }));
436 auto promise = promise_endpoint_->ReadByte();
437 auto poll = promise();
438 ASSERT_TRUE(poll.ready());
439 ASSERT_FALSE(poll.value().ok());
440 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
441 activity.Deactivate();
442 }
443
TEST_F(PromiseEndpointTest,MultipleReadBytesSuccessful)444 TEST_F(PromiseEndpointTest, MultipleReadBytesSuccessful) {
445 MockActivity activity;
446 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
447 activity.Activate();
448 EXPECT_CALL(activity, WakeupRequested).Times(0);
449 EXPECT_CALL(mock_endpoint_, Read)
450 .WillOnce(WithArg<1>(
451 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
452 // Schedule mock_endpoint to read buffer.
453 grpc_event_engine::experimental::Slice slice(
454 grpc_slice_from_cpp_string(kBuffer));
455 buffer->Append(std::move(slice));
456 return true;
457 }));
458 for (size_t i = 0; i < kBuffer.size(); ++i) {
459 auto promise = promise_endpoint_->ReadByte();
460 auto poll = promise();
461 ASSERT_TRUE(poll.ready());
462 ASSERT_TRUE(poll.value().ok());
463 EXPECT_EQ(*poll.value(), kBuffer[i]);
464 }
465 activity.Deactivate();
466 }
467
TEST_F(PromiseEndpointTest,OnePendingReadByteSuccessful)468 TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) {
469 MockActivity activity;
470 const std::string kBuffer = {0x01};
471 absl::AnyInvocable<void(absl::Status)> read_callback;
472 activity.Activate();
473 EXPECT_CALL(activity, WakeupRequested).Times(1);
474 EXPECT_CALL(mock_endpoint_, Read)
475 .WillOnce(WithArgs<0, 1>(
476 [&read_callback, &kBuffer](
477 absl::AnyInvocable<void(absl::Status)> on_read,
478 grpc_event_engine::experimental::SliceBuffer* buffer) {
479 read_callback = std::move(on_read);
480 // Schedule mock_endpoint to read buffer.
481 grpc_event_engine::experimental::Slice slice(
482 grpc_slice_from_cpp_string(kBuffer));
483 buffer->Append(std::move(slice));
484 // Return false to mock EventEngine read not finish..
485 return false;
486 }));
487 auto promise = promise_endpoint_->ReadByte();
488 ASSERT_TRUE(promise().pending());
489 // Mock EventEngine read succeeds, and promise resolves.
490 read_callback(absl::OkStatus());
491 auto poll = promise();
492 ASSERT_TRUE(poll.ready());
493 ASSERT_TRUE(poll.value().ok());
494 EXPECT_EQ(*poll.value(), kBuffer[0]);
495 activity.Deactivate();
496 }
497
TEST_F(PromiseEndpointTest,OnePendingReadByteFailed)498 TEST_F(PromiseEndpointTest, OnePendingReadByteFailed) {
499 MockActivity activity;
500 absl::AnyInvocable<void(absl::Status)> read_callback;
501 activity.Activate();
502 EXPECT_CALL(activity, WakeupRequested).Times(1);
503 EXPECT_CALL(mock_endpoint_, Read)
504 .WillOnce(WithArgs<0>(
505 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
506 read_callback = std::move(on_read);
507 // Return false to mock EventEngine read not finish.
508 return false;
509 }));
510 auto promise = promise_endpoint_->ReadByte();
511 ASSERT_TRUE(promise().pending());
512 // Mock EventEngine read fails, and promise returns error.
513 read_callback(kDummyErrorStatus);
514 auto poll = promise();
515 ASSERT_TRUE(poll.ready());
516 ASSERT_FALSE(poll.value().ok());
517 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
518 activity.Deactivate();
519 }
520
TEST_F(PromiseEndpointTest,OneWriteSuccessful)521 TEST_F(PromiseEndpointTest, OneWriteSuccessful) {
522 MockActivity activity;
523 activity.Activate();
524 EXPECT_CALL(activity, WakeupRequested).Times(0);
525 EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true));
526 auto promise = promise_endpoint_->Write(
527 SliceBuffer(Slice::FromCopiedString("hello world")));
528 auto poll = promise();
529 ASSERT_TRUE(poll.ready());
530 EXPECT_EQ(absl::OkStatus(), poll.value());
531 activity.Deactivate();
532 }
533
TEST_F(PromiseEndpointTest,EmptyWriteIsNoOp)534 TEST_F(PromiseEndpointTest, EmptyWriteIsNoOp) {
535 MockActivity activity;
536 activity.Activate();
537 EXPECT_CALL(activity, WakeupRequested).Times(0);
538 EXPECT_CALL(mock_endpoint_, Write).Times(0);
539 auto promise = promise_endpoint_->Write(SliceBuffer());
540 auto poll = promise();
541 ASSERT_TRUE(poll.ready());
542 EXPECT_EQ(absl::OkStatus(), poll.value());
543 activity.Deactivate();
544 }
545
TEST_F(PromiseEndpointTest,OneWriteFailed)546 TEST_F(PromiseEndpointTest, OneWriteFailed) {
547 MockActivity activity;
548 activity.Activate();
549 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
550 EXPECT_CALL(mock_endpoint_, Write)
551 .WillOnce(
552 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
553 on_write(this->kDummyErrorStatus);
554 return false;
555 }));
556 auto promise = promise_endpoint_->Write(
557 SliceBuffer(Slice::FromCopiedString("hello world")));
558 auto poll = promise();
559 ASSERT_TRUE(poll.ready());
560 EXPECT_EQ(kDummyErrorStatus, poll.value());
561 activity.Deactivate();
562 }
563
TEST_F(PromiseEndpointTest,OnePendingWriteSuccessful)564 TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) {
565 MockActivity activity;
566 absl::AnyInvocable<void(absl::Status)> write_callback;
567 activity.Activate();
568 EXPECT_CALL(activity, WakeupRequested).Times(1);
569 EXPECT_CALL(mock_endpoint_, Write)
570 .WillOnce(WithArgs<0, 1>(
571 [&write_callback](
572 absl::AnyInvocable<void(absl::Status)> on_write,
573 grpc_event_engine::experimental::SliceBuffer* buffer) {
574 write_callback = std::move(on_write);
575 // Schedule mock_endpoint to write buffer.
576 buffer->Append(grpc_event_engine::experimental::Slice());
577 // Return false to mock EventEngine write pending..
578 return false;
579 }));
580 auto promise = promise_endpoint_->Write(
581 SliceBuffer(Slice::FromCopiedString("hello world")));
582 EXPECT_TRUE(promise().pending());
583 // Mock EventEngine write succeeds, and promise resolves.
584 write_callback(absl::OkStatus());
585 auto poll = promise();
586 ASSERT_TRUE(poll.ready());
587 EXPECT_EQ(absl::OkStatus(), poll.value());
588 activity.Deactivate();
589 }
590
TEST_F(PromiseEndpointTest,OnePendingWriteFailed)591 TEST_F(PromiseEndpointTest, OnePendingWriteFailed) {
592 MockActivity activity;
593 absl::AnyInvocable<void(absl::Status)> write_callback;
594 activity.Activate();
595 EXPECT_CALL(activity, WakeupRequested).Times(1);
596 EXPECT_CALL(mock_endpoint_, Write)
597 .WillOnce(WithArg<0>(
598 [&write_callback](absl::AnyInvocable<void(absl::Status)> on_write) {
599 write_callback = std::move(on_write);
600 // Return false to mock EventEngine write pending..
601 return false;
602 }));
603 auto promise = promise_endpoint_->Write(
604 SliceBuffer(Slice::FromCopiedString("hello world")));
605 EXPECT_TRUE(promise().pending());
606 write_callback(kDummyErrorStatus);
607 auto poll = promise();
608 ASSERT_TRUE(poll.ready());
609 EXPECT_EQ(kDummyErrorStatus, poll.value());
610 activity.Deactivate();
611 }
612
TEST_F(PromiseEndpointTest,GetPeerAddress)613 TEST_F(PromiseEndpointTest, GetPeerAddress) {
614 const char raw_test_address[] = {0x55, 0x66, 0x01, 0x55, 0x66, 0x01};
615 grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
616 reinterpret_cast<const sockaddr*>(raw_test_address),
617 sizeof(raw_test_address));
618 EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address));
619 auto peer_address = promise_endpoint_->GetPeerAddress();
620 EXPECT_EQ(0, std::memcmp(test_address.address(), test_address.address(),
621 test_address.size()));
622 EXPECT_EQ(test_address.size(), peer_address.size());
623 }
624
TEST_F(PromiseEndpointTest,GetLocalAddress)625 TEST_F(PromiseEndpointTest, GetLocalAddress) {
626 const char raw_test_address[] = {0x52, 0x55, 0x66, 0x52, 0x55, 0x66};
627 grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
628 reinterpret_cast<const sockaddr*>(raw_test_address),
629 sizeof(raw_test_address));
630 EXPECT_CALL(mock_endpoint_, GetLocalAddress)
631 .WillOnce(ReturnRef(test_address));
632 auto local_address = promise_endpoint_->GetLocalAddress();
633 EXPECT_EQ(0, std::memcmp(test_address.address(), local_address.address(),
634 test_address.size()));
635 EXPECT_EQ(test_address.size(), local_address.size());
636 }
637
TEST_F(PromiseEndpointTest,DestroyedBeforeReadCompletes)638 TEST_F(PromiseEndpointTest, DestroyedBeforeReadCompletes) {
639 MockActivity activity;
640 const std::string kBuffer = {0x01};
641 absl::AnyInvocable<void(absl::Status)> read_callback;
642 activity.Activate();
643 EXPECT_CALL(activity, WakeupRequested).Times(1);
644 EXPECT_CALL(mock_endpoint_, Read)
645 .WillOnce(WithArgs<0, 1>(
646 [&read_callback, &kBuffer](
647 absl::AnyInvocable<void(absl::Status)> on_read,
648 grpc_event_engine::experimental::SliceBuffer* buffer) {
649 read_callback = std::move(on_read);
650 // Schedule mock_endpoint to read buffer.
651 grpc_event_engine::experimental::Slice slice(
652 grpc_slice_from_cpp_string(kBuffer));
653 buffer->Append(std::move(slice));
654 // Return false to mock EventEngine read not finish..
655 return false;
656 }));
657 auto promise = promise_endpoint_->ReadByte();
658 ASSERT_TRUE(promise().pending());
659 promise_endpoint_.reset();
660 // Mock EventEngine read succeeds, and promise resolves.
661 read_callback(absl::OkStatus());
662 auto poll = promise();
663 ASSERT_TRUE(poll.ready());
664 ASSERT_TRUE(poll.value().ok());
665 EXPECT_EQ(*poll.value(), kBuffer[0]);
666 activity.Deactivate();
667 }
668
669 class MultiplePromiseEndpointTest : public ::testing::Test {
670 public:
MultiplePromiseEndpointTest()671 MultiplePromiseEndpointTest()
672 : first_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
673 second_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
674 first_mock_endpoint_(*first_mock_endpoint_ptr_),
675 second_mock_endpoint_(*second_mock_endpoint_ptr_),
676 first_promise_endpoint_(
677 std::unique_ptr<
678 grpc_event_engine::experimental::EventEngine::Endpoint>(
679 first_mock_endpoint_ptr_),
680 SliceBuffer()),
681 second_promise_endpoint_(
682 std::unique_ptr<
683 grpc_event_engine::experimental::EventEngine::Endpoint>(
684 second_mock_endpoint_ptr_),
685 SliceBuffer()) {}
686
687 private:
688 MockEndpoint* first_mock_endpoint_ptr_;
689 MockEndpoint* second_mock_endpoint_ptr_;
690
691 protected:
692 MockEndpoint& first_mock_endpoint_;
693 MockEndpoint& second_mock_endpoint_;
694 PromiseEndpoint first_promise_endpoint_;
695 PromiseEndpoint second_promise_endpoint_;
696
697 const absl::Status kDummyErrorStatus =
698 absl::ErrnoToStatus(5566, "just an error");
699 static constexpr size_t kDummyRequestSize = 5566u;
700 };
701
TEST_F(MultiplePromiseEndpointTest,JoinReadsSuccessful)702 TEST_F(MultiplePromiseEndpointTest, JoinReadsSuccessful) {
703 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
704 EXPECT_CALL(first_mock_endpoint_, Read)
705 .WillOnce(WithArgs<1>(
706 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
707 // Schedule mock_endpoint to read buffer.
708 grpc_event_engine::experimental::Slice slice(
709 grpc_slice_from_cpp_string(kBuffer));
710 buffer->Append(std::move(slice));
711 return true;
712 }));
713 EXPECT_CALL(second_mock_endpoint_, Read)
714 .WillOnce(WithArgs<1>(
715 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
716 // Schedule mock_endpoint to read buffer.
717 grpc_event_engine::experimental::Slice slice(
718 grpc_slice_from_cpp_string(kBuffer));
719 buffer->Append(std::move(slice));
720 return true;
721 }));
722 StrictMock<MockFunction<void(absl::Status)>> on_done;
723 EXPECT_CALL(on_done, Call(absl::OkStatus()));
724 auto activity = MakeActivity(
725 [this, &kBuffer] {
726 return Seq(Join(this->first_promise_endpoint_.Read(kBuffer.size()),
727 this->second_promise_endpoint_.Read(kBuffer.size())),
728 [](std::tuple<absl::StatusOr<SliceBuffer>,
729 absl::StatusOr<SliceBuffer>>
730 ret) {
731 // Both reads finish with `absl::OkStatus`.
732 EXPECT_TRUE(std::get<0>(ret).ok());
733 EXPECT_TRUE(std::get<1>(ret).ok());
734 return absl::OkStatus();
735 });
736 },
737 InlineWakeupScheduler(),
738 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
739 }
740
TEST_F(MultiplePromiseEndpointTest,JoinOneReadSuccessfulOneReadFailed)741 TEST_F(MultiplePromiseEndpointTest, JoinOneReadSuccessfulOneReadFailed) {
742 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
743 EXPECT_CALL(first_mock_endpoint_, Read)
744 .WillOnce(WithArgs<1>(
745 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
746 // Schedule mock_endpoint to read buffer.
747 grpc_event_engine::experimental::Slice slice(
748 grpc_slice_from_cpp_string(kBuffer));
749 buffer->Append(std::move(slice));
750 return true;
751 }));
752 EXPECT_CALL(second_mock_endpoint_, Read)
753 .WillOnce(WithArgs<0>(
754 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
755 // Mock EventEngine endpoint read fails.
756 read_callback(this->kDummyErrorStatus);
757 return false;
758 }));
759 StrictMock<MockFunction<void(absl::Status)>> on_done;
760 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
761 auto activity = MakeActivity(
762 [this, &kBuffer] {
763 return Seq(
764 Join(this->first_promise_endpoint_.Read(kBuffer.size()),
765 this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
766 [this](std::tuple<absl::StatusOr<SliceBuffer>,
767 absl::StatusOr<SliceBuffer>>
768 ret) {
769 // One read finishes with `absl::OkStatus` and the other read
770 // fails.
771 EXPECT_TRUE(std::get<0>(ret).ok());
772 EXPECT_FALSE(std::get<1>(ret).ok());
773 EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
774 return this->kDummyErrorStatus;
775 });
776 },
777 InlineWakeupScheduler(),
778 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
779 }
780
TEST_F(MultiplePromiseEndpointTest,JoinReadsFailed)781 TEST_F(MultiplePromiseEndpointTest, JoinReadsFailed) {
782 EXPECT_CALL(first_mock_endpoint_, Read)
783 .WillOnce(WithArgs<0>(
784 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
785 // Mock EventEngine endpoint read fails.
786 read_callback(this->kDummyErrorStatus);
787 return false;
788 }));
789 EXPECT_CALL(second_mock_endpoint_, Read)
790 .WillOnce(WithArgs<0>(
791 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
792 // Mock EventEngine endpoint read fails.
793 read_callback(this->kDummyErrorStatus);
794 return false;
795 }));
796 StrictMock<MockFunction<void(absl::Status)>> on_done;
797 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
798 auto activity = MakeActivity(
799 [this] {
800 return Seq(
801 Join(this->first_promise_endpoint_.Read(this->kDummyRequestSize),
802 this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
803 [this](std::tuple<absl::StatusOr<SliceBuffer>,
804 absl::StatusOr<SliceBuffer>>
805 ret) {
806 // Both reads finish with errors.
807 EXPECT_FALSE(std::get<0>(ret).ok());
808 EXPECT_FALSE(std::get<1>(ret).ok());
809 EXPECT_EQ(std::get<0>(ret).status(), this->kDummyErrorStatus);
810 EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
811 return this->kDummyErrorStatus;
812 });
813 },
814 InlineWakeupScheduler(),
815 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
816 }
817
TEST_F(MultiplePromiseEndpointTest,JoinWritesSuccessful)818 TEST_F(MultiplePromiseEndpointTest, JoinWritesSuccessful) {
819 EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
820 EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(Return(true));
821 StrictMock<MockFunction<void(absl::Status)>> on_done;
822 EXPECT_CALL(on_done, Call(absl::OkStatus()));
823 auto activity = MakeActivity(
824 [this] {
825 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
826 Slice::FromCopiedString("hello world"))),
827 this->second_promise_endpoint_.Write(SliceBuffer(
828 Slice::FromCopiedString("hello world")))),
829 [](std::tuple<absl::Status, absl::Status> ret) {
830 // Both writes finish with `absl::OkStatus`.
831 EXPECT_TRUE(std::get<0>(ret).ok());
832 EXPECT_TRUE(std::get<1>(ret).ok());
833 return absl::OkStatus();
834 });
835 },
836 InlineWakeupScheduler(),
837 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
838 }
839
TEST_F(MultiplePromiseEndpointTest,JoinOneWriteSuccessfulOneWriteFailed)840 TEST_F(MultiplePromiseEndpointTest, JoinOneWriteSuccessfulOneWriteFailed) {
841 EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
842 EXPECT_CALL(second_mock_endpoint_, Write)
843 .WillOnce(
844 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
845 on_write(this->kDummyErrorStatus);
846 return false;
847 }));
848 StrictMock<MockFunction<void(absl::Status)>> on_done;
849 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
850 auto activity = MakeActivity(
851 [this] {
852 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
853 Slice::FromCopiedString("hello world"))),
854 this->second_promise_endpoint_.Write(SliceBuffer(
855 Slice::FromCopiedString("hello world")))),
856 [this](std::tuple<absl::Status, absl::Status> ret) {
857 // One write finish with `absl::OkStatus` and the other
858 // write fails.
859 EXPECT_TRUE(std::get<0>(ret).ok());
860 EXPECT_FALSE(std::get<1>(ret).ok());
861 EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
862 return this->kDummyErrorStatus;
863 });
864 },
865 InlineWakeupScheduler(),
866 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
867 }
868
TEST_F(MultiplePromiseEndpointTest,JoinWritesFailed)869 TEST_F(MultiplePromiseEndpointTest, JoinWritesFailed) {
870 EXPECT_CALL(first_mock_endpoint_, Write)
871 .WillOnce(
872 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
873 on_write(this->kDummyErrorStatus);
874 return false;
875 }));
876 EXPECT_CALL(second_mock_endpoint_, Write)
877 .WillOnce(
878 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
879 on_write(this->kDummyErrorStatus);
880 return false;
881 }));
882 StrictMock<MockFunction<void(absl::Status)>> on_done;
883 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
884 auto activity = MakeActivity(
885 [this] {
886 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
887 Slice::FromCopiedString("hello world"))),
888 this->second_promise_endpoint_.Write(SliceBuffer(
889 Slice::FromCopiedString("hello world")))),
890 [this](std::tuple<absl::Status, absl::Status> ret) {
891 // Both writes fail with errors.
892 EXPECT_FALSE(std::get<0>(ret).ok());
893 EXPECT_FALSE(std::get<1>(ret).ok());
894 EXPECT_EQ(std::get<0>(ret), this->kDummyErrorStatus);
895 EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
896 return this->kDummyErrorStatus;
897 });
898 },
899 InlineWakeupScheduler(),
900 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
901 }
902
903 } // namespace testing
904 } // namespace grpc_core
905
main(int argc,char ** argv)906 int main(int argc, char** argv) {
907 ::testing::InitGoogleTest(&argc, argv);
908 return RUN_ALL_TESTS();
909 }
910