1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
#include <array>
#include <charconv>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <map>
#include <string>
#include <string_view>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>

#include "pw_allocator/libc_allocator.h"
#include "pw_assert/check.h"
#include "pw_bytes/byte_builder.h"
#include "pw_bytes/span.h"
#include "pw_checksum/crc32.h"
#include "pw_grpc/connection.h"
#include "pw_grpc/examples/echo/echo.rpc.pwpb.h"
#include "pw_grpc/grpc_channel_output.h"
#include "pw_grpc/pw_rpc_handler.h"
#include "pw_log/log.h"
#include "pw_multibuf/simple_allocator.h"
#include "pw_result/result.h"
#include "pw_rpc/internal/hash.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc_transport/service_registry.h"
#include "pw_span/span.h"
#include "pw_status/status.h"
#include "pw_status/try.h"
#include "pw_stream/socket_stream.h"
#include "pw_stream/stream.h"
#include "pw_string/string.h"
#include "pw_thread/test_thread_context.h"
#include "pw_thread/thread.h"
44
45 using pw::grpc::StreamId;
46
47 namespace {
// Size, in bytes, of each stack buffer UnaryEcho uses while encoding a
// response (one for the encoded output, one as encoder scratch space).
constexpr size_t kBufferSize{512};
49
50 class EchoService
51 : public ::grpc::examples::echo::pw_rpc::pwpb::Echo::Service<EchoService> {
52 public:
UnaryEcho(pw::ConstByteSpan request,pw::rpc::RawUnaryResponder & responder)53 void UnaryEcho(pw::ConstByteSpan request,
54 pw::rpc::RawUnaryResponder& responder) {
55 auto message =
56 ::grpc::examples::echo::pwpb::EchoRequest::FindMessage(request);
57 if (!message.ok()) {
58 responder.Finish({}, message.status()).IgnoreError();
59 }
60
61 if (message->size() < 100) {
62 PW_LOG_INFO("UnaryEcho %s", message->data());
63 } else {
64 PW_LOG_INFO("UnaryEcho (len=%zu)", message->size());
65 }
66
67 quiet_ = message->compare("quiet") == 0;
68 last_unary_responder_ = std::move(responder);
69 if (quiet_) {
70 return;
71 }
72
73 std::array<std::byte, kBufferSize> mem_writer_buffer_;
74 std::array<std::byte, kBufferSize> encoder_scratch_buffer_;
75 pw::stream::MemoryWriter writer(mem_writer_buffer_);
76 ::grpc::examples::echo::pwpb::EchoResponse::StreamEncoder encoder(
77 writer, encoder_scratch_buffer_);
78
79 auto checksum = message->rfind("crc32:", 0) == 0;
80 if (checksum) {
81 uint32_t crc32 = pw::checksum::Crc32::Calculate(
82 pw::span(reinterpret_cast<const std::byte*>(message->data()),
83 message->size()));
84 encoder.Write({.message = std::string_view(std::to_string(crc32))})
85 .IgnoreError();
86 } else {
87 encoder.Write({.message = *message}).IgnoreError();
88 }
89
90 last_unary_responder_.Finish(writer.WrittenData(), pw::OkStatus())
91 .IgnoreError();
92 }
93
ServerStreamingEcho(const::grpc::examples::echo::pwpb::EchoRequest::Message & request,ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message> & writer)94 void ServerStreamingEcho(
95 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request,
96 ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>&
97 writer) {
98 PW_LOG_INFO("ServerStreamingEcho %s", request.message.c_str());
99 quiet_ = request.message.compare("quiet") == 0;
100 last_writer_ = std::move(writer);
101 if (quiet_) {
102 PW_LOG_INFO("not writing server streaming echo");
103 return;
104 }
105 for (size_t i = 0; i < 3; ++i) {
106 last_writer_.Write({.message = request.message}).IgnoreError();
107 }
108 last_writer_.Finish(pw::OkStatus()).IgnoreError();
109 }
110
ClientStreamingEcho(ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader)111 void ClientStreamingEcho(
112 ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
113 ::grpc::examples::echo::pwpb::EchoResponse::Message>&
114 reader) {
115 PW_LOG_INFO("ClientStreamingEcho");
116 last_reader_ = std::move(reader);
117 last_reader_.set_on_next(
118 [this](
119 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
120 quiet_ = request.message.compare("quiet") == 0;
121 PW_LOG_INFO("ClientStreaming message %s", request.message.c_str());
122 });
123
124 last_reader_.set_on_completion_requested([this]() {
125 if (quiet_) {
126 return;
127 }
128 last_reader_.Finish({.message = "done"}).IgnoreError();
129 });
130 }
131
BidirectionalStreamingEcho(ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,::grpc::examples::echo::pwpb::EchoResponse::Message> & reader_writer)132 void BidirectionalStreamingEcho(
133 ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
134 ::grpc::examples::echo::pwpb::EchoResponse::Message>&
135 reader_writer) {
136 PW_LOG_INFO("BidirectionalStreamingEcho");
137 last_reader_writer_ = std::move(reader_writer);
138 last_reader_writer_.set_on_completion_requested([this]() {
139 if (quiet_) {
140 return;
141 }
142 last_reader_writer_.Finish(pw::OkStatus()).IgnoreError();
143 });
144 last_reader_writer_.set_on_next(
145 [this](
146 const ::grpc::examples::echo::pwpb::EchoRequest::Message& request) {
147 PW_LOG_INFO("BidiStreaming message %s", request.message.c_str());
148 quiet_ = request.message.compare("quiet") == 0;
149 if (quiet_) {
150 return;
151 }
152 last_reader_writer_.Write({.message = request.message}).IgnoreError();
153 });
154 }
155
156 private:
157 pw::rpc::RawUnaryResponder last_unary_responder_{};
158 ServerWriter<::grpc::examples::echo::pwpb::EchoResponse::Message>
159 last_writer_{};
160 ServerReader<::grpc::examples::echo::pwpb::EchoRequest::Message,
161 ::grpc::examples::echo::pwpb::EchoResponse::Message>
162 last_reader_{};
163 ServerReaderWriter<::grpc::examples::echo::pwpb::EchoRequest::Message,
164 ::grpc::examples::echo::pwpb::EchoResponse::Message>
165 last_reader_writer_{};
166 bool quiet_ = false;
167 };
168
// RPC channel ID that main() uses both for the egress channel it creates and
// for the gRPC handler it constructs.
constexpr uint32_t kTestChannelId{1};
170
171 } // namespace
172
main(int argc,char * argv[])173 int main(int argc, char* argv[]) {
174 std::vector<std::string> args(argv, argv + argc);
175 int port = 3400;
176 int num_connections = 1;
177
178 if (args.size() > 1) {
179 if (args[1] == "--help") {
180 PW_LOG_INFO("Usage: [port=3400] [num_connections=1]");
181 PW_LOG_INFO(
182 " num_connections positional arg sets how many socket connections "
183 "should be processed before exit");
184 exit(0);
185 }
186 port = stoi(args[1]);
187 }
188
189 if (args.size() > 2) {
190 num_connections = stoi(args[2]);
191 }
192
193 std::setbuf(stdout, nullptr); // unbuffered stdout
194
195 pw::stream::ServerSocket server;
196 pw::grpc::GrpcChannelOutput rpc_egress;
197 std::array<pw::rpc::Channel, 1> tx_channels(
198 {pw::rpc::Channel::Create<kTestChannelId>(&rpc_egress)});
199 pw::rpc::ServiceRegistry service_registry(tx_channels);
200
201 EchoService echo_service;
202 service_registry.RegisterService(echo_service);
203
204 pw::grpc::PwRpcHandler handler(kTestChannelId,
205 service_registry.client_server().server());
206 rpc_egress.set_callbacks(handler);
207
208 PW_LOG_INFO("Main.Listen on port=%d", port);
209 if (auto status = server.Listen(port); !status.ok()) {
210 PW_LOG_ERROR("Main.Listen failed code=%d", status.code());
211 return 1;
212 }
213
214 for (int i = 0; i < num_connections; ++i) {
215 PW_LOG_INFO("Main.Accept");
216 auto socket = server.Accept();
217 if (!socket.ok()) {
218 PW_LOG_ERROR("Main.Accept failed code=%d", socket.status().code());
219 return 1;
220 }
221
222 PW_LOG_INFO("Main.Run");
223
224 constexpr size_t kMaxSendQueueSize = 4096;
225 pw::allocator::LibCAllocator message_assembly_allocator;
226 std::array<std::byte, kMaxSendQueueSize> data_area;
227 pw::multibuf::SimpleAllocator simple_allocator(data_area,
228 message_assembly_allocator);
229 pw::thread::test::TestThreadContext connection_thread_context;
230 pw::thread::test::TestThreadContext send_thread_context;
231 pw::grpc::ConnectionThread conn(
232 *socket,
233 send_thread_context.options(),
234 handler,
235 [&socket]() { socket->Close(); },
236 &message_assembly_allocator,
237 simple_allocator);
238 rpc_egress.set_connection(conn);
239
240 pw::Thread conn_thread(connection_thread_context.options(), conn);
241 conn_thread.join();
242 }
243
244 PW_LOG_INFO("Main.Run completed");
245 return 0;
246 }
247