• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // Client binary for the cross-language integration test.
16 //
17 // Usage:
18 //  bazel-bin/pw_transfer/integration_test_client 3300 <<< "resource_id: 12
19 //  file: '/tmp/myfile.txt'"
20 //
21 // WORK IN PROGRESS, SEE b/228516801
#include "pw_transfer/client.h"

#include <sys/socket.h>

#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>

#include "google/protobuf/text_format.h"
#include "pw_assert/check.h"
#include "pw_log/log.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/integration_testing.h"
#include "pw_status/status.h"
#include "pw_status/try.h"
#include "pw_stream/std_file_stream.h"
#include "pw_sync/binary_semaphore.h"
#include "pw_thread/thread.h"
#include "pw_thread_stl/options.h"
#include "pw_transfer/integration_test/config.pb.h"
#include "pw_transfer/transfer_thread.h"
42 
43 namespace pw::transfer::integration_test {
44 namespace {
45 
// Upper bound requested for the client socket's send buffer.
//
// The value is deliberately the smallest one we can ask for. With little
// buffering between the proxy and its clients, rate limiting makes the client
// block and wait for the integration test proxy to drain, instead of letting
// OS buffers backlog large quantities of data.
//
// The OS may choose not to follow the requested size strictly. Even so, asking
// for the minimum shrinks the buffers enough to better reflect typical
// inter-device communication.
//
// For this to be effective, servers should also configure their sockets with
// a smaller receive buffer size.
constexpr int kMaxSocketSendBufferSize{1};

// Window size, in bytes, handed to the pw::transfer::Client constructor.
constexpr size_t kDefaultMaxWindowSizeBytes = 16 * 1024;
61 
TransferThreadOptions()62 thread::Options& TransferThreadOptions() {
63   static thread::stl::Options options;
64   return options;
65 }
66 
67 // Transfer status, valid only after semaphore is acquired.
68 //
69 // We need to bundle the status and semaphore together because a pw_function
70 // callback can at most capture the reference to one variable (and we need to
71 // both set the status and release the semaphore).
72 struct TransferResult {
73   Status status = Status::Unknown();
74   sync::BinarySemaphore completed;
75 };
76 
77 // Create a pw_transfer client and perform the transfer actions.
PerformTransferActions(const pw::transfer::ClientConfig & config)78 pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) {
79   constexpr size_t kMaxPayloadSize = rpc::MaxSafePayloadSize();
80   std::byte chunk_buffer[kMaxPayloadSize];
81   std::byte encode_buffer[kMaxPayloadSize];
82   transfer::Thread<2, 2> transfer_thread(chunk_buffer, encode_buffer);
83   pw::Thread system_thread(TransferThreadOptions(), transfer_thread);
84 
85   // As much as we don't want to dynamically allocate an array,
86   // variable length arrays (VLA) are nonstandard, and a std::vector could cause
87   // references to go stale if the vector's underlying buffer is resized. This
88   // array of TransferResults needs to outlive the loop that performs the
89   // actual transfer actions due to how some references to TransferResult
90   // may persist beyond the lifetime of a transfer.
91   const int num_actions = config.transfer_actions().size();
92   auto transfer_results = std::make_unique<TransferResult[]>(num_actions);
93 
94   pw::transfer::Client client(rpc::integration_test::client(),
95                               rpc::integration_test::kChannelId,
96                               transfer_thread,
97                               kDefaultMaxWindowSizeBytes);
98 
99   if (config.max_retries() > 0) {
100     if (client.set_max_retries(config.max_retries()).IsInvalidArgument()) {
101       PW_LOG_ERROR("Invalid max_retries count: %u",
102                    static_cast<unsigned>(config.max_retries()));
103       return Status::InvalidArgument();
104     }
105   }
106 
107   if (config.max_lifetime_retries() > 0) {
108     if (client.set_max_lifetime_retries(config.max_lifetime_retries())
109             .IsInvalidArgument()) {
110       PW_LOG_ERROR("Invalid max_lifetime_retries count: %u",
111                    static_cast<unsigned>(config.max_retries()));
112       return Status::InvalidArgument();
113     }
114   }
115 
116   Status status = pw::OkStatus();
117   for (int i = 0; i < num_actions; i++) {
118     const pw::transfer::TransferAction& action = config.transfer_actions()[i];
119     TransferResult& result = transfer_results[i];
120     // If no protocol version is specified, default to the latest version.
121     pw::transfer::ProtocolVersion protocol_version =
122         action.protocol_version() ==
123                 pw::transfer::TransferAction::ProtocolVersion::
124                     TransferAction_ProtocolVersion_UNKNOWN_VERSION
125             ? pw::transfer::ProtocolVersion::kLatest
126             : static_cast<pw::transfer::ProtocolVersion>(
127                   action.protocol_version());
128     if (action.transfer_type() ==
129         pw::transfer::TransferAction::TransferType::
130             TransferAction_TransferType_WRITE_TO_SERVER) {
131       pw::stream::StdFileReader input(action.file_path().c_str());
132       pw::Result<pw::transfer::Client::Handle> handle = client.Write(
133           action.resource_id(),
134           input,
135           [&result](Status status) {
136             result.status = status;
137             result.completed.release();
138           },
139           protocol_version,
140           pw::transfer::cfg::kDefaultClientTimeout,
141           pw::transfer::cfg::kDefaultInitialChunkTimeout,
142           action.initial_offset());
143       if (handle.ok()) {
144         // Wait for the transfer to complete. We need to do this here so that
145         // the StdFileReader doesn't go out of scope.
146         result.completed.acquire();
147       } else {
148         result.status = handle.status();
149       }
150 
151       input.Close();
152 
153     } else if (action.transfer_type() ==
154                pw::transfer::TransferAction::TransferType::
155                    TransferAction_TransferType_READ_FROM_SERVER) {
156       pw::stream::StdFileWriter output(action.file_path().c_str());
157       pw::Result<pw::transfer::Client::Handle> handle = client.Read(
158           action.resource_id(),
159           output,
160           [&result](Status status) {
161             result.status = status;
162             result.completed.release();
163           },
164           protocol_version,
165           pw::transfer::cfg::kDefaultClientTimeout,
166           pw::transfer::cfg::kDefaultInitialChunkTimeout,
167           action.initial_offset());
168       if (handle.ok()) {
169         // Wait for the transfer to complete.
170         result.completed.acquire();
171       } else {
172         result.status = handle.status();
173       }
174 
175       output.Close();
176     } else {
177       PW_LOG_ERROR("Unrecognized transfer action type %d",
178                    action.transfer_type());
179       status = pw::Status::InvalidArgument();
180       break;
181     }
182 
183     if (int(result.status.code()) != int(action.expected_status())) {
184       PW_LOG_ERROR("Failed to perform action:\n%s",
185                    action.DebugString().c_str());
186       status = result.status.ok() ? Status::Unknown() : result.status;
187       break;
188     }
189   }
190 
191   transfer_thread.Terminate();
192 
193   system_thread.join();
194 
195   // The RPC thread must join before destroying transfer objects as the transfer
196   // service may still reference the transfer thread or transfer client objects.
197   pw::rpc::integration_test::TerminateClient();
198   return status;
199 }
200 
201 }  // namespace
202 }  // namespace pw::transfer::integration_test
203 
main(int argc,char * argv[])204 int main(int argc, char* argv[]) {
205   if (argc < 2) {
206     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
207     return 1;
208   }
209 
210   const int port = std::atoi(argv[1]);
211 
212   std::string config_string;
213   std::string line;
214   while (std::getline(std::cin, line)) {
215     config_string = config_string + line + '\n';
216   }
217   pw::transfer::ClientConfig config;
218 
219   bool ok =
220       google::protobuf::TextFormat::ParseFromString(config_string, &config);
221   if (!ok) {
222     PW_LOG_INFO("Failed to parse config: %s", config_string.c_str());
223     PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]);
224     return 1;
225   } else {
226     PW_LOG_INFO("Client loaded config:\n%s", config.DebugString().c_str());
227   }
228 
229   if (!pw::rpc::integration_test::InitializeClient(port).ok()) {
230     return 1;
231   }
232 
233   int retval = pw::rpc::integration_test::SetClientSockOpt(
234       SOL_SOCKET,
235       SO_SNDBUF,
236       &pw::transfer::integration_test::kMaxSocketSendBufferSize,
237       sizeof(pw::transfer::integration_test::kMaxSocketSendBufferSize));
238   PW_CHECK_INT_EQ(retval,
239                   0,
240                   "Failed to configure socket send buffer size with errno=%d",
241                   errno);
242 
243   if (!pw::transfer::integration_test::PerformTransferActions(config).ok()) {
244     PW_LOG_INFO("Failed to transfer!");
245     return 1;
246   }
247   return 0;
248 }
249