1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 // Simple RPC server with the transfer service registered. Reads HDLC frames
16 // with RPC packets through a socket. This server has a single resource ID that
17 // is available, and data must be written to the server before data can be read
18 // from the resource ID.
19 //
20 // Usage:
21 //
22 // integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'"
23
24 #include <sys/socket.h>
25
26 #include <chrono>
27 #include <cstddef>
28 #include <cstdlib>
29 #include <deque>
30 #include <map>
31 #include <memory>
32 #include <string>
33 #include <thread>
34 #include <utility>
35 #include <variant>
36 #include <vector>
37
38 #include "google/protobuf/text_format.h"
39 #include "pw_assert/check.h"
40 #include "pw_chrono/system_clock.h"
41 #include "pw_log/log.h"
42 #include "pw_rpc_system_server/rpc_server.h"
43 #include "pw_rpc_system_server/socket.h"
44 #include "pw_stream/std_file_stream.h"
45 #include "pw_thread/thread.h"
46 #include "pw_thread_stl/options.h"
47 #include "pw_transfer/integration_test/config.pb.h"
48 #include "pw_transfer/transfer.h"
49
50 namespace pw::transfer {
51 namespace {
52
53 // This is the maximum size of the socket send buffers. Ideally, this is set
54 // to the lowest allowed value to minimize buffering between the proxy and
55 // clients so rate limiting causes the client to block and wait for the
56 // integration test proxy to drain rather than allowing OS buffers to backlog
57 // large quantities of data.
58 //
59 // Note that the OS may chose to not strictly follow this requested buffer size.
60 // Still, setting this value to be as small as possible does reduce bufer sizes
61 // significantly enough to better reflect typical inter-device communication.
62 //
63 // For this to be effective, servers should also configure their sockets to a
64 // smaller receive buffer size.
65 constexpr int kMaxSocketSendBufferSize = 1;
66
67 class FileTransferHandler final : public ReadWriteHandler {
68 public:
FileTransferHandler(uint32_t resource_id,std::deque<std::string> && sources,std::deque<std::string> && destinations,std::string default_source_path,std::string default_destination_path,bool offsettable)69 FileTransferHandler(uint32_t resource_id,
70 std::deque<std::string>&& sources,
71 std::deque<std::string>&& destinations,
72 std::string default_source_path,
73 std::string default_destination_path,
74 bool offsettable)
75 : ReadWriteHandler(resource_id),
76 sources_(sources),
77 destinations_(destinations),
78 default_source_path_(default_source_path),
79 default_destination_path_(default_destination_path),
80 offsettable(offsettable) {}
81
82 ~FileTransferHandler() override = default;
83
PrepareRead()84 Status PrepareRead() final {
85 if (sources_.empty() && default_source_path_.empty()) {
86 PW_LOG_ERROR("Source paths exhausted");
87 return Status::ResourceExhausted();
88 }
89
90 std::string path;
91 if (!sources_.empty()) {
92 path = sources_.front();
93 sources_.pop_front();
94 } else {
95 path = default_source_path_;
96 }
97
98 PW_LOG_DEBUG("Preparing read for file %s", path.c_str());
99 set_reader(stream_.emplace<stream::StdFileReader>(path.c_str()));
100 return OkStatus();
101 }
102
PrepareRead(uint32_t offset)103 Status PrepareRead(uint32_t offset) final {
104 if (!offsettable) {
105 return Status::Unimplemented();
106 }
107
108 if (Status status = PrepareRead(); !status.ok()) {
109 return status;
110 }
111
112 if (offset >
113 std::get<stream::StdFileReader>(stream_).ConservativeReadLimit()) {
114 return Status::ResourceExhausted();
115 }
116
117 return std::get<stream::StdFileReader>(stream_).Seek(offset);
118 }
119
FinalizeRead(Status)120 void FinalizeRead(Status) final {
121 std::get<stream::StdFileReader>(stream_).Close();
122 }
123
PrepareWrite()124 Status PrepareWrite() final {
125 if (destinations_.empty() && default_destination_path_.empty()) {
126 PW_LOG_ERROR("Destination paths exhausted");
127 return Status::ResourceExhausted();
128 }
129
130 std::string path;
131 if (!destinations_.empty()) {
132 path = destinations_.front();
133 destinations_.pop_front();
134 } else {
135 path = default_destination_path_;
136 }
137
138 PW_LOG_DEBUG("Preparing write for file %s", path.c_str());
139 set_writer(stream_.emplace<stream::StdFileWriter>(path.c_str()));
140 return OkStatus();
141 }
142
PrepareWrite(uint32_t offset)143 Status PrepareWrite(uint32_t offset) final {
144 if (!offsettable) {
145 return Status::Unimplemented();
146 }
147
148 if (Status status = PrepareWrite(); !status.ok()) {
149 return status;
150 }
151
152 // It does not appear possible to hit this limit
153 if (offset >
154 std::get<stream::StdFileWriter>(stream_).ConservativeWriteLimit()) {
155 return Status::ResourceExhausted();
156 }
157
158 return std::get<stream::StdFileWriter>(stream_).Seek(offset);
159 }
160
FinalizeWrite(Status)161 Status FinalizeWrite(Status) final {
162 std::get<stream::StdFileWriter>(stream_).Close();
163 return OkStatus();
164 }
165
166 private:
167 std::deque<std::string> sources_;
168 std::deque<std::string> destinations_;
169 std::string default_source_path_;
170 std::string default_destination_path_;
171 std::variant<std::monostate, stream::StdFileReader, stream::StdFileWriter>
172 stream_;
173 bool offsettable;
174 };
175
RunServer(int socket_port,ServerConfig config)176 void RunServer(int socket_port, ServerConfig config) {
177 std::vector<std::byte> chunk_buffer(config.chunk_size_bytes());
178 std::vector<std::byte> encode_buffer(config.chunk_size_bytes());
179 transfer::Thread<4, 4> transfer_thread(chunk_buffer, encode_buffer);
180 TransferService transfer_service(
181 transfer_thread,
182 config.pending_bytes(),
183 std::chrono::seconds(config.chunk_timeout_seconds()),
184 config.transfer_service_retries(),
185 config.extend_window_divisor());
186
187 rpc::system_server::set_socket_port(socket_port);
188
189 rpc::system_server::Init();
190 rpc::system_server::Server().RegisterService(transfer_service);
191
192 // Start transfer thread.
193 pw::Thread transfer_thread_handle(thread::stl::Options(), transfer_thread);
194
195 int retval =
196 rpc::system_server::SetServerSockOpt(SOL_SOCKET,
197 SO_SNDBUF,
198 &kMaxSocketSendBufferSize,
199 sizeof(kMaxSocketSendBufferSize));
200 PW_CHECK_INT_EQ(retval,
201 0,
202 "Failed to configure socket send buffer size with errno=%d",
203 errno);
204
205 std::vector<std::unique_ptr<FileTransferHandler>> handlers;
206 for (const auto& resource : config.resources()) {
207 uint32_t id = resource.first;
208
209 std::deque<std::string> source_paths(resource.second.source_paths().begin(),
210 resource.second.source_paths().end());
211 std::deque<std::string> destination_paths(
212 resource.second.destination_paths().begin(),
213 resource.second.destination_paths().end());
214
215 auto handler = std::make_unique<FileTransferHandler>(
216 id,
217 std::move(source_paths),
218 std::move(destination_paths),
219 resource.second.default_source_path(),
220 resource.second.default_destination_path(),
221 resource.second.offsettable());
222
223 transfer_service.RegisterHandler(*handler);
224 handlers.push_back(std::move(handler));
225 }
226
227 PW_LOG_INFO("Starting pw_rpc server");
228 PW_CHECK_OK(rpc::system_server::Start());
229
230 // Unregister transfer handler before cleaning up the thread since doing so
231 // requires the transfer thread to be running.
232 for (auto& handler : handlers) {
233 transfer_service.UnregisterHandler(*handler);
234 }
235
236 // End transfer thread.
237 transfer_thread.Terminate();
238 transfer_thread_handle.join();
239 }
240
241 } // namespace
242 } // namespace pw::transfer
243
main(int argc,char * argv[])244 int main(int argc, char* argv[]) {
245 if (argc != 2) {
246 PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
247 return 1;
248 }
249
250 int port = std::atoi(argv[1]);
251 PW_CHECK_UINT_GT(port, 0, "Invalid port!");
252
253 std::string config_string;
254 std::string line;
255 while (std::getline(std::cin, line)) {
256 config_string = config_string + line + '\n';
257 }
258 pw::transfer::ServerConfig config;
259
260 bool ok =
261 google::protobuf::TextFormat::ParseFromString(config_string, &config);
262 if (!ok) {
263 PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
264 PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
265 return 1;
266 } else {
267 PW_LOG_INFO("Server loaded config:\n%s", config.DebugString().c_str());
268 }
269
270 pw::transfer::RunServer(port, config);
271 return 0;
272 }
273