• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include <cstring>
16 #include <map>
17 #include <string>
18 #include <string_view>
19 #include <type_traits>
20 
21 #include "pw_allocator/libc_allocator.h"
22 #include "pw_assert/check.h"
23 #include "pw_bytes/byte_builder.h"
24 #include "pw_bytes/span.h"
25 #include "pw_checksum/crc32.h"
26 #include "pw_grpc/connection.h"
27 #include "pw_grpc/examples/echo/echo.rpc.pwpb.h"
28 #include "pw_grpc/grpc_channel_output.h"
29 #include "pw_grpc/pw_rpc_handler.h"
30 #include "pw_log/log.h"
31 #include "pw_multibuf/simple_allocator.h"
32 #include "pw_result/result.h"
33 #include "pw_rpc/internal/hash.h"
34 #include "pw_rpc/internal/packet.h"
35 #include "pw_rpc_transport/service_registry.h"
36 #include "pw_span/span.h"
37 #include "pw_status/status.h"
38 #include "pw_status/try.h"
39 #include "pw_stream/socket_stream.h"
40 #include "pw_stream/stream.h"
41 #include "pw_string/string.h"
42 #include "pw_thread/test_thread_context.h"
43 #include "pw_thread/thread.h"
44 
45 using pw::grpc::StreamId;
46 
47 namespace {
48 static constexpr size_t kBufferSize = 512;
49 
50 class EchoService
51     : public ::grpc::examples::echo::pw_rpc::pwpb::Echo::Service<EchoService> {
52  public:
UnaryEcho(pw::ConstByteSpan request,pw::rpc::RawUnaryResponder & responder)53   void UnaryEcho(pw::ConstByteSpan request,
54                  pw::rpc::RawUnaryResponder& responder) {
55     auto message =
56         ::grpc::examples::echo::pwpb::EchoRequest::FindMessage(request);
57     if (!message.ok()) {
58       responder.Finish({}, message.status()).IgnoreError();
59     }
60 
61     if (message->size() < 100) {
62       PW_LOG_INFO("UnaryEcho %s", message->data());
63     } else {
64       PW_LOG_INFO("UnaryEcho (len=%zu)", message->size());
65     }
66 
67     quiet_ = message->compare("quiet") == 0;
68     last_unary_responder_ = std::move(responder);
69     if (quiet_) {
70       return;
71     }
72 
73     std::array<std::byte, kBufferSize> mem_writer_buffer_;
74     std::array<std::byte, kBufferSize> encoder_scratch_buffer_;
75     pw::stream::MemoryWriter writer(mem_writer_buffer_);
76     ::grpc::examples::echo::pwpb::EchoResponse::StreamEncoder encoder(
77         writer, encoder_scratch_buffer_);
78 
79     auto checksum = message->rfind("crc32:", 0) == 0;
80     if (checksum) {
81       uint32_t crc32 = pw::checksum::Crc32::Calculate(
82           pw::span(reinterpret_cast<const std::byte*>(message->data()),
83                    message->size()));
84       encoder.Write({.message = std::string_view(std::to_string(crc32))})
85           .IgnoreError();
86     } else {
87       encoder.Write({.message = *message}).IgnoreError();
88     }
89 
90     last_unary_responder_.Finish(writer.WrittenData(), pw::OkStatus())
91         .IgnoreError();
92   }
93 
ServerStreamingEcho(const::grpc::examples::echo::pwpb::EchoRequest::Message & request,ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message> & writer)94   void ServerStreamingEcho(
95       const ::grpc::examples::echo::pwpb::EchoRequest::Message& request,
96       ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>&
97           writer) {
98     PW_LOG_INFO("ServerStreamingEcho %s", request.message.c_str());
99     quiet_ = request.message.compare("quiet") == 0;
100     last_writer_ = std::move(writer);
101     if (quiet_) {
102       PW_LOG_INFO("not writing server streaming echo");
103       return;
104     }
105     for (size_t i = 0; i < 3; ++i) {
106       last_writer_.Write({.message = request.message}).IgnoreError();
107     }
108     last_writer_.Finish(pw::OkStatus()).IgnoreError();
109   }
110 
ClientStreamingEcho(ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader)111   void ClientStreamingEcho(
112       ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
113                    ::grpc::examples::echo::pwpb::EchoResponse::Message>&
114           reader) {
115     PW_LOG_INFO("ClientStreamingEcho");
116     last_reader_ = std::move(reader);
117     last_reader_.set_on_next(
118         [this](
119             const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
120           quiet_ = request.message.compare("quiet") == 0;
121           PW_LOG_INFO("ClientStreaming message %s", request.message.c_str());
122         });
123 
124     last_reader_.set_on_completion_requested([this]() {
125       if (quiet_) {
126         return;
127       }
128       last_reader_.Finish({.message = "done"}).IgnoreError();
129     });
130   }
131 
BidirectionalStreamingEcho(ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader_writer)132   void BidirectionalStreamingEcho(
133       ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
134                          ::grpc::examples::echo::pwpb::EchoResponse::Message>&
135           reader_writer) {
136     PW_LOG_INFO("BidirectionalStreamingEcho");
137     last_reader_writer_ = std::move(reader_writer);
138     last_reader_writer_.set_on_completion_requested([this]() {
139       if (quiet_) {
140         return;
141       }
142       last_reader_writer_.Finish(pw::OkStatus()).IgnoreError();
143     });
144     last_reader_writer_.set_on_next(
145         [this](
146             const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
147           PW_LOG_INFO("BidiStreaming message %s", request.message.c_str());
148           quiet_ = request.message.compare("quiet") == 0;
149           if (quiet_) {
150             return;
151           }
152           last_reader_writer_.Write({.message = request.message}).IgnoreError();
153         });
154   }
155 
156  private:
157   pw::rpc::RawUnaryResponder last_unary_responder_{};
158   ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>
159       last_writer_{};
160   ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
161                ::grpc::examples::echo::pwpb::EchoResponse::Message>
162       last_reader_{};
163   ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
164                      ::grpc::examples::echo::pwpb::EchoResponse::Message>
165       last_reader_writer_{};
166   bool quiet_ = false;
167 };
168 
169 constexpr uint32_t kTestChannelId = 1;
170 
171 }  // namespace
172 
main(int argc,char * argv[])173 int main(int argc, char* argv[]) {
174   std::vector<std::string> args(argv, argv + argc);
175   int port = 3400;
176   int num_connections = 1;
177 
178   if (args.size() > 1) {
179     if (args[1] == "--help") {
180       PW_LOG_INFO("Usage: [port=3400] [num_connections=1]");
181       PW_LOG_INFO(
182           "  num_connections positional arg sets how many socket connections "
183           "should be processed before exit");
184       exit(0);
185     }
186     port = stoi(args[1]);
187   }
188 
189   if (args.size() > 2) {
190     num_connections = stoi(args[2]);
191   }
192 
193   std::setbuf(stdout, nullptr);  // unbuffered stdout
194 
195   pw::stream::ServerSocket server;
196   pw::grpc::GrpcChannelOutput rpc_egress;
197   std::array<pw::rpc::Channel, 1> tx_channels(
198       {pw::rpc::Channel::Create<kTestChannelId>(&rpc_egress)});
199   pw::rpc::ServiceRegistry service_registry(tx_channels);
200 
201   EchoService echo_service;
202   service_registry.RegisterService(echo_service);
203 
204   pw::grpc::PwRpcHandler handler(kTestChannelId,
205                                  service_registry.client_server().server());
206   rpc_egress.set_callbacks(handler);
207 
208   PW_LOG_INFO("Main.Listen on port=%d", port);
209   if (auto status = server.Listen(port); !status.ok()) {
210     PW_LOG_ERROR("Main.Listen failed code=%d", status.code());
211     return 1;
212   }
213 
214   for (int i = 0; i < num_connections; ++i) {
215     PW_LOG_INFO("Main.Accept");
216     auto socket = server.Accept();
217     if (!socket.ok()) {
218       PW_LOG_ERROR("Main.Accept failed code=%d", socket.status().code());
219       return 1;
220     }
221 
222     PW_LOG_INFO("Main.Run");
223 
224     constexpr size_t kMaxSendQueueSize = 4096;
225     pw::allocator::LibCAllocator message_assembly_allocator;
226     std::array<std::byte, kMaxSendQueueSize> data_area;
227     pw::multibuf::SimpleAllocator simple_allocator(data_area,
228                                                    message_assembly_allocator);
229     pw::thread::test::TestThreadContext connection_thread_context;
230     pw::thread::test::TestThreadContext send_thread_context;
231     pw::grpc::ConnectionThread conn(
232         *socket,
233         send_thread_context.options(),
234         handler,
235         [&socket]() { socket->Close(); },
236         &message_assembly_allocator,
237         simple_allocator);
238     rpc_egress.set_connection(conn);
239 
240     pw::Thread conn_thread(connection_thread_context.options(), conn);
241     conn_thread.join();
242   }
243 
244   PW_LOG_INFO("Main.Run completed");
245   return 0;
246 }
247