1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #include "pw_channel/forwarding_channel.h" 16 17 namespace pw::channel::internal { 18 19 async2::Poll<Result<multibuf::MultiBuf>> DoPendRead(async2::Context & cx)20ForwardingChannel<DataType::kDatagram>::DoPendRead(async2::Context& cx) 21 PW_NO_LOCK_SAFETY_ANALYSIS { 22 std::lock_guard lock(pair_.mutex_); 23 if (pair_.closed_) { 24 return Status::FailedPrecondition(); 25 } 26 if (!read_queue_.has_value()) { 27 waker_ = cx.GetWaker(async2::WaitReason::Unspecified()); 28 return async2::Pending(); 29 } 30 auto read_data = std::move(*read_queue_); 31 read_queue_.reset(); 32 std::move(sibling_.waker_).Wake(); 33 return read_data; 34 } 35 DoPendReadyToWrite(async2::Context & cx)36async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendReadyToWrite( 37 async2::Context& cx) PW_NO_LOCK_SAFETY_ANALYSIS { 38 std::lock_guard lock(pair_.mutex_); 39 if (pair_.closed_) { 40 return Status::FailedPrecondition(); 41 } 42 if (sibling_.read_queue_.has_value()) { 43 waker_ = cx.GetWaker(async2::WaitReason::Unspecified()); 44 return async2::Pending(); 45 } 46 return async2::Ready(OkStatus()); 47 } 48 DoWrite(multibuf::MultiBuf && data)49Result<channel::WriteToken> ForwardingChannel<DataType::kDatagram>::DoWrite( 50 multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS { 51 std::lock_guard lock(pair_.mutex_); 52 if (pair_.closed_) { 53 return 
Status::FailedPrecondition(); 54 } 55 PW_DASSERT(!sibling_.read_queue_.has_value()); 56 sibling_.read_queue_ = std::move(data); 57 const uint32_t token = ++write_token_; 58 std::move(sibling_.waker_).Wake(); 59 return CreateWriteToken(token); 60 } 61 62 async2::Poll<Result<channel::WriteToken>> DoPendFlush(async2::Context &)63ForwardingChannel<DataType::kDatagram>::DoPendFlush(async2::Context&) { 64 std::lock_guard lock(pair_.mutex_); 65 if (pair_.closed_) { 66 return Status::FailedPrecondition(); 67 } 68 return async2::Ready(CreateWriteToken(write_token_)); 69 } 70 DoPendClose(async2::Context &)71async2::Poll<Status> ForwardingChannel<DataType::kDatagram>::DoPendClose( 72 async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS { 73 std::lock_guard lock(pair_.mutex_); 74 if (pair_.closed_) { 75 return Status::FailedPrecondition(); 76 } 77 pair_.closed_ = true; 78 read_queue_.reset(); 79 std::move(sibling_.waker_).Wake(); 80 return OkStatus(); 81 } 82 83 async2::Poll<Result<multibuf::MultiBuf>> DoPendRead(async2::Context & cx)84ForwardingChannel<DataType::kByte>::DoPendRead(async2::Context& cx) { 85 std::lock_guard lock(pair_.mutex_); 86 if (pair_.closed_) { 87 return Status::FailedPrecondition(); 88 } 89 if (read_queue_.empty()) { 90 read_waker_ = cx.GetWaker(async2::WaitReason::Unspecified()); 91 return async2::Pending(); 92 } 93 auto read_data = std::move(read_queue_); 94 read_queue_ = {}; 95 return read_data; 96 } 97 DoWrite(multibuf::MultiBuf && data)98Result<channel::WriteToken> ForwardingChannel<DataType::kByte>::DoWrite( 99 multibuf::MultiBuf&& data) PW_NO_LOCK_SAFETY_ANALYSIS { 100 std::lock_guard lock(pair_.mutex_); 101 if (pair_.closed_) { 102 return Status::FailedPrecondition(); 103 } 104 if (data.empty()) { 105 return CreateWriteToken(write_token_); // no data, nothing to do 106 } 107 108 write_token_ += data.size(); 109 sibling_.read_queue_.PushSuffix(std::move(data)); 110 return CreateWriteToken(write_token_); 111 } 112 113 
// Reports the current write token; this function performs no other work.
//
// Yields a Ready token built from the current write_token_ while the pair is
// open, and FailedPrecondition once it has been closed.
async2::Poll<Result<channel::WriteToken>>
ForwardingChannel<DataType::kByte>::DoPendFlush(async2::Context&) {
  std::lock_guard guard(pair_.mutex_);
  if (!pair_.closed_) {
    return async2::Ready(CreateWriteToken(write_token_));
  }
  return Status::FailedPrecondition();
}

// Closes the channel pair.
//
// Returns FailedPrecondition if the pair was already closed. Otherwise marks
// the pair closed, releases the bytes queued on this channel, wakes the
// sibling's stored read waker, and returns OkStatus().
async2::Poll<Status> ForwardingChannel<DataType::kByte>::DoPendClose(
    async2::Context&) PW_NO_LOCK_SAFETY_ANALYSIS {
  std::lock_guard guard(pair_.mutex_);
  const bool already_closed = pair_.closed_;
  if (already_closed) {
    return Status::FailedPrecondition();
  }

  pair_.closed_ = true;
  read_queue_.Release();
  // A sibling reader parked in DoPendRead gets a chance to observe the
  // closed state.
  std::move(sibling_.read_waker_).Wake();
  return OkStatus();
}

}  // namespace pw::channel::internal