// Copyright 2023 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_rpc_transport/stream_rpc_dispatcher.h" #include #include #include "pw_bytes/span.h" #include "pw_log/log.h" #include "pw_span/span.h" #include "pw_status/status.h" #include "pw_stream/stream.h" #include "pw_sync/mutex.h" #include "pw_sync/thread_notification.h" #include "pw_thread/thread.h" #include "pw_thread_stl/options.h" #include "pw_unit_test/framework.h" namespace pw::rpc { namespace { using namespace std::chrono_literals; class TestIngress : public RpcIngressHandler { public: explicit TestIngress(size_t num_bytes_expected) : num_bytes_expected_(num_bytes_expected) {} Status ProcessIncomingData(ConstByteSpan buffer) override { if (num_bytes_expected_ > 0) { std::copy(buffer.begin(), buffer.end(), std::back_inserter(received_)); num_bytes_expected_ -= std::min(num_bytes_expected_, buffer.size()); } if (num_bytes_expected_ == 0) { done_.release(); } return OkStatus(); } std::vector received() const { return received_; } void Wait() { done_.acquire(); } private: size_t num_bytes_expected_ = 0; sync::ThreadNotification done_; std::vector received_; }; class TestStream : public stream::NonSeekableReader { public: TestStream() : position_(0) {} void QueueData(ConstByteSpan data) { std::lock_guard lock(send_mutex_); std::copy(data.begin(), data.end(), std::back_inserter(to_send_)); available_.release(); } void Stop() { stopped_ = true; available_.release(); } private: void WaitForData() { while (!stopped_) { { std::lock_guard lock(send_mutex_); if (position_ < to_send_.size()) { break; } } available_.acquire(); } } StatusWithSize DoRead(ByteSpan out) final { WaitForData(); if (stopped_) { return StatusWithSize(0); } std::lock_guard lock(send_mutex_); if (position_ == to_send_.size()) { return StatusWithSize::OutOfRange(); } size_t to_copy = std::min(out.size(), to_send_.size() - position_); std::memcpy(out.data(), to_send_.data() + position_, to_copy); position_ += to_copy; return StatusWithSize(to_copy); } sync::Mutex send_mutex_; std::vector to_send_; std::atomic stopped_ = false; size_t position_; sync::ThreadNotification available_; }; TEST(StreamRpcDispatcherTest, RecvOk) { constexpr size_t kWriteSize = 10; constexpr std::array kWriteBuffer = {}; TestIngress test_ingress(kWriteSize); TestStream test_stream; auto dispatcher = StreamRpcDispatcher(test_stream, test_ingress); auto dispatcher_thread = Thread(thread::stl::Options(), dispatcher); test_stream.QueueData(kWriteBuffer); test_ingress.Wait(); dispatcher.Stop(); test_stream.Stop(); dispatcher_thread.join(); auto received = test_ingress.received(); EXPECT_EQ(received.size(), kWriteSize); EXPECT_EQ(dispatcher.num_read_errors(), 0U); } } // namespace } // namespace pw::rpc