1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/raw/server_reader_writer.h"
16
17 #include <optional>
18
19 #include "pw_rpc/internal/lock.h"
20 #include "pw_rpc/raw/fake_channel_output.h"
21 #include "pw_rpc/service.h"
22 #include "pw_rpc/writer.h"
23 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
24 #include "pw_unit_test/framework.h"
25
26 namespace pw::rpc {
27
28 class TestServiceImpl final
29 : public test::pw_rpc::raw::TestService::Service<TestServiceImpl> {
30 public:
TestUnaryRpc(ConstByteSpan,RawUnaryResponder &)31 static void TestUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
32
TestAnotherUnaryRpc(ConstByteSpan,RawUnaryResponder &)33 void TestAnotherUnaryRpc(ConstByteSpan, RawUnaryResponder&) {}
34
TestServerStreamRpc(ConstByteSpan,RawServerWriter &)35 void TestServerStreamRpc(ConstByteSpan, RawServerWriter&) {}
36
TestClientStreamRpc(RawServerReader &)37 void TestClientStreamRpc(RawServerReader&) {}
38
TestBidirectionalStreamRpc(RawServerReaderWriter &)39 void TestBidirectionalStreamRpc(RawServerReaderWriter&) {}
40 };
41
42 struct ReaderWriterTestContext {
43 static constexpr uint32_t kChannelId = 1;
44
ReaderWriterTestContextpw::rpc::ReaderWriterTestContext45 ReaderWriterTestContext()
46 : channel(Channel::Create<kChannelId>(&output)),
47 server(span(&channel, 1)) {}
48
49 TestServiceImpl service;
50 RawFakeChannelOutput<4> output;
51 Channel channel;
52 Server server;
53 };
54
// Makes TestService usable unqualified, so the tests below can name RPC
// methods as TestService::<Method> in the Open<>() template arguments.
using test::pw_rpc::raw::TestService;
56
// A default-constructed RawUnaryResponder is inactive, reports the unassigned
// channel ID, and returns FailedPrecondition from Finish().
TEST(RawUnaryResponder, DefaultConstructed) {
  RawUnaryResponder responder;

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Setting a callback on the inactive call is exercised as well.
  responder.set_on_error([](Status) {});
}
67
// A default-constructed RawServerWriter is inactive, reports the unassigned
// channel ID, and returns FailedPrecondition from both Write() and Finish().
TEST(RawServerWriter, DefaultConstructed) {
  RawServerWriter writer;

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Setting a callback on the inactive call is exercised as well.
  writer.set_on_error([](Status) {});
}
79
// A default-constructed RawServerReader is inactive, reports the unassigned
// channel ID, and returns FailedPrecondition from Finish().
TEST(RawServerReader, DefaultConstructed) {
  RawServerReader reader;

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status finish_status = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Both callback setters are exercised on the inactive call.
  reader.set_on_next([](ConstByteSpan) {});
  reader.set_on_error([](Status) {});
}
91
// A default-constructed RawServerReaderWriter is inactive, reports the
// unassigned channel ID, and returns FailedPrecondition from Write() and
// Finish().
TEST(RawServerReaderWriter, DefaultConstructed) {
  RawServerReaderWriter stream;

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = stream.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status finish_status = stream.Finish(Status::Cancelled());
  EXPECT_EQ(Status::FailedPrecondition(), finish_status);

  // Both callback setters are exercised on the inactive call.
  stream.set_on_next([](ConstByteSpan) {});
  stream.set_on_error([](Status) {});
}
104
// After a successful Finish(), the responder is inactive, reports the
// unassigned channel ID, and a second Finish() returns FailedPrecondition.
TEST(RawUnaryResponder, Closed) {
  ReaderWriterTestContext context;
  RawUnaryResponder responder =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);
  ASSERT_EQ(OkStatus(), responder.Finish({}, OkStatus()));

  ASSERT_FALSE(responder.active());
  EXPECT_EQ(responder.channel_id(), Channel::kUnassignedChannelId);

  const Status second_finish = responder.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Setting a callback on the closed call is exercised as well.
  responder.set_on_error([](Status) {});
}
118
// TryFinish() returns the Unknown status configured on the fake channel output
// and the call stays active.
TEST(RawUnaryResponder, TryCloseFailed) {
  ReaderWriterTestContext context;
  RawUnaryResponder responder =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  // Set the send status of the fake channel output to Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), responder.TryFinish({}, OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(responder.active());
}
130
// A TryFinish() that fails leaves the call active; a later TryFinish() with an
// OK send status closes it.
TEST(RawUnaryResponder, TryCloseSuccessful) {
  ReaderWriterTestContext context;
  RawUnaryResponder responder =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  // First attempt: the send status of the fake channel output is Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), responder.TryFinish({}, OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(responder.active());

  // Second attempt: the send status is restored to OK before retrying.
  context.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), responder.TryFinish({}, OkStatus()));

  // The call is expected to be closed now.
  ASSERT_FALSE(responder.active());
}
148
// After a successful Finish(), the writer is inactive, reports the unassigned
// channel ID, and further Write()/Finish() calls return FailedPrecondition.
TEST(RawServerWriter, Closed) {
  ReaderWriterTestContext context;
  RawServerWriter writer =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);
  ASSERT_EQ(OkStatus(), writer.Finish(OkStatus()));

  ASSERT_FALSE(writer.active());
  EXPECT_EQ(writer.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = writer.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = writer.Finish(OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Setting a callback on the closed call is exercised as well.
  writer.set_on_error([](Status) {});
}
164
// TryFinish() returns the Unknown status configured on the fake channel output
// and the call stays active.
TEST(RawServerWriter, TryCloseFailed) {
  ReaderWriterTestContext context;
  RawServerWriter writer =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  // Set the send status of the fake channel output to Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), writer.TryFinish(OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(writer.active());
}
177
// A TryFinish() that fails leaves the call active; a later TryFinish() with an
// OK send status closes it.
TEST(RawServerWriter, TryCloseSuccessful) {
  ReaderWriterTestContext context;
  RawServerWriter writer =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  // First attempt: the send status of the fake channel output is Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), writer.TryFinish(OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(writer.active());

  // Second attempt: the send status is restored to OK before retrying.
  context.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), writer.TryFinish(OkStatus()));

  // The call is expected to be closed now.
  ASSERT_FALSE(writer.active());
}
196
// After a successful Finish(), the reader is inactive, reports the unassigned
// channel ID, and a second Finish() returns FailedPrecondition.
TEST(RawServerReader, Closed) {
  ReaderWriterTestContext context;
  RawServerReader reader =
      RawServerReader::Open<TestService::TestClientStreamRpc>(
          context.server, context.channel.id(), context.service);
  ASSERT_EQ(OkStatus(), reader.Finish({}, OkStatus()));

  ASSERT_FALSE(reader.active());
  EXPECT_EQ(reader.channel_id(), Channel::kUnassignedChannelId);

  const Status second_finish = reader.Finish({}, OkStatus());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Both callback setters are exercised on the closed call.
  reader.set_on_next([](ConstByteSpan) {});
  reader.set_on_error([](Status) {});
}
212
// TryFinish() returns the Unknown status configured on the fake channel output
// and the call stays active.
TEST(RawServerReader, TryCloseFailed) {
  ReaderWriterTestContext context;
  RawServerReader reader =
      RawServerReader::Open<TestService::TestClientStreamRpc>(
          context.server, context.channel.id(), context.service);

  // Set the send status of the fake channel output to Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), reader.TryFinish({}, OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(reader.active());
}
225
// A TryFinish() that fails leaves the call active; a later TryFinish() with an
// OK send status closes it.
TEST(RawServerReader, TryCloseSuccessful) {
  ReaderWriterTestContext context;
  RawServerReader reader =
      RawServerReader::Open<TestService::TestClientStreamRpc>(
          context.server, context.channel.id(), context.service);

  // First attempt: the send status of the fake channel output is Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), reader.TryFinish({}, OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(reader.active());

  // Second attempt: the send status is restored to OK before retrying.
  context.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), reader.TryFinish({}, OkStatus()));

  // The call is expected to be closed now.
  ASSERT_FALSE(reader.active());
}
244
// After a successful Finish(), the reader/writer is inactive, reports the
// unassigned channel ID, and further Write()/Finish() calls return
// FailedPrecondition.
TEST(RawServerReaderWriter, Closed) {
  ReaderWriterTestContext context;
  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);
  ASSERT_EQ(OkStatus(), stream.Finish(OkStatus()));

  ASSERT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);

  const Status write_status = stream.Write({});
  EXPECT_EQ(Status::FailedPrecondition(), write_status);

  const Status second_finish = stream.Finish(Status::Cancelled());
  EXPECT_EQ(Status::FailedPrecondition(), second_finish);

  // Both callback setters are exercised on the closed call.
  stream.set_on_next([](ConstByteSpan) {});
  stream.set_on_error([](Status) {});
}
261
// TryFinish() returns the Unknown status configured on the fake channel output
// and the call stays active.
TEST(RawServerReaderWriter, TryCloseFailed) {
  ReaderWriterTestContext context;
  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);

  // Set the send status of the fake channel output to Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), stream.TryFinish(OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(stream.active());
}
274
// A TryFinish() that fails leaves the call active; a later TryFinish() with an
// OK send status closes it.
TEST(RawServerReaderWriter, TryCloseSuccessful) {
  ReaderWriterTestContext context;
  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);

  // First attempt: the send status of the fake channel output is Unknown.
  context.output.set_send_status(Status::Unknown());
  ASSERT_EQ(Status::Unknown(), stream.TryFinish(OkStatus()));

  // The call is expected to remain active after the failed attempt.
  ASSERT_TRUE(stream.active());

  // Second attempt: the send status is restored to OK before retrying.
  context.output.set_send_status(OkStatus());
  ASSERT_EQ(OkStatus(), stream.TryFinish(OkStatus()));

  // The call is expected to be closed now.
  ASSERT_FALSE(stream.active());
}
293
// A responder obtained from Open() is bound to the requested channel, and the
// payload passed to Finish() shows up in the fake channel output.
TEST(RawUnaryResponder, Open_ReturnsUsableResponder) {
  ReaderWriterTestContext context;
  RawUnaryResponder responder =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  EXPECT_EQ(responder.channel_id(), context.channel.id());
  EXPECT_EQ(OkStatus(),
            responder.Finish(as_bytes(span("hello from pw_rpc"))));

  // The span covers the whole string literal array, terminator included, so
  // the recorded payload can be compared as a C string.
  const auto last_payload =
      context.output.payloads<TestService::TestUnaryRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()),
               "hello from pw_rpc");
}
307
// Open() on a channel that was closed on the server still produces an active
// call. Write() returns Unavailable until the channel is opened again, after
// which the call can be written to and finished normally.
TEST(RawServerReaderWriter, Open_UnknownChannel) {
  ReaderWriterTestContext context;
  ASSERT_EQ(OkStatus(), context.server.CloseChannel(context.kChannelId));

  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.kChannelId, context.service);

  // The call is active and remembers the channel ID despite the closed
  // channel.
  EXPECT_TRUE(stream.active());
  EXPECT_EQ(stream.channel_id(), context.kChannelId);
  EXPECT_EQ(Status::Unavailable(), stream.Write({}));

  // Reopen the channel with the same ID on the fake output.
  ASSERT_EQ(OkStatus(),
            context.server.OpenChannel(context.kChannelId, context.output));

  EXPECT_EQ(OkStatus(), stream.Write({}));
  EXPECT_TRUE(stream.active());

  EXPECT_EQ(OkStatus(), stream.Finish());
  EXPECT_FALSE(stream.active());
  EXPECT_EQ(stream.channel_id(), Channel::kUnassignedChannelId);
}
329
// Opening the same method on the same channel a second time deactivates the
// first responder and invokes its on_error callback with Cancelled.
TEST(RawUnaryResponder, Open_MultipleTimes_CancelsPrevious) {
  ReaderWriterTestContext context;

  RawUnaryResponder first = RawUnaryResponder::Open<TestService::TestUnaryRpc>(
      context.server, context.channel.id(), context.service);

  // Record whatever status the first call's error callback receives.
  std::optional<Status> reported_error;
  first.set_on_error(
      [&reported_error](Status status) { reported_error = status; });

  ASSERT_TRUE(first.active());

  RawUnaryResponder second =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  ASSERT_FALSE(first.active());
  ASSERT_TRUE(second.active());

  EXPECT_EQ(Status::Cancelled(), reported_error);
}
349
// A writer obtained from Open() is bound to the requested channel, and data
// passed to Write() shows up in the fake channel output.
TEST(RawServerWriter, Open_ReturnsUsableWriter) {
  ReaderWriterTestContext context;
  RawServerWriter writer =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  EXPECT_EQ(writer.channel_id(), context.channel.id());
  EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span("321"))));

  // The span covers the whole string literal array, terminator included, so
  // the recorded payload can be compared as a C string.
  const auto last_payload =
      context.output.payloads<TestService::TestServerStreamRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()), "321");
}
365
// A reader obtained from Open() is bound to the requested channel, and the
// payload passed to Finish() shows up in the fake channel output.
TEST(RawServerReader, Open_ReturnsUsableReader) {
  ReaderWriterTestContext context;
  RawServerReader reader =
      RawServerReader::Open<TestService::TestClientStreamRpc>(
          context.server, context.channel.id(), context.service);

  EXPECT_EQ(reader.channel_id(), context.channel.id());
  EXPECT_EQ(OkStatus(), reader.Finish(as_bytes(span("This is a message"))));

  // The span covers the whole string literal array, terminator included, so
  // the recorded payload can be compared as a C string.
  const auto last_payload =
      context.output.payloads<TestService::TestClientStreamRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()),
               "This is a message");
}
381
// A reader/writer obtained from Open() is bound to the requested channel, and
// data passed to Write() shows up in the fake channel output.
TEST(RawServerReaderWriter, Open_ReturnsUsableReaderWriter) {
  ReaderWriterTestContext context;
  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);

  EXPECT_EQ(stream.channel_id(), context.channel.id());
  EXPECT_EQ(OkStatus(), stream.Write(as_bytes(span("321"))));

  // The span covers the whole string literal array, terminator included, so
  // the recorded payload can be compared as a C string.
  const auto last_payload =
      context.output.payloads<TestService::TestBidirectionalStreamRpc>()
          .back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()), "321");
}
398
// Move-assigning an inactive responder over an active one finishes the active
// call: exactly one completion with OK status is recorded for the method.
TEST(RawUnaryResponder, Move_FinishesActiveCall) {
  ReaderWriterTestContext context;
  RawUnaryResponder target =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  RawUnaryResponder inactive_source;

  target = std::move(inactive_source);

  const auto finish_statuses =
      context.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finish_statuses.size(), 1u);
  EXPECT_EQ(finish_statuses.back(), OkStatus());
}
413
// Move-assigning an active call for a different method over an active call
// finishes only the overwritten call; the moved-in call records no completion.
TEST(RawUnaryResponder, Move_DifferentActiveCalls_ClosesFirstOnly) {
  ReaderWriterTestContext context;
  RawUnaryResponder target =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  RawUnaryResponder source =
      RawUnaryResponder::Open<TestService::TestAnotherUnaryRpc>(
          context.server, context.channel.id(), context.service);

  // Different methods, so both calls are active at the same time.
  EXPECT_TRUE(target.active());
  EXPECT_TRUE(source.active());

  target = std::move(source);

  // The overwritten TestUnaryRpc call was completed with OK.
  const auto finish_statuses =
      context.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finish_statuses.size(), 1u);
  EXPECT_EQ(finish_statuses.back(), OkStatus());

  // Nothing was recorded for the TestAnotherUnaryRpc call that was moved in.
  EXPECT_TRUE(
      context.output.completions<TestService::TestAnotherUnaryRpc>().empty());
}
436
// Replacing a call with a newer call for the same method records no
// completion, and the surviving call can still be finished with a payload and
// a status.
TEST(RawUnaryResponder, ReplaceActiveCall_DoesNotFinishCall) {
  ReaderWriterTestContext context;
  RawUnaryResponder target =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  RawUnaryResponder replacement =
      RawUnaryResponder::Open<TestService::TestUnaryRpc>(
          context.server, context.channel.id(), context.service);

  target = std::move(replacement);

  // No completion has been sent for the method so far.
  ASSERT_TRUE(
      context.output.completions<TestService::TestUnaryRpc>().empty());

  constexpr const char kData[] = "Some data!";
  EXPECT_EQ(OkStatus(),
            target.Finish(as_bytes(span(kData)), Status::InvalidArgument()));

  // The payload and status passed to Finish() both reached the output.
  const auto last_payload =
      context.output.payloads<TestService::TestUnaryRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()), kData);

  const auto finish_statuses =
      context.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finish_statuses.size(), 1u);
  EXPECT_EQ(finish_statuses.back(), Status::InvalidArgument());
}
465
// Destroying an active responder finishes the call: one completion with OK
// status is recorded once the responder leaves scope.
TEST(RawUnaryResponder, OutOfScope_FinishesActiveCall) {
  ReaderWriterTestContext context;

  {
    RawUnaryResponder scoped_responder =
        RawUnaryResponder::Open<TestService::TestUnaryRpc>(
            context.server, context.channel.id(), context.service);
    // Nothing has been completed while the responder is still alive.
    ASSERT_TRUE(
        context.output.completions<TestService::TestUnaryRpc>().empty());
  }

  const auto finish_statuses =
      context.output.completions<TestService::TestUnaryRpc>();
  ASSERT_EQ(finish_statuses.size(), 1u);
  EXPECT_EQ(finish_statuses.back(), OkStatus());
}
479
// Move-assigning an inactive writer over an active one finishes the active
// call: exactly one completion with OK status is recorded for the method.
TEST(RawServerWriter, Move_InactiveToActive_FinishesActiveCall) {
  ReaderWriterTestContext context;
  RawServerWriter target =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  RawServerWriter inactive_source;

  target = std::move(inactive_source);

  const auto finish_statuses =
      context.output.completions<TestService::TestServerStreamRpc>();
  ASSERT_EQ(finish_statuses.size(), 1u);
  EXPECT_EQ(finish_statuses.back(), OkStatus());
}
495
// Replacing a writer with a newer writer for the same method records no
// completion, and the surviving writer can still send data.
TEST(RawServerWriter, ReplaceActiveCall_DoesNotFinishCall) {
  ReaderWriterTestContext context;
  RawServerWriter target =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  RawServerWriter replacement =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  target = std::move(replacement);

  // No completion has been sent for the method so far.
  ASSERT_TRUE(
      context.output.completions<TestService::TestServerStreamRpc>().empty());

  constexpr const char kData[] = "Some data!";
  EXPECT_EQ(OkStatus(), target.Write(as_bytes(span(kData))));

  // The written bytes reached the fake channel output.
  const auto last_payload =
      context.output.payloads<TestService::TestServerStreamRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()), kData);
}
520
// Payload written by WriteAsWriter() and expected by the UsableAsWriter tests.
constexpr char kWriterData[] = "20X6";
522
WriteAsWriter(Writer & writer)523 void WriteAsWriter(Writer& writer) {
524 ASSERT_TRUE(writer.active());
525 ASSERT_EQ(writer.channel_id(), ReaderWriterTestContext::kChannelId);
526
527 EXPECT_EQ(OkStatus(), writer.Write(as_bytes(span(kWriterData))));
528 }
529
// A RawServerWriter can be used through the Writer interface returned by
// as_writer(); data written that way reaches the fake channel output.
TEST(RawServerWriter, UsableAsWriter) {
  ReaderWriterTestContext context;
  RawServerWriter server_writer =
      RawServerWriter::Open<TestService::TestServerStreamRpc>(
          context.server, context.channel.id(), context.service);

  WriteAsWriter(server_writer.as_writer());

  const auto last_payload =
      context.output.payloads<TestService::TestServerStreamRpc>().back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()),
               kWriterData);
}
544
// A RawServerReaderWriter can be used through the Writer interface returned by
// as_writer(); data written that way reaches the fake channel output.
TEST(RawServerReaderWriter, UsableAsWriter) {
  ReaderWriterTestContext context;
  RawServerReaderWriter stream =
      RawServerReaderWriter::Open<TestService::TestBidirectionalStreamRpc>(
          context.server, context.channel.id(), context.service);

  WriteAsWriter(stream.as_writer());

  const auto last_payload =
      context.output.payloads<TestService::TestBidirectionalStreamRpc>()
          .back();
  EXPECT_STREQ(reinterpret_cast<const char*>(last_payload.data()),
               kWriterData);
}
560
561 } // namespace pw::rpc
562