// Copyright 2024 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_channel/rp2_stdio_channel.h" #include "pico/stdlib.h" #include "pw_assert/check.h" #include "pw_async2/dispatcher_base.h" #include "pw_log/log.h" #include "pw_multibuf/allocator.h" #include "pw_multibuf/allocator_async.h" #include "pw_multibuf/multibuf.h" #include "pw_status/status.h" namespace pw::channel { namespace { using ::pw::async2::Context; using ::pw::async2::Pending; using ::pw::async2::Poll; using ::pw::async2::Ready; using ::pw::async2::Waker; using ::pw::multibuf::MultiBuf; using ::pw::multibuf::MultiBufAllocationFuture; using ::pw::multibuf::MultiBufAllocator; Waker global_chars_available_waker; void InitStdio() { stdio_init_all(); stdio_set_chars_available_callback( []([[maybe_unused]] void* arg) { std::move(global_chars_available_waker).Wake(); }, nullptr); } void WriteMultiBuf(const MultiBuf& buf) { for (std::byte b : buf) { putchar_raw(static_cast(b)); } } Poll PollReadByte(Context& cx) { int c = getchar_timeout_us(0); if (c == PICO_ERROR_TIMEOUT) { PW_ASYNC_STORE_WAKER( cx, global_chars_available_waker, "RP2StdioChannel is waiting for stdio chars available"); // Read again to ensure that no race occurred. // // The concern is an interleaving like this // Thread one: get_char is called and times out // Thread two: char becomes available, Wake is called // Thread one: sets Waker // // In this interleaving, the task on Thread one is never awoken, // so we must check for available characters *after* setting the Waker. c = getchar_timeout_us(0); if (c == PICO_ERROR_TIMEOUT) { return Pending(); } } return static_cast(c); } // Channel implementation which writes to and reads from rp2040's stdio. // // NOTE: only one Rp2StdioChannel may be in existence. class Rp2StdioChannel final : public pw::channel::Implement { public: Rp2StdioChannel(MultiBufAllocator& read_allocator, MultiBufAllocator& write_allocator) : read_allocation_future_(read_allocator), write_allocation_future_(write_allocator), buffer_(std::nullopt) {} Rp2StdioChannel(const Rp2StdioChannel&) = delete; Rp2StdioChannel& operator=(const Rp2StdioChannel&) = delete; // This is a singleton static, so there's no need for move constructors. Rp2StdioChannel(Rp2StdioChannel&&) = delete; Rp2StdioChannel& operator=(Rp2StdioChannel&&) = delete; private: static constexpr size_t kMinimumReadSize = 64; static constexpr size_t kDesiredReadSize = 1024; Poll PendGetReadBuffer(Context& cx); Poll> DoPendRead(Context& cx) override; Poll DoPendReadyToWrite(Context& cx) override; Poll> DoPendAllocateWriteBuffer( Context& cx, size_t min_bytes) override { write_allocation_future_.SetDesiredSize(min_bytes); return write_allocation_future_.Pend(cx); } Status DoStageWrite(MultiBuf&& data) override; Poll DoPendWrite(Context&) override { return OkStatus(); } Poll DoPendClose(Context&) override { return Ready(OkStatus()); } MultiBufAllocationFuture read_allocation_future_; MultiBufAllocationFuture write_allocation_future_; std::optional buffer_; }; Poll Rp2StdioChannel::PendGetReadBuffer(Context& cx) { if (buffer_.has_value()) { return OkStatus(); } read_allocation_future_.SetDesiredSizes( kMinimumReadSize, kDesiredReadSize, pw::multibuf::kNeedsContiguous); Poll> maybe_multibuf = read_allocation_future_.Pend(cx); if (maybe_multibuf.IsPending()) { return Pending(); } if (!maybe_multibuf->has_value()) { PW_LOG_ERROR("Failed to allocate multibuf for reading"); return Status::ResourceExhausted(); } buffer_ = std::move(**maybe_multibuf); return OkStatus(); } Poll> Rp2StdioChannel::DoPendRead(Context& cx) { Poll buffer_ready = PendGetReadBuffer(cx); if (buffer_ready.IsPending() || !buffer_ready->ok()) { return buffer_ready; } size_t len = 0; for (std::byte& b : *buffer_) { Poll next = PollReadByte(cx); if (next.IsPending()) { break; } b = *next; ++len; } if (len == 0) { return Pending(); } buffer_->Truncate(len); MultiBuf buffer = std::move(*buffer_); buffer_ = std::nullopt; return buffer; } Poll Rp2StdioChannel::DoPendReadyToWrite(Context&) { return OkStatus(); } Status Rp2StdioChannel::DoStageWrite(MultiBuf&& data) { WriteMultiBuf(data); return OkStatus(); } } // namespace ByteReaderWriter& Rp2StdioChannelInit(MultiBufAllocator& read_allocator, MultiBufAllocator& write_allocator) { static Rp2StdioChannel channel = [&] { InitStdio(); return Rp2StdioChannel(read_allocator, write_allocator); }(); return channel.channel(); } } // namespace pw::channel