• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #define PW_LOG_MODULE_NAME "TRN"
16 
17 #include "pw_transfer/client.h"
18 
19 #include "pw_log/log.h"
20 
21 namespace pw::transfer {
22 
Read(uint32_t transfer_id,stream::Writer & output,CompletionFunc && on_completion,chrono::SystemClock::duration timeout)23 Status Client::Read(uint32_t transfer_id,
24                     stream::Writer& output,
25                     CompletionFunc&& on_completion,
26                     chrono::SystemClock::duration timeout) {
27   if (on_completion == nullptr) {
28     return Status::InvalidArgument();
29   }
30 
31   if (!has_read_stream_) {
32     rpc::RawClientReaderWriter read_stream = client_.Read(
33         [this](ConstByteSpan chunk) {
34           transfer_thread_.ProcessClientChunk(chunk);
35         },
36         [this](Status status) {
37           OnRpcError(status, internal::TransferType::kReceive);
38         });
39     transfer_thread_.SetClientReadStream(read_stream);
40     has_read_stream_ = true;
41   }
42 
43   transfer_thread_.StartClientTransfer(internal::TransferType::kReceive,
44                                        transfer_id,
45                                        transfer_id,
46                                        &output,
47                                        max_parameters_,
48                                        std::move(on_completion),
49                                        timeout,
50                                        cfg::kDefaultMaxRetries);
51   return OkStatus();
52 }
53 
Write(uint32_t transfer_id,stream::Reader & input,CompletionFunc && on_completion,chrono::SystemClock::duration timeout)54 Status Client::Write(uint32_t transfer_id,
55                      stream::Reader& input,
56                      CompletionFunc&& on_completion,
57                      chrono::SystemClock::duration timeout) {
58   if (on_completion == nullptr) {
59     return Status::InvalidArgument();
60   }
61 
62   if (!has_write_stream_) {
63     rpc::RawClientReaderWriter write_stream = client_.Write(
64         [this](ConstByteSpan chunk) {
65           transfer_thread_.ProcessClientChunk(chunk);
66         },
67         [this](Status status) {
68           OnRpcError(status, internal::TransferType::kTransmit);
69         });
70     transfer_thread_.SetClientWriteStream(write_stream);
71     has_write_stream_ = true;
72   }
73 
74   transfer_thread_.StartClientTransfer(internal::TransferType::kTransmit,
75                                        transfer_id,
76                                        transfer_id,
77                                        &input,
78                                        max_parameters_,
79                                        std::move(on_completion),
80                                        timeout,
81                                        cfg::kDefaultMaxRetries);
82 
83   return OkStatus();
84 }
85 
OnRpcError(Status status,internal::TransferType type)86 void Client::OnRpcError(Status status, internal::TransferType type) {
87   bool is_write_error = type == internal::TransferType::kTransmit;
88 
89   PW_LOG_ERROR("Client %s stream terminated with status %d",
90                is_write_error ? "Write()" : "Read()",
91                status.code());
92 
93   if (is_write_error) {
94     has_write_stream_ = false;
95   } else {
96     has_read_stream_ = false;
97   }
98 }
99 
100 }  // namespace pw::transfer
101