1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/raw/client_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/raw/client_testing.h"
20 #include "pw_rpc/writer.h"
21 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
22 #include "pw_unit_test/framework.h"
23
24 namespace pw::rpc {
25 namespace {
26
27 using test::pw_rpc::raw::TestService;
28
FailIfCalled(Status)29 void FailIfCalled(Status) { FAIL(); }
FailIfOnNextCalled(ConstByteSpan)30 void FailIfOnNextCalled(ConstByteSpan) { FAIL(); }
FailIfOnCompletedCalled(ConstByteSpan,Status)31 void FailIfOnCompletedCalled(ConstByteSpan, Status) { FAIL(); }
32
TEST(RawUnaryReceiver,DefaultConstructed)33 TEST(RawUnaryReceiver, DefaultConstructed) {
34 RawUnaryReceiver call;
35
36 ASSERT_FALSE(call.active());
37 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
38
39 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
40
41 call.set_on_completed([](ConstByteSpan, Status) {});
42 call.set_on_error([](Status) {});
43 }
44
TEST(RawClientWriter,DefaultConstructed)45 TEST(RawClientWriter, DefaultConstructed) {
46 RawClientWriter call;
47
48 ASSERT_FALSE(call.active());
49 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
50
51 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
52 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
53 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
54
55 call.set_on_completed([](ConstByteSpan, Status) {});
56 call.set_on_error([](Status) {});
57 }
58
TEST(RawClientReader,DefaultConstructed)59 TEST(RawClientReader, DefaultConstructed) {
60 RawClientReader call;
61
62 ASSERT_FALSE(call.active());
63 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
64
65 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
66 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
67
68 call.set_on_completed([](Status) {});
69 call.set_on_next([](ConstByteSpan) {});
70 call.set_on_error([](Status) {});
71 }
72
TEST(RawClientReaderWriter,DefaultConstructed)73 TEST(RawClientReaderWriter, DefaultConstructed) {
74 RawClientReaderWriter call;
75
76 ASSERT_FALSE(call.active());
77 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
78
79 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
80 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
81 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
82
83 call.set_on_completed([](Status) {});
84 call.set_on_next([](ConstByteSpan) {});
85 call.set_on_error([](Status) {});
86 }
87
TEST(RawClientWriter,RequestCompletion)88 TEST(RawClientWriter, RequestCompletion) {
89 RawClientTestContext ctx;
90 RawClientWriter call = TestService::TestClientStreamRpc(
91 ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
92 ASSERT_EQ(OkStatus(), call.RequestCompletion());
93
94 ASSERT_TRUE(call.active());
95 EXPECT_EQ(call.channel_id(), ctx.channel().id());
96
97 EXPECT_EQ(OkStatus(), call.Write({}));
98 EXPECT_EQ(OkStatus(), call.RequestCompletion());
99 EXPECT_EQ(OkStatus(), call.Cancel());
100
101 call.set_on_completed([](ConstByteSpan, Status) {});
102 call.set_on_error([](Status) {});
103 }
104
TEST(RawClientReader,RequestCompletion)105 TEST(RawClientReader, RequestCompletion) {
106 RawClientTestContext ctx;
107 RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
108 ctx.channel().id(),
109 {},
110 FailIfOnNextCalled,
111 FailIfCalled,
112 FailIfCalled);
113 ASSERT_EQ(OkStatus(), call.RequestCompletion());
114
115 ASSERT_TRUE(call.active());
116 EXPECT_EQ(call.channel_id(), ctx.channel().id());
117
118 EXPECT_EQ(OkStatus(), call.RequestCompletion());
119 EXPECT_EQ(OkStatus(), call.Cancel());
120
121 call.set_on_completed([](Status) {});
122 call.set_on_next([](ConstByteSpan) {});
123 call.set_on_error([](Status) {});
124 }
125
TEST(RawClientReaderWriter,RequestCompletion)126 TEST(RawClientReaderWriter, RequestCompletion) {
127 RawClientTestContext ctx;
128 RawClientReaderWriter call =
129 TestService::TestBidirectionalStreamRpc(ctx.client(),
130 ctx.channel().id(),
131 FailIfOnNextCalled,
132 FailIfCalled,
133 FailIfCalled);
134 ASSERT_EQ(OkStatus(), call.RequestCompletion());
135
136 ASSERT_TRUE(call.active());
137 EXPECT_EQ(call.channel_id(), ctx.channel().id());
138
139 EXPECT_EQ(OkStatus(), call.Write({}));
140 EXPECT_EQ(OkStatus(), call.RequestCompletion());
141 EXPECT_EQ(OkStatus(), call.Cancel());
142
143 call.set_on_completed([](Status) {});
144 call.set_on_next([](ConstByteSpan) {});
145 call.set_on_error([](Status) {});
146 }
147
TEST(RawUnaryReceiver,Cancel)148 TEST(RawUnaryReceiver, Cancel) {
149 RawClientTestContext ctx;
150 RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
151 ctx.channel().id(),
152 {},
153 FailIfOnCompletedCalled,
154 FailIfCalled);
155 ASSERT_EQ(OkStatus(), call.Cancel());
156
157 // Additional calls should do nothing and return FAILED_PRECONDITION.
158 ASSERT_EQ(Status::FailedPrecondition(), call.Cancel());
159 ASSERT_EQ(Status::FailedPrecondition(), call.Cancel());
160
161 ASSERT_FALSE(call.active());
162 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
163
164 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
165
166 call.set_on_completed([](ConstByteSpan, Status) {});
167 call.set_on_error([](Status) {});
168
169 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
170 }
171
TEST(RawClientWriter,Cancel)172 TEST(RawClientWriter, Cancel) {
173 RawClientTestContext ctx;
174 RawClientWriter call = TestService::TestClientStreamRpc(
175 ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
176 ASSERT_EQ(OkStatus(), call.Cancel());
177
178 ASSERT_FALSE(call.active());
179 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
180
181 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
182 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
183 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
184
185 call.set_on_completed([](ConstByteSpan, Status) {});
186 call.set_on_error([](Status) {});
187
188 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
189 }
190
TEST(RawClientReader,Cancel)191 TEST(RawClientReader, Cancel) {
192 RawClientTestContext ctx;
193 RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
194 ctx.channel().id(),
195 {},
196 FailIfOnNextCalled,
197 FailIfCalled,
198 FailIfCalled);
199 ASSERT_EQ(OkStatus(), call.Cancel());
200
201 ASSERT_FALSE(call.active());
202 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
203
204 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
205 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
206
207 call.set_on_completed([](Status) {});
208 call.set_on_next([](ConstByteSpan) {});
209 call.set_on_error([](Status) {});
210
211 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
212 }
213
TEST(RawClientReaderWriter,Cancel)214 TEST(RawClientReaderWriter, Cancel) {
215 RawClientTestContext ctx;
216 RawClientReaderWriter call =
217 TestService::TestBidirectionalStreamRpc(ctx.client(),
218 ctx.channel().id(),
219 FailIfOnNextCalled,
220 FailIfCalled,
221 FailIfCalled);
222 ASSERT_EQ(OkStatus(), call.Cancel());
223
224 ASSERT_FALSE(call.active());
225 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
226
227 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
228 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
229 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
230
231 call.set_on_completed([](Status) {});
232 call.set_on_next([](ConstByteSpan) {});
233 call.set_on_error([](Status) {});
234
235 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & cancellation only
236 }
237
TEST(RawUnaryReceiver,Abandon)238 TEST(RawUnaryReceiver, Abandon) {
239 RawClientTestContext ctx;
240 RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
241 ctx.channel().id(),
242 {},
243 FailIfOnCompletedCalled,
244 FailIfCalled);
245 call.Abandon();
246
247 ASSERT_FALSE(call.active());
248 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
249
250 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
251
252 EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
253 }
254
TEST(RawClientWriter,Abandon)255 TEST(RawClientWriter, Abandon) {
256 RawClientTestContext ctx;
257 RawClientWriter call = TestService::TestClientStreamRpc(
258 ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
259 call.Abandon();
260
261 ASSERT_FALSE(call.active());
262 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
263
264 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
265 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
266 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
267
268 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
269 }
270
TEST(RawClientReader,Abandon)271 TEST(RawClientReader, Abandon) {
272 RawClientTestContext ctx;
273 RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
274 ctx.channel().id(),
275 {},
276 FailIfOnNextCalled,
277 FailIfCalled,
278 FailIfCalled);
279 call.Abandon();
280
281 ASSERT_FALSE(call.active());
282 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
283
284 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
285 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
286
287 EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
288 }
289
TEST(RawClientReaderWriter,Abandon)290 TEST(RawClientReaderWriter, Abandon) {
291 RawClientTestContext ctx;
292 RawClientReaderWriter call =
293 TestService::TestBidirectionalStreamRpc(ctx.client(),
294 ctx.channel().id(),
295 FailIfOnNextCalled,
296 FailIfCalled,
297 FailIfCalled);
298 call.Abandon();
299
300 ASSERT_FALSE(call.active());
301 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
302
303 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
304 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
305 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
306
307 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
308 }
309
TEST(RawUnaryReceiver,CloseAndWaitForCallbacks)310 TEST(RawUnaryReceiver, CloseAndWaitForCallbacks) {
311 RawClientTestContext ctx;
312 RawUnaryReceiver call = TestService::TestUnaryRpc(ctx.client(),
313 ctx.channel().id(),
314 {},
315 FailIfOnCompletedCalled,
316 FailIfCalled);
317 call.CloseAndWaitForCallbacks();
318
319 ASSERT_FALSE(call.active());
320 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
321
322 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
323
324 EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
325 }
326
TEST(RawClientWriter,CloseAndWaitForCallbacks)327 TEST(RawClientWriter, CloseAndWaitForCallbacks) {
328 RawClientTestContext ctx;
329 RawClientWriter call = TestService::TestClientStreamRpc(
330 ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
331 call.CloseAndWaitForCallbacks();
332
333 ASSERT_FALSE(call.active());
334 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
335
336 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
337 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
338 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
339
340 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
341 }
342
TEST(RawClientReader,CloseAndWaitForCallbacks)343 TEST(RawClientReader, CloseAndWaitForCallbacks) {
344 RawClientTestContext ctx;
345 RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
346 ctx.channel().id(),
347 {},
348 FailIfOnNextCalled,
349 FailIfCalled,
350 FailIfCalled);
351 call.CloseAndWaitForCallbacks();
352
353 ASSERT_FALSE(call.active());
354 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
355
356 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
357 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
358
359 EXPECT_EQ(ctx.output().total_packets(), 1u); // request only
360 }
361
TEST(RawClientReaderWriter,CloseAndWaitForCallbacks)362 TEST(RawClientReaderWriter, CloseAndWaitForCallbacks) {
363 RawClientTestContext ctx;
364 RawClientReaderWriter call =
365 TestService::TestBidirectionalStreamRpc(ctx.client(),
366 ctx.channel().id(),
367 FailIfOnNextCalled,
368 FailIfCalled,
369 FailIfCalled);
370 call.CloseAndWaitForCallbacks();
371
372 ASSERT_FALSE(call.active());
373 EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId);
374
375 EXPECT_EQ(Status::FailedPrecondition(), call.Write({}));
376 EXPECT_EQ(Status::FailedPrecondition(), call.Cancel());
377 EXPECT_EQ(Status::FailedPrecondition(), call.RequestCompletion());
378
379 EXPECT_EQ(ctx.output().total_packets(), 2u); // request & client stream end
380 }
381
TEST(RawClientReaderWriter,Move_InactiveToActive_EndsClientStream)382 TEST(RawClientReaderWriter, Move_InactiveToActive_EndsClientStream) {
383 RawClientTestContext ctx;
384
385 RawClientReaderWriter active_call =
386 TestService::TestBidirectionalStreamRpc(ctx.client(),
387 ctx.channel().id(),
388 FailIfOnNextCalled,
389 FailIfCalled,
390 FailIfCalled);
391
392 ASSERT_EQ(ctx.output().total_packets(), 1u); // Sent the request
393
394 RawClientReaderWriter inactive_call;
395
396 active_call = std::move(inactive_call);
397
398 EXPECT_EQ(ctx.output().total_packets(),
399 2u); // Sent CLIENT_REQUEST_COMPLETION
400 EXPECT_EQ(
401 ctx.output()
402 .client_stream_end_packets<TestService::TestBidirectionalStreamRpc>(),
403 1u);
404
405 EXPECT_FALSE(active_call.active());
406 // NOLINTNEXTLINE(bugprone-use-after-move)
407 EXPECT_FALSE(inactive_call.active());
408 }
409
TEST(RawUnaryReceiver,Move_InactiveToActive_SilentlyCloses)410 TEST(RawUnaryReceiver, Move_InactiveToActive_SilentlyCloses) {
411 RawClientTestContext ctx;
412
413 RawUnaryReceiver active_call =
414 TestService::TestUnaryRpc(ctx.client(),
415 ctx.channel().id(),
416 {},
417 FailIfOnCompletedCalled,
418 FailIfCalled);
419
420 ASSERT_EQ(ctx.output().total_packets(), 1u); // Sent the request
421
422 RawUnaryReceiver inactive_call;
423
424 active_call = std::move(inactive_call);
425
426 EXPECT_EQ(ctx.output().total_packets(), 1u); // No more packets
427
428 EXPECT_FALSE(active_call.active());
429 // NOLINTNEXTLINE(bugprone-use-after-move)
430 EXPECT_FALSE(inactive_call.active());
431 }
432
TEST(RawUnaryReceiver,Move_ActiveToActive)433 TEST(RawUnaryReceiver, Move_ActiveToActive) {
434 RawClientTestContext ctx;
435
436 RawUnaryReceiver active_call_1 =
437 TestService::TestUnaryRpc(ctx.client(), ctx.channel().id(), {});
438
439 RawUnaryReceiver active_call_2 =
440 TestService::TestAnotherUnaryRpc(ctx.client(), ctx.channel().id(), {});
441
442 ASSERT_EQ(ctx.output().total_packets(), 2u); // Sent the requests
443 ASSERT_TRUE(active_call_1.active());
444 ASSERT_TRUE(active_call_2.active());
445
446 active_call_2 = std::move(active_call_1);
447
448 EXPECT_EQ(ctx.output().total_packets(), 2u); // No more packets
449
450 // NOLINTNEXTLINE(bugprone-use-after-move)
451 EXPECT_FALSE(active_call_1.active());
452 EXPECT_TRUE(active_call_2.active());
453 }
454
TEST(RawUnaryReceiver,InvalidChannelId)455 TEST(RawUnaryReceiver, InvalidChannelId) {
456 RawClientTestContext ctx;
457 std::optional<Status> error;
458
459 RawUnaryReceiver call = TestService::TestUnaryRpc(
460 ctx.client(), 1290341, {}, {}, [&error](Status status) {
461 error = status;
462 });
463 EXPECT_FALSE(call.active());
464 EXPECT_EQ(error, Status::Unavailable());
465 }
466
TEST(RawClientReader,NoClientStream_OutOfScope_SilentlyCloses)467 TEST(RawClientReader, NoClientStream_OutOfScope_SilentlyCloses) {
468 RawClientTestContext ctx;
469
470 {
471 RawClientReader call = TestService::TestServerStreamRpc(ctx.client(),
472 ctx.channel().id(),
473 {},
474 FailIfOnNextCalled,
475 FailIfCalled,
476 FailIfCalled);
477 ASSERT_EQ(ctx.output().total_packets(), 1u); // Sent the request
478 }
479
480 EXPECT_EQ(ctx.output().total_packets(), 1u); // No more packets
481 }
482
TEST(RawClientWriter,WithClientStream_OutOfScope_SendsClientStreamEnd)483 TEST(RawClientWriter, WithClientStream_OutOfScope_SendsClientStreamEnd) {
484 RawClientTestContext ctx;
485
486 {
487 RawClientWriter call =
488 TestService::TestClientStreamRpc(ctx.client(),
489 ctx.channel().id(),
490 FailIfOnCompletedCalled,
491 FailIfCalled);
492 ASSERT_EQ(ctx.output().total_packets(), 1u); // Sent the request
493 }
494
495 EXPECT_EQ(ctx.output().total_packets(),
496 2u); // Sent CLIENT_REQUEST_COMPLETION
497 EXPECT_EQ(ctx.output()
498 .client_stream_end_packets<TestService::TestClientStreamRpc>(),
499 1u);
500 }
501
502 constexpr const char kWriterData[] = "20X6";
503
WriteAsWriter(Writer & writer)504 void WriteAsWriter(Writer& writer) {
505 ASSERT_TRUE(writer.active());
506 ASSERT_EQ(writer.channel_id(), RawClientTestContext<>::kDefaultChannelId);
507
508 EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
509 }
510
TEST(RawClientWriter,UsableAsWriter)511 TEST(RawClientWriter, UsableAsWriter) {
512 RawClientTestContext ctx;
513 RawClientWriter call = TestService::TestClientStreamRpc(
514 ctx.client(), ctx.channel().id(), FailIfOnCompletedCalled, FailIfCalled);
515
516 WriteAsWriter(call.as_writer());
517
518 EXPECT_STREQ(reinterpret_cast<const char*>(
519 ctx.output()
520 .payloads<TestService::TestClientStreamRpc>()
521 .back()
522 .data()),
523 kWriterData);
524 }
525
TEST(RawClientReaderWriter,UsableAsWriter)526 TEST(RawClientReaderWriter, UsableAsWriter) {
527 RawClientTestContext ctx;
528 RawClientReaderWriter call =
529 TestService::TestBidirectionalStreamRpc(ctx.client(),
530 ctx.channel().id(),
531 FailIfOnNextCalled,
532 FailIfCalled,
533 FailIfCalled);
534
535 WriteAsWriter(call.as_writer());
536
537 EXPECT_STREQ(reinterpret_cast<const char*>(
538 ctx.output()
539 .payloads<TestService::TestBidirectionalStreamRpc>()
540 .back()
541 .data()),
542 kWriterData);
543 }
544
span_as_cstr(ConstByteSpan span)545 const char* span_as_cstr(ConstByteSpan span) {
546 return reinterpret_cast<const char*>(span.data());
547 }
548
TEST(RawClientReaderWriter,MultipleCallsToSameMethodOkAndReceiveSeparateResponses)549 TEST(RawClientReaderWriter,
550 MultipleCallsToSameMethodOkAndReceiveSeparateResponses) {
551 RawClientTestContext ctx;
552
553 ConstByteSpan data_1 = as_bytes(span("data_1_unset"));
554 ConstByteSpan data_2 = as_bytes(span("data_2_unset"));
555
556 Status error;
557 auto set_error = [&error](Status status) { error.Update(status); };
558 RawClientReaderWriter active_call_1 = TestService::TestBidirectionalStreamRpc(
559 ctx.client(),
560 ctx.channel().id(),
561 [&data_1](ConstByteSpan payload) { data_1 = payload; },
562 FailIfCalled,
563 set_error);
564
565 EXPECT_TRUE(active_call_1.active());
566
567 RawClientReaderWriter active_call_2 = TestService::TestBidirectionalStreamRpc(
568 ctx.client(),
569 ctx.channel().id(),
570 [&data_2](ConstByteSpan payload) { data_2 = payload; },
571 FailIfCalled,
572 set_error);
573
574 EXPECT_TRUE(active_call_1.active());
575 EXPECT_TRUE(active_call_2.active());
576 EXPECT_EQ(error, OkStatus());
577
578 ConstByteSpan message_1 = as_bytes(span("hello_1"));
579 ConstByteSpan message_2 = as_bytes(span("hello_2"));
580
581 ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
582 message_2, active_call_2.id());
583 EXPECT_STREQ(span_as_cstr(data_2), span_as_cstr(message_2));
584 ctx.server().SendServerStream<TestService::TestBidirectionalStreamRpc>(
585 message_1, active_call_1.id());
586 EXPECT_STREQ(span_as_cstr(data_1), span_as_cstr(message_1));
587 }
588
589 } // namespace
590 } // namespace pw::rpc
591