1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16
17 #include "pw_transfer/client.h"
18
19 #include "pw_log/log.h"
20
21 namespace pw::transfer {
22
Read(uint32_t transfer_id,stream::Writer & output,CompletionFunc && on_completion,chrono::SystemClock::duration timeout)23 Status Client::Read(uint32_t transfer_id,
24 stream::Writer& output,
25 CompletionFunc&& on_completion,
26 chrono::SystemClock::duration timeout) {
27 if (on_completion == nullptr) {
28 return Status::InvalidArgument();
29 }
30
31 if (!has_read_stream_) {
32 rpc::RawClientReaderWriter read_stream = client_.Read(
33 [this](ConstByteSpan chunk) {
34 transfer_thread_.ProcessClientChunk(chunk);
35 },
36 [this](Status status) {
37 OnRpcError(status, internal::TransferType::kReceive);
38 });
39 transfer_thread_.SetClientReadStream(read_stream);
40 has_read_stream_ = true;
41 }
42
43 transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
44 transfer_id,
45 transfer_id,
46 &output,
47 max_parameters_,
48 std::move(on_completion),
49 timeout,
50 cfg::kDefaultMaxRetries);
51 return OkStatus();
52 }
53
Write(uint32_t transfer_id,stream::Reader & input,CompletionFunc && on_completion,chrono::SystemClock::duration timeout)54 Status Client::Write(uint32_t transfer_id,
55 stream::Reader& input,
56 CompletionFunc&& on_completion,
57 chrono::SystemClock::duration timeout) {
58 if (on_completion == nullptr) {
59 return Status::InvalidArgument();
60 }
61
62 if (!has_write_stream_) {
63 rpc::RawClientReaderWriter write_stream = client_.Write(
64 [this](ConstByteSpan chunk) {
65 transfer_thread_.ProcessClientChunk(chunk);
66 },
67 [this](Status status) {
68 OnRpcError(status, internal::TransferType::kTransmit);
69 });
70 transfer_thread_.SetClientWriteStream(write_stream);
71 has_write_stream_ = true;
72 }
73
74 transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
75 transfer_id,
76 transfer_id,
77 &input,
78 max_parameters_,
79 std::move(on_completion),
80 timeout,
81 cfg::kDefaultMaxRetries);
82
83 return OkStatus();
84 }
85
OnRpcError(Status status,internal::TransferType type)86 void Client::OnRpcError(Status status, internal::TransferType type) {
87 bool is_write_error = type == internal::TransferType::kTransmit;
88
89 PW_LOG_ERROR("Client %s stream terminated with status %d",
90 is_write_error ? "Write()" : "Read()",
91 status.code());
92
93 if (is_write_error) {
94 has_write_stream_ = false;
95 } else {
96 has_read_stream_ = false;
97 }
98 }
99
100 } // namespace pw::transfer
101