• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <optional>
16 
17 #include "gtest/gtest.h"
18 #include "pw_protobuf/decoder.h"
19 #include "pw_rpc/internal/hash.h"
20 #include "pw_rpc/raw/client_testing.h"
21 #include "pw_rpc/raw/test_method_context.h"
22 #include "pw_rpc_test_protos/test.pwpb.h"
23 #include "pw_rpc_test_protos/test.raw_rpc.pb.h"
24 
25 namespace pw::rpc {
26 namespace {
27 
EncodeRequest(int integer,Status status)28 Vector<std::byte, 64> EncodeRequest(int integer, Status status) {
29   Vector<std::byte, 64> buffer(64);
30   test::TestRequest::MemoryEncoder test_request(buffer);
31 
32   EXPECT_EQ(OkStatus(), test_request.WriteInteger(integer));
33   EXPECT_EQ(OkStatus(), test_request.WriteStatusCode(status.code()));
34 
35   EXPECT_EQ(OkStatus(), test_request.status());
36   buffer.resize(test_request.size());
37   return buffer;
38 }
39 
EncodeResponse(int number)40 Vector<std::byte, 64> EncodeResponse(int number) {
41   Vector<std::byte, 64> buffer(64);
42   test::TestStreamResponse::MemoryEncoder test_response(buffer);
43 
44   EXPECT_EQ(OkStatus(), test_response.WriteNumber(number));
45 
46   EXPECT_EQ(OkStatus(), test_response.status());
47   buffer.resize(test_response.size());
48   return buffer;
49 }
50 
51 }  // namespace
52 
53 namespace test {
54 
55 class TestService final
56     : public pw_rpc::raw::TestService::Service<TestService> {
57  public:
TestUnaryRpc(ConstByteSpan request,RawUnaryResponder & responder)58   static void TestUnaryRpc(ConstByteSpan request,
59                            RawUnaryResponder& responder) {
60     int64_t integer;
61     Status status;
62 
63     if (!DecodeRequest(request, integer, status)) {
64       ASSERT_EQ(OkStatus(), responder.Finish({}, Status::DataLoss()));
65       return;
66     }
67 
68     std::byte response[64] = {};
69     TestResponse::MemoryEncoder test_response(response);
70     EXPECT_EQ(OkStatus(), test_response.WriteValue(integer + 1));
71 
72     ASSERT_EQ(OkStatus(),
73               responder.Finish(std::span(response).first(test_response.size()),
74                                status));
75   }
76 
TestAnotherUnaryRpc(ConstByteSpan request,RawUnaryResponder & responder)77   void TestAnotherUnaryRpc(ConstByteSpan request,
78                            RawUnaryResponder& responder) {
79     if (request.empty()) {
80       last_responder_ = std::move(responder);
81     } else {
82       TestUnaryRpc(request, responder);
83     }
84   }
85 
TestServerStreamRpc(ConstByteSpan request,RawServerWriter & writer)86   void TestServerStreamRpc(ConstByteSpan request, RawServerWriter& writer) {
87     int64_t integer;
88     Status status;
89 
90     ASSERT_TRUE(DecodeRequest(request, integer, status));
91     for (int i = 0; i < integer; ++i) {
92       std::byte buffer[32] = {};
93       TestStreamResponse::MemoryEncoder test_stream_response(buffer);
94       EXPECT_EQ(OkStatus(), test_stream_response.WriteNumber(i));
95       EXPECT_EQ(OkStatus(), writer.Write(test_stream_response));
96     }
97 
98     EXPECT_EQ(OkStatus(), writer.Finish(status));
99   }
100 
TestClientStreamRpc(RawServerReader & reader)101   void TestClientStreamRpc(RawServerReader& reader) {
102     last_reader_ = std::move(reader);
103 
104     last_reader_.set_on_next([this](ConstByteSpan payload) {
105       EXPECT_EQ(OkStatus(),
106                 last_reader_.Finish(EncodeResponse(ReadInteger(payload)),
107                                     Status::Unauthenticated()));
108     });
109   }
110 
TestBidirectionalStreamRpc(RawServerReaderWriter & reader_writer)111   void TestBidirectionalStreamRpc(RawServerReaderWriter& reader_writer) {
112     last_reader_writer_ = std::move(reader_writer);
113 
114     last_reader_writer_.set_on_next([this](ConstByteSpan payload) {
115       EXPECT_EQ(
116           OkStatus(),
117           last_reader_writer_.Write(EncodeResponse(ReadInteger(payload))));
118       EXPECT_EQ(OkStatus(), last_reader_writer_.Finish(Status::NotFound()));
119     });
120   }
121 
last_responder()122   RawUnaryResponder& last_responder() { return last_responder_; }
123 
124  private:
ReadInteger(ConstByteSpan request)125   static uint32_t ReadInteger(ConstByteSpan request) {
126     uint32_t integer = 0;
127 
128     protobuf::Decoder decoder(request);
129     while (decoder.Next().ok()) {
130       switch (static_cast<TestRequest::Fields>(decoder.FieldNumber())) {
131         case TestRequest::Fields::INTEGER:
132           EXPECT_EQ(OkStatus(), decoder.ReadUint32(&integer));
133           break;
134         case TestRequest::Fields::STATUS_CODE:
135           break;
136         default:
137           ADD_FAILURE();
138       }
139     }
140 
141     return integer;
142   }
143 
DecodeRequest(ConstByteSpan request,int64_t & integer,Status & status)144   static bool DecodeRequest(ConstByteSpan request,
145                             int64_t& integer,
146                             Status& status) {
147     protobuf::Decoder decoder(request);
148     Status decode_status;
149     bool has_integer = false;
150     bool has_status = false;
151 
152     while (decoder.Next().ok()) {
153       switch (static_cast<TestRequest::Fields>(decoder.FieldNumber())) {
154         case TestRequest::Fields::INTEGER:
155           decode_status = decoder.ReadInt64(&integer);
156           EXPECT_EQ(OkStatus(), decode_status);
157           has_integer = decode_status.ok();
158           break;
159         case TestRequest::Fields::STATUS_CODE: {
160           uint32_t status_code;
161           decode_status = decoder.ReadUint32(&status_code);
162           EXPECT_EQ(OkStatus(), decode_status);
163           has_status = decode_status.ok();
164           status = static_cast<Status::Code>(status_code);
165           break;
166         }
167       }
168     }
169     EXPECT_TRUE(has_integer);
170     EXPECT_TRUE(has_status);
171     return has_integer && has_status;
172   }
173 
174   RawUnaryResponder last_responder_;
175   RawServerReader last_reader_;
176   RawServerReaderWriter last_reader_writer_;
177 };
178 
179 }  // namespace test
180 
181 namespace {
182 
// The generated service can be instantiated and reports the expected ID and
// name.
TEST(RawCodegen, Server_CompilesProperly) {
  test::TestService service_under_test;

  const auto expected_id = internal::Hash("pw.rpc.test.TestService");
  EXPECT_EQ(service_under_test.id(), expected_id);
  EXPECT_STREQ(service_under_test.name(), "TestService");
}
188 
// Invokes TestUnaryRpc with integer 123 and checks that every VALUE field in
// the response holds 124.
TEST(RawCodegen, Server_InvokeUnaryRpc) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestUnaryRpc) ctx;

  ctx.call(EncodeRequest(123, OkStatus()));
  EXPECT_EQ(OkStatus(), ctx.status());

  protobuf::Decoder response_decoder(ctx.response());
  while (response_decoder.Next().ok()) {
    const auto field = static_cast<test::TestResponse::Fields>(
        response_decoder.FieldNumber());
    if (field != test::TestResponse::Fields::VALUE) {
      continue;  // Only the VALUE field is checked.
    }

    int32_t value;
    EXPECT_EQ(OkStatus(), response_decoder.ReadInt32(&value));
    EXPECT_EQ(value, 124);
  }
}
210 
// Invokes TestAnotherUnaryRpc with a non-empty request (integer 123) and
// checks that every VALUE field in the response holds 124.
TEST(RawCodegen, Server_InvokeAsyncUnaryRpc) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) ctx;

  ctx.call(EncodeRequest(123, OkStatus()));
  EXPECT_EQ(OkStatus(), ctx.status());

  protobuf::Decoder response_decoder(ctx.response());
  while (response_decoder.Next().ok()) {
    const auto field = static_cast<test::TestResponse::Fields>(
        response_decoder.FieldNumber());
    if (field == test::TestResponse::Fields::VALUE) {
      int32_t value;
      EXPECT_EQ(OkStatus(), response_decoder.ReadInt32(&value));
      EXPECT_EQ(value, 124);
    }
    // Any other field is not inspected.
  }
}
232 
// A client error sent for an open call leaves the stored responder inactive.
TEST(RawCodegen, Server_HandleError) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) context;
  RawUnaryResponder& responder = context.service().last_responder();

  ASSERT_FALSE(responder.active());
  // An empty request makes the service hold on to the responder.
  context.call({});
  ASSERT_TRUE(responder.active());

  context.SendClientError(Status::Unimplemented());

  EXPECT_FALSE(responder.active());
}
244 
// Finishing a stored responder succeeds and leaves it inactive.
TEST(RawCodegen, Server_Finish) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) context;
  RawUnaryResponder& responder = context.service().last_responder();

  ASSERT_FALSE(responder.active());
  // An empty request makes the service hold on to the responder.
  context.call({});
  ASSERT_TRUE(responder.active());

  EXPECT_EQ(OkStatus(), responder.Finish({}));
  EXPECT_FALSE(responder.active());
}
255 
// Move-assigning the service's active responder into a default-constructed
// one transfers the active state.
TEST(RawCodegen, Server_MoveCalls) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) context;

  RawUnaryResponder destination;

  // An empty request makes the service hold on to the responder.
  context.call({});
  RawUnaryResponder& source = context.service().last_responder();

  EXPECT_TRUE(source.active());
  EXPECT_FALSE(destination.active());

  destination = std::move(source);

  EXPECT_FALSE(source.active());
  EXPECT_TRUE(destination.active());
}
272 
// Move-assigns between two active responders, in both directions, and checks
// which one is active after each move.
TEST(RawCodegen, Server_MoveBetweenActiveCallsWithBuffers) {
  // Use two contexts on different channels so the two calls are distinct.
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) first;
  first.set_channel_id(1);

  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestAnotherUnaryRpc) second;
  second.set_channel_id(2);

  first.call({});
  second.call({});

  RawUnaryResponder& first_responder = first.service().last_responder();
  RawUnaryResponder& second_responder = second.service().last_responder();

  first_responder = std::move(second_responder);

  ASSERT_TRUE(first_responder.active());
  ASSERT_FALSE(second_responder.active());

  second_responder = std::move(first_responder);

  ASSERT_FALSE(first_responder.active());
  ASSERT_TRUE(second_responder.active());
}
296 
// Requests five stream responses and checks the final status, the response
// count, and that the last response carries the number 4.
TEST(RawCodegen, Server_InvokeServerStreamingRpc) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestServerStreamRpc) ctx;

  ctx.call(EncodeRequest(5, Status::Unauthenticated()));
  EXPECT_TRUE(ctx.done());
  EXPECT_EQ(Status::Unauthenticated(), ctx.status());
  EXPECT_EQ(ctx.total_responses(), 5u);

  protobuf::Decoder last_response(ctx.responses().back());
  while (last_response.Next().ok()) {
    const auto field = static_cast<test::TestStreamResponse::Fields>(
        last_response.FieldNumber());
    if (field == test::TestStreamResponse::Fields::NUMBER) {
      int32_t value;
      EXPECT_EQ(OkStatus(), last_response.ReadInt32(&value));
      EXPECT_EQ(value, 4);
    } else if (field == test::TestStreamResponse::Fields::CHUNK) {
      // The service never writes a chunk, so seeing one is fatal.
      FAIL();
    }
  }
}
321 
ReadResponseNumber(ConstByteSpan data)322 int32_t ReadResponseNumber(ConstByteSpan data) {
323   int32_t value = -1;
324   protobuf::Decoder decoder(data);
325   while (decoder.Next().ok()) {
326     switch (
327         static_cast<test::TestStreamResponse::Fields>(decoder.FieldNumber())) {
328       case test::TestStreamResponse::Fields::NUMBER: {
329         EXPECT_EQ(OkStatus(), decoder.ReadInt32(&value));
330         break;
331       }
332       default:
333         ADD_FAILURE();
334         break;
335     }
336   }
337 
338   return value;
339 }
340 
// Sends one client-stream message and checks that the service finishes the
// call with UNAUTHENTICATED and a single response echoing the integer.
TEST(RawCodegen, Server_InvokeClientStreamingRpc) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestClientStreamRpc) context;

  context.call();
  context.SendClientStream(EncodeRequest(123, OkStatus()));

  ASSERT_TRUE(context.done());
  EXPECT_EQ(Status::Unauthenticated(), context.status());
  EXPECT_EQ(context.total_responses(), 1u);

  const int32_t echoed = ReadResponseNumber(context.responses().back());
  EXPECT_EQ(echoed, 123);
}
352 
// Sends one client-stream message and checks that the service writes a single
// response echoing the integer and finishes with NOT_FOUND.
TEST(RawCodegen, Server_InvokeBidirectionalStreamingRpc) {
  PW_RAW_TEST_METHOD_CONTEXT(test::TestService, TestBidirectionalStreamRpc)
  context;

  context.call();
  context.SendClientStream(EncodeRequest(456, OkStatus()));

  ASSERT_TRUE(context.done());
  EXPECT_EQ(Status::NotFound(), context.status());
  ASSERT_EQ(context.total_responses(), 1u);

  const int32_t echoed = ReadResponseNumber(context.responses().back());
  EXPECT_EQ(echoed, 456);
}
365 
// The generated Client class reports the channel ID and client it was
// constructed with.
TEST(RawCodegen, Client_ClientClass) {
  RawClientTestContext test_context;

  test::pw_rpc::raw::TestService::Client generated_client(
      test_context.client(), test_context.channel().id());

  EXPECT_EQ(generated_client.channel_id(), test_context.channel().id());
  EXPECT_EQ(&generated_client.client(), &test_context.client());
}
375 
376 class RawCodegenClientTest : public ::testing::Test {
377  protected:
RawCodegenClientTest()378   RawCodegenClientTest()
379       : service_client_(context_.client(), context_.channel().id()) {}
380 
381   // Assumes the payload is a null-terminated string, not a protobuf.
OnNext()382   Function<void(ConstByteSpan)> OnNext() {
383     return [this](ConstByteSpan c_string) { CopyPayload(c_string); };
384   }
385 
OnCompleted()386   Function<void(Status)> OnCompleted() {
387     return [this](Status status) { status_ = status; };
388   }
389 
UnaryOnCompleted()390   Function<void(ConstByteSpan, Status)> UnaryOnCompleted() {
391     return [this](ConstByteSpan c_string, Status status) {
392       CopyPayload(c_string);
393       status_ = status;
394     };
395   }
396 
OnError()397   Function<void(Status)> OnError() {
398     return [this](Status error) { error_ = error; };
399   }
400 
401   RawClientTestContext<> context_;
402 
403   test::pw_rpc::raw::TestService::Client service_client_;
404 
405   // Store the payload as a null-terminated string for convenience. Use nullptr
406   // for an empty payload.
407   std::optional<const char*> payload_;
408   std::optional<Status> status_;
409   std::optional<Status> error_;
410 
411  private:
CopyPayload(ConstByteSpan c_string)412   void CopyPayload(ConstByteSpan c_string) {
413     ASSERT_LE(c_string.size(), sizeof(buffer_));
414 
415     if (c_string.empty()) {
416       payload_ = nullptr;
417     } else {
418       std::memcpy(buffer_, c_string.data(), c_string.size());
419       payload_ = buffer_;
420     }
421   }
422 
423   char buffer_[64];
424 };
425 
// A unary call made through the free function delivers the server's response
// payload and status to the completion callback.
TEST_F(RawCodegenClientTest, InvokeUnaryRpc_Ok) {
  constexpr char kRequest[] = "This is the request";
  constexpr char kReply[] = "(ㆆ_ㆆ)";

  RawUnaryReceiver receiver = test::pw_rpc::raw::TestService::TestUnaryRpc(
      context_.client(),
      context_.channel().id(),
      std::as_bytes(std::span(kRequest)),
      UnaryOnCompleted(),
      OnError());

  context_.server().SendResponse<test::pw_rpc::raw::TestService::TestUnaryRpc>(
      std::as_bytes(std::span(kReply)), OkStatus());

  ASSERT_TRUE(payload_.has_value());
  EXPECT_STREQ(payload_.value(), "(ㆆ_ㆆ)");
  EXPECT_EQ(status_, OkStatus());
  EXPECT_FALSE(error_.has_value());
}
442 
// A server error on a unary call reaches only the error callback.
TEST_F(RawCodegenClientTest, InvokeUnaryRpc_Error) {
  constexpr char kRequest[] = "This is the request";

  RawUnaryReceiver receiver = service_client_.TestUnaryRpc(
      std::as_bytes(std::span(kRequest)), UnaryOnCompleted(), OnError());

  context_.server()
      .SendServerError<test::pw_rpc::raw::TestService::TestUnaryRpc>(
          Status::NotFound());

  EXPECT_FALSE(payload_.has_value());
  EXPECT_FALSE(status_.has_value());
  EXPECT_EQ(error_, Status::NotFound());
}
457 
// Each server-stream message replaces the recorded payload, and the final
// response delivers the completion status without an error.
TEST_F(RawCodegenClientTest, InvokeServerStreamRpc_Ok) {
  constexpr char kRequest[] = "This is the request";
  constexpr char kFirstMessage[] = "(⌐□_□)";
  constexpr char kSecondMessage[] = "(o_O)";

  RawClientReader reader = test::pw_rpc::raw::TestService::TestServerStreamRpc(
      context_.client(),
      context_.channel().id(),
      std::as_bytes(std::span(kRequest)),
      OnNext(),
      OnCompleted(),
      OnError());

  context_.server()
      .SendServerStream<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
          std::as_bytes(std::span(kFirstMessage)));

  ASSERT_TRUE(payload_.has_value());
  EXPECT_STREQ(payload_.value(), "(⌐□_□)");

  context_.server()
      .SendServerStream<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
          std::as_bytes(std::span(kSecondMessage)));

  EXPECT_STREQ(payload_.value(), "(o_O)");

  context_.server()
      .SendResponse<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
          Status::InvalidArgument());

  EXPECT_EQ(status_, Status::InvalidArgument());
  EXPECT_FALSE(error_.has_value());
}
487 
// A server error on a server-streaming call reaches only the error callback.
TEST_F(RawCodegenClientTest, InvokeServerStreamRpc_Error) {
  constexpr char kRequest[] = "This is the request";

  RawClientReader reader = service_client_.TestServerStreamRpc(
      std::as_bytes(std::span(kRequest)),
      OnNext(),
      OnCompleted(),
      OnError());

  context_.server()
      .SendServerError<test::pw_rpc::raw::TestService::TestServerStreamRpc>(
          Status::FailedPrecondition());

  EXPECT_FALSE(payload_.has_value());
  EXPECT_FALSE(status_.has_value());
  EXPECT_EQ(error_, Status::FailedPrecondition());
}
503 
// A client-stream write shows up in the channel output, and the server's
// response delivers its payload and status to the completion callback.
TEST_F(RawCodegenClientTest, InvokeClientStreamRpc_Ok) {
  constexpr char kClientMessage[] = "(•‿•)";
  constexpr char kServerReply[] = "(⌐□_□)";

  RawClientWriter writer = test::pw_rpc::raw::TestService::TestClientStreamRpc(
      context_.client(),
      context_.channel().id(),
      UnaryOnCompleted(),
      OnError());

  EXPECT_EQ(OkStatus(), writer.Write(std::as_bytes(std::span(kClientMessage))));
  // The most recent payload sent for this method must be the message above.
  EXPECT_STREQ(
      reinterpret_cast<const char*>(
          context_.output()
              .payloads<test::pw_rpc::raw::TestService::TestClientStreamRpc>()
              .back()
              .data()),
      "(•‿•)");

  context_.server()
      .SendResponse<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
          std::as_bytes(std::span(kServerReply)), Status::InvalidArgument());

  ASSERT_TRUE(payload_.has_value());
  EXPECT_STREQ(payload_.value(), "(⌐□_□)");
  EXPECT_EQ(status_, Status::InvalidArgument());
  EXPECT_FALSE(error_.has_value());
}
529 
// A response with an empty payload is recorded as a null payload together
// with the OK status.
TEST_F(RawCodegenClientTest, ClientStream_Finish) {
  RawClientWriter writer = test::pw_rpc::raw::TestService::TestClientStreamRpc(
      context_.client(),
      context_.channel().id(),
      UnaryOnCompleted(),
      OnError());

  context_.server()
      .SendResponse<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
          {}, OkStatus());

  // The callback ran (payload_ is engaged) but stored nullptr for "empty".
  ASSERT_TRUE(payload_.has_value());
  EXPECT_EQ(payload_.value(), nullptr);
  EXPECT_EQ(status_, OkStatus());
}
545 
// Cancelling a client-streaming call succeeds and leaves the call inactive.
TEST_F(RawCodegenClientTest, ClientStream_Cancel) {
  RawClientWriter writer = test::pw_rpc::raw::TestService::TestClientStreamRpc(
      context_.client(),
      context_.channel().id(),
      UnaryOnCompleted(),
      OnError());

  EXPECT_EQ(writer.Cancel(), OkStatus());
  EXPECT_FALSE(writer.active());
}
556 
// Move-assigning a default-constructed writer over an existing call leaves
// the destination inactive.
TEST_F(RawCodegenClientTest, ClientStream_Move) {
  RawClientWriter writer = test::pw_rpc::raw::TestService::TestClientStreamRpc(
      context_.client(),
      context_.channel().id(),
      UnaryOnCompleted(),
      OnError());

  EXPECT_EQ(OkStatus(), writer.CloseClientStream());

  RawClientWriter unused_writer;

  writer = std::move(unused_writer);
  EXPECT_FALSE(writer.active());
}
571 
// A server error on a client-streaming call reaches only the error callback.
TEST_F(RawCodegenClientTest, InvokeClientStreamRpc_Error) {
  RawClientWriter writer =
      service_client_.TestClientStreamRpc(UnaryOnCompleted(), OnError());

  context_.server()
      .SendServerError<test::pw_rpc::raw::TestService::TestClientStreamRpc>(
          Status::FailedPrecondition());

  // Neither the payload nor the completion status may have been set.
  EXPECT_FALSE(payload_.has_value());
  EXPECT_FALSE(status_.has_value());
  EXPECT_EQ(error_, Status::FailedPrecondition());
}
584 
// A bidirectional call: the client's write appears in the channel output, a
// server-stream message is recorded as the payload, and the final response
// delivers the completion status without an error.
TEST_F(RawCodegenClientTest, InvokeBidirectionalStreamRpc_Ok) {
  constexpr char kClientMessage[] = "(•‿•)";
  constexpr char kServerMessage[] = "(⌐□_□)";

  RawClientReaderWriter reader_writer =
      test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc(
          context_.client(),
          context_.channel().id(),
          OnNext(),
          OnCompleted(),
          OnError());

  EXPECT_EQ(OkStatus(),
            reader_writer.Write(std::as_bytes(std::span(kClientMessage))));
  // The most recent payload sent for this method must be the message above.
  EXPECT_STREQ(
      reinterpret_cast<const char*>(
          context_.output()
              .payloads<
                  test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>()
              .back()
              .data()),
      "(•‿•)");

  context_.server()
      .SendServerStream<
          test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
          std::as_bytes(std::span(kServerMessage)));

  ASSERT_TRUE(payload_.has_value());
  EXPECT_STREQ(payload_.value(), "(⌐□_□)");

  context_.server()
      .SendResponse<test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
          Status::InvalidArgument());

  EXPECT_EQ(status_, Status::InvalidArgument());
  EXPECT_FALSE(error_.has_value());
}
619 
// A server error on a bidirectional call reaches only the error callback.
TEST_F(RawCodegenClientTest, InvokeBidirectionalStreamRpc_Error) {
  RawClientReaderWriter reader_writer =
      service_client_.TestBidirectionalStreamRpc(
          OnNext(), OnCompleted(), OnError());

  context_.server()
      .SendServerError<
          test::pw_rpc::raw::TestService::TestBidirectionalStreamRpc>(
          Status::Internal());

  // Neither the payload nor the completion status may have been set.
  EXPECT_FALSE(payload_.has_value());
  EXPECT_FALSE(status_.has_value());
  EXPECT_EQ(error_, Status::Internal());
}
633 
634 }  // namespace
635 }  // namespace pw::rpc
636