• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/forwarding_channel.h"
16 
17 namespace pw::channel::internal {
18 
19 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)20 ForwardingChannel<DataType::kDatagram>::DoPendRead(async2::Context& cx)
21     PW_NO_LOCK_SAFETY_ANALYSIS {
22   std::lock_guard lock(pair_.mutex_);
23   if (pair_.closed_) {
24     return Status::FailedPrecondition();
25   }
26   if (!read_queue_.has_value()) {
27     waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
28     return async2::Pending();
29   }
30   auto read_data = std::move(*read_queue_);
31   read_queue_.reset();
32   std::move(sibling_.waker_).Wake();
33   return read_data;
34 }
35 
DoPendReadyToWrite(async2::Context & cx)36 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendReadyToWrite(
37     async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS {
38   std::lock_guard lock(pair_.mutex_);
39   if (pair_.closed_) {
40     return Status::FailedPrecondition();
41   }
42   if (sibling_.read_queue_.has_value()) {
43     waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
44     return async2::Pending();
45   }
46   return async2::Ready(OkStatus());
47 }
48 
DoWrite(multibuf::MultiBuf && data)49 Result<channel::WriteToken> ForwardingChannel<DataType::kDatagram>::DoWrite(
50     multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
51   std::lock_guard lock(pair_.mutex_);
52   if (pair_.closed_) {
53     return Status::FailedPrecondition();
54   }
55   PW_DASSERT(!sibling_.read_queue_.has_value());
56   sibling_.read_queue_ = std::move(data);
57   const uint32_t token = ++write_token_;
58   std::move(sibling_.waker_).Wake();
59   return CreateWriteToken(token);
60 }
61 
62 async2::Poll<Result<channel::WriteToken>>
DoPendFlush(async2::Context &)63 ForwardingChannel<DataType::kDatagram>::DoPendFlush(async2::Context&) {
64   std::lock_guard lock(pair_.mutex_);
65   if (pair_.closed_) {
66     return Status::FailedPrecondition();
67   }
68   return async2::Ready(CreateWriteToken(write_token_));
69 }
70 
DoPendClose(async2::Context &)71 async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendClose(
72     async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
73   std::lock_guard lock(pair_.mutex_);
74   if (pair_.closed_) {
75     return Status::FailedPrecondition();
76   }
77   pair_.closed_ = true;
78   read_queue_.reset();
79   std::move(sibling_.waker_).Wake();
80   return OkStatus();
81 }
82 
83 async2::Poll<Result<multibuf::MultiBuf>>
DoPendRead(async2::Context & cx)84 ForwardingChannel<DataType::kByte>::DoPendRead(async2::Context& cx) {
85   std::lock_guard lock(pair_.mutex_);
86   if (pair_.closed_) {
87     return Status::FailedPrecondition();
88   }
89   if (read_queue_.empty()) {
90     read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified());
91     return async2::Pending();
92   }
93   auto read_data = std::move(read_queue_);
94   read_queue_ = {};
95   return read_data;
96 }
97 
DoWrite(multibuf::MultiBuf && data)98 Result<channel::WriteToken> ForwardingChannel<DataType::kByte>::DoWrite(
99     multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS {
100   std::lock_guard lock(pair_.mutex_);
101   if (pair_.closed_) {
102     return Status::FailedPrecondition();
103   }
104   if (data.empty()) {
105     return CreateWriteToken(write_token_);  // no data, nothing to do
106   }
107 
108   write_token_ += data.size();
109   sibling_.read_queue_.PushSuffix(std::move(data));
110   return CreateWriteToken(write_token_);
111 }
112 
113 async2::Poll<Result<channel::WriteToken>>
DoPendFlush(async2::Context &)114 ForwardingChannel<DataType::kByte>::DoPendFlush(async2::Context&) {
115   std::lock_guard lock(pair_.mutex_);
116   if (pair_.closed_) {
117     return Status::FailedPrecondition();
118   }
119   return async2::Ready(CreateWriteToken(write_token_));
120 }
121 
DoPendClose(async2::Context &)122 async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPendClose(
123     async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
124   std::lock_guard lock(pair_.mutex_);
125   if (pair_.closed_) {
126     return Status::FailedPrecondition();
127   }
128   pair_.closed_ = true;
129   read_queue_.Release();
130   std::move(sibling_.read_waker_).Wake();
131   return OkStatus();
132 }
133 
134 }  // namespace pw::channel::internal
135