1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #define PW_LOG_MODULE_NAME "TRN"
16
17 #include "pw_transfer/client.h"
18
19 #include "pw_log/log.h"
20
21 namespace pw::transfer {
22
Read(uint32_t resource_id,stream::Writer & output,CompletionFunc && on_completion,chrono::SystemClock::duration timeout,ProtocolVersion protocol_version)23 Status Client::Read(uint32_t resource_id,
24 stream::Writer& output,
25 CompletionFunc&& on_completion,
26 chrono::SystemClock::duration timeout,
27 ProtocolVersion protocol_version) {
28 if (on_completion == nullptr ||
29 protocol_version == ProtocolVersion::kUnknown) {
30 return Status::InvalidArgument();
31 }
32
33 if (!has_read_stream_) {
34 rpc::RawClientReaderWriter read_stream = client_.Read(
35 [this](ConstByteSpan chunk) {
36 transfer_thread_.ProcessClientChunk(chunk);
37 },
38 [this](Status status) {
39 OnRpcError(status, internal::TransferType::kReceive);
40 });
41 transfer_thread_.SetClientReadStream(read_stream);
42 has_read_stream_ = true;
43 }
44
45 transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
46 protocol_version,
47 resource_id,
48 &output,
49 max_parameters_,
50 std::move(on_completion),
51 timeout,
52 max_retries_,
53 max_lifetime_retries_);
54 return OkStatus();
55 }
56
Write(uint32_t resource_id,stream::Reader & input,CompletionFunc && on_completion,chrono::SystemClock::duration timeout,ProtocolVersion protocol_version)57 Status Client::Write(uint32_t resource_id,
58 stream::Reader& input,
59 CompletionFunc&& on_completion,
60 chrono::SystemClock::duration timeout,
61 ProtocolVersion protocol_version) {
62 if (on_completion == nullptr ||
63 protocol_version == ProtocolVersion::kUnknown) {
64 return Status::InvalidArgument();
65 }
66
67 if (!has_write_stream_) {
68 rpc::RawClientReaderWriter write_stream = client_.Write(
69 [this](ConstByteSpan chunk) {
70 transfer_thread_.ProcessClientChunk(chunk);
71 },
72 [this](Status status) {
73 OnRpcError(status, internal::TransferType::kTransmit);
74 });
75 transfer_thread_.SetClientWriteStream(write_stream);
76 has_write_stream_ = true;
77 }
78
79 transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
80 protocol_version,
81 resource_id,
82 &input,
83 max_parameters_,
84 std::move(on_completion),
85 timeout,
86 max_retries_,
87 max_lifetime_retries_);
88
89 return OkStatus();
90 }
91
CancelTransfer(uint32_t resource_id)92 void Client::CancelTransfer(uint32_t resource_id) {
93 transfer_thread_.EndClientTransfer(
94 resource_id, Status::Cancelled(), /*send_status_chunk=*/true);
95 }
96
OnRpcError(Status status,internal::TransferType type)97 void Client::OnRpcError(Status status, internal::TransferType type) {
98 bool is_write_error = type == internal::TransferType::kTransmit;
99
100 PW_LOG_ERROR("Client %s stream terminated with status %d",
101 is_write_error ? "Write()" : "Read()",
102 status.code());
103
104 if (is_write_error) {
105 has_write_stream_ = false;
106 } else {
107 has_read_stream_ = false;
108 }
109 }
110
111 } // namespace pw::transfer
112