• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_uart/blocking_adapter.h"
16 
17 #include <array>
18 #include <memory>
19 #include <mutex>
20 #include <optional>
21 #include <utility>
22 
23 #include "pw_assert/check.h"
24 #include "pw_bytes/array.h"
25 #include "pw_log/log.h"
26 #include "pw_sync/lock_annotations.h"
27 #include "pw_sync/mutex.h"
28 #include "pw_sync/timed_thread_notification.h"
29 #include "pw_thread/test_thread_context.h"
30 #include "pw_thread/thread.h"
31 #include "pw_unit_test/framework.h"
32 #include "pw_work_queue/work_queue.h"
33 
34 // Waits for something critical for test execution.
35 // We use PW_CHECK to ensure we crash on timeout instead of hanging forever.
36 // This is a macro so the crash points to the invocation site.
37 #define ASSERT_WAIT(waitable) PW_CHECK(waitable.try_acquire_for(1000ms))
38 
39 namespace pw::uart {
40 namespace {
41 
42 using namespace std::chrono_literals;
43 
44 // A mock UartNonBlocking for testing the blocking adapter.
45 class UartNonBlockingMock : public UartNonBlocking {
46  public:
enabled() const47   bool enabled() const { return enabled_; }
48 
WaitAndCompleteRead(Status status,ConstByteSpan data)49   void WaitAndCompleteRead(Status status, ConstByteSpan data) {
50     // Wait for a read to start.
51     ASSERT_WAIT(read_started_);
52 
53     std::optional<ReadTransaction> read = ConsumeCurrentRead();
54     PW_CHECK(read.has_value());
55 
56     // Copy data into rx buffer;
57     PW_CHECK_UINT_GE(read->rx_buffer.size(), data.size());
58     std::copy(data.begin(), data.end(), read->rx_buffer.begin());
59 
60     read->Complete(status, data.size());
61   }
62 
WaitForWrite()63   ConstByteSpan WaitForWrite() PW_LOCKS_EXCLUDED(mutex_) {
64     // Wait for a write to start.
65     ASSERT_WAIT(write_started_);
66 
67     std::lock_guard lock(mutex_);
68     PW_CHECK(current_write_.has_value());
69     return current_write_->tx_buffer;
70   }
71 
CompleteWrite(StatusWithSize status_size)72   void CompleteWrite(StatusWithSize status_size) {
73     std::optional<WriteTransaction> write = ConsumeCurrentWrite();
74     PW_CHECK(write.has_value());
75     write->Complete(status_size);
76   }
77 
WaitAndCompleteFlush(Status status)78   void WaitAndCompleteFlush(Status status) {
79     // Wait for a flush to start.
80     ASSERT_WAIT(flush_started_);
81 
82     std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
83     PW_CHECK(flush.has_value());
84 
85     flush->Complete(status);
86   }
87 
88  private:
89   sync::Mutex mutex_;
90   bool enabled_ = false;
91 
92   //
93   // UartNonBlocking impl.
94   //
DoEnable(bool enabled)95   Status DoEnable(bool enabled) override {
96     enabled_ = enabled;
97     return OkStatus();
98   }
99 
DoSetBaudRate(uint32_t)100   Status DoSetBaudRate(uint32_t) override { return OkStatus(); }
DoConservativeReadAvailable()101   size_t DoConservativeReadAvailable() override { return 0; }
DoClearPendingReceiveBytes()102   Status DoClearPendingReceiveBytes() override { return OkStatus(); }
103 
104   // Read
105   struct ReadTransaction {
106     ByteSpan rx_buffer;
107     size_t min_bytes;
108     Function<void(Status, ConstByteSpan buffer)> callback;
109 
Completepw::uart::__anon30c296480111::UartNonBlockingMock::ReadTransaction110     void Complete(Status status, size_t num_bytes) {
111       callback(status, rx_buffer.first(num_bytes));
112     }
113   };
114   std::optional<ReadTransaction> current_read_ PW_GUARDED_BY(mutex_);
115   sync::TimedThreadNotification read_started_;
116 
ConsumeCurrentRead()117   std::optional<ReadTransaction> ConsumeCurrentRead()
118       PW_LOCKS_EXCLUDED(mutex_) {
119     std::lock_guard lock(mutex_);
120     return std::exchange(current_read_, std::nullopt);
121   }
122 
DoRead(ByteSpan rx_buffer,size_t min_bytes,Function<void (Status,ConstByteSpan buffer)> && callback)123   Status DoRead(ByteSpan rx_buffer,
124                 size_t min_bytes,
125                 Function<void(Status, ConstByteSpan buffer)>&& callback)
126       override PW_LOCKS_EXCLUDED(mutex_) {
127     {
128       std::lock_guard lock(mutex_);
129 
130       if (current_read_) {
131         return Status::Unavailable();
132       }
133 
134       current_read_.emplace(ReadTransaction{
135           .rx_buffer = rx_buffer,
136           .min_bytes = min_bytes,
137           .callback = std::move(callback),
138       });
139     }
140 
141     read_started_.release();
142     return OkStatus();
143   }
144 
DoCancelRead()145   bool DoCancelRead() override {
146     std::optional<ReadTransaction> read = ConsumeCurrentRead();
147     if (!read.has_value()) {
148       return false;
149     }
150     read->Complete(Status::Cancelled(), 0);
151     return true;
152   }
153 
154   // Write
155   struct WriteTransaction {
156     ConstByteSpan tx_buffer;
157     Function<void(StatusWithSize)> callback;
158 
Completepw::uart::__anon30c296480111::UartNonBlockingMock::WriteTransaction159     void Complete(StatusWithSize status_size) { callback(status_size); }
160   };
161   std::optional<WriteTransaction> current_write_ PW_GUARDED_BY(mutex_);
162   sync::TimedThreadNotification write_started_;
163 
ConsumeCurrentWrite()164   std::optional<WriteTransaction> ConsumeCurrentWrite()
165       PW_LOCKS_EXCLUDED(mutex_) {
166     std::lock_guard lock(mutex_);
167     return std::exchange(current_write_, std::nullopt);
168   }
169 
DoWrite(ConstByteSpan tx_buffer,Function<void (StatusWithSize status)> && callback)170   Status DoWrite(ConstByteSpan tx_buffer,
171                  Function<void(StatusWithSize status)>&& callback) override
172       PW_LOCKS_EXCLUDED(mutex_) {
173     {
174       std::lock_guard lock(mutex_);
175 
176       if (current_write_) {
177         return Status::Unavailable();
178       }
179 
180       current_write_.emplace(WriteTransaction{
181           .tx_buffer = tx_buffer,
182           .callback = std::move(callback),
183       });
184     }
185 
186     write_started_.release();
187     return OkStatus();
188   }
189 
DoCancelWrite()190   bool DoCancelWrite() override {
191     std::optional<WriteTransaction> write = ConsumeCurrentWrite();
192     if (!write.has_value()) {
193       return false;
194     }
195     write->Complete(StatusWithSize::Cancelled(0));
196     return true;
197   }
198 
199   // Flush
200   struct FlushTransaction {
201     Function<void(Status)> callback;
202 
Completepw::uart::__anon30c296480111::UartNonBlockingMock::FlushTransaction203     void Complete(Status status) { callback(status); }
204   };
205   std::optional<FlushTransaction> current_flush_ PW_GUARDED_BY(mutex_);
206   sync::TimedThreadNotification flush_started_;
207 
ConsumeCurrentFlush()208   std::optional<FlushTransaction> ConsumeCurrentFlush()
209       PW_LOCKS_EXCLUDED(mutex_) {
210     std::lock_guard lock(mutex_);
211     return std::exchange(current_flush_, std::nullopt);
212   }
213 
DoFlushOutput(Function<void (Status status)> && callback)214   Status DoFlushOutput(Function<void(Status status)>&& callback) override
215       PW_LOCKS_EXCLUDED(mutex_) {
216     {
217       std::lock_guard lock(mutex_);
218 
219       if (current_flush_) {
220         return Status::Unavailable();
221       }
222 
223       current_flush_.emplace(FlushTransaction{
224           .callback = std::move(callback),
225       });
226     }
227 
228     flush_started_.release();
229     return OkStatus();
230   }
231 
DoCancelFlushOutput()232   bool DoCancelFlushOutput() override {
233     std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
234     if (!flush.has_value()) {
235       return false;
236     }
237     flush->Complete(Status::Cancelled());
238     return true;
239   }
240 };
241 
242 // Test fixture
243 class BlockingAdapterTest : public ::testing::Test {
244  protected:
BlockingAdapterTest()245   BlockingAdapterTest() : adapter(underlying) {}
246 
247   UartNonBlockingMock underlying;
248   UartBlockingAdapter adapter;
249 
250   work_queue::WorkQueueWithBuffer<2> work_queue;
251 
252   // State used by tests.
253   // Ideally these would be locals, but that would require capturing more than
254   // one pointer worth of data, exceeding PW_FUNCTION_INLINE_CALLABLE_SIZE.
255   sync::TimedThreadNotification blocking_action_complete;
256   static constexpr auto kReadBufferSize = 16;
257   std::array<std::byte, kReadBufferSize> read_buffer;
258   StatusWithSize read_result;
259   Status write_result;
260 
SetUp()261   void SetUp() override { StartWorkQueueThread(); }
262 
TearDown()263   void TearDown() override { StopWorkQueueThread(); }
264 
StartWorkQueueThread()265   void StartWorkQueueThread() {
266     PW_CHECK(!work_queue_thread_, "WorkQueue thread already started");
267     work_queue_thread_context_ =
268         std::make_unique<thread::test::TestThreadContext>();
269     work_queue_thread_.emplace(work_queue_thread_context_->options(),
270                                work_queue);
271   }
272 
StopWorkQueueThread()273   void StopWorkQueueThread() {
274     if (work_queue_thread_) {
275       PW_LOG_DEBUG("Stopping work queue...");
276       work_queue.RequestStop();
277 #if PW_THREAD_JOINING_ENABLED
278       work_queue_thread_->join();
279 #else
280       work_queue_thread_->detach();
281 #endif
282       // Once stopped, the WorkQueue cannot be started again (stop_requested_
283       // latches), so we don't set work_queue_thread_ to std::nullopt here.
284       // work_queue_thread_ = std::nullopt;
285     }
286   }
287 
288  private:
289   std::unique_ptr<thread::test::TestThreadContext> work_queue_thread_context_;
290   std::optional<thread::Thread> work_queue_thread_;
291 };
292 
293 //
294 // Enable
295 //
296 
TEST_F(BlockingAdapterTest,EnableWorks)297 TEST_F(BlockingAdapterTest, EnableWorks) {
298   // Start out disabled
299   ASSERT_FALSE(underlying.enabled());
300 
301   // Can enable
302   PW_TEST_EXPECT_OK(adapter.Enable());
303   EXPECT_TRUE(underlying.enabled());
304 }
305 
TEST_F(BlockingAdapterTest,DisableWorks)306 TEST_F(BlockingAdapterTest, DisableWorks) {
307   // Start out enabled
308   PW_TEST_ASSERT_OK(underlying.Enable());
309   ASSERT_TRUE(underlying.enabled());
310 
311   // Can disable
312   PW_TEST_EXPECT_OK(adapter.Disable());
313   EXPECT_FALSE(underlying.enabled());
314 }
315 
316 //
317 // Read
318 //
319 
TEST_F(BlockingAdapterTest,ReadWorks)320 TEST_F(BlockingAdapterTest, ReadWorks) {
321   // Call blocking ReadExactly on the work queue.
322   work_queue.CheckPushWork([this]() {
323     PW_LOG_DEBUG("Calling adapter.ReadExactly()...");
324     read_result = adapter.ReadExactly(read_buffer);
325     blocking_action_complete.release();
326   });
327 
328   constexpr auto kRxData = bytes::Array<0x12, 0x34, 0x56>();
329   static_assert(kRxData.size() <= kReadBufferSize);
330 
331   underlying.WaitAndCompleteRead(OkStatus(), kRxData);
332 
333   // Wait for the read to complete.
334   ASSERT_WAIT(blocking_action_complete);
335 
336   PW_TEST_EXPECT_OK(read_result.status());
337   EXPECT_EQ(read_result.size(), kRxData.size());
338   EXPECT_TRUE(std::equal(kRxData.begin(), kRxData.end(), read_buffer.begin()));
339 }
340 
TEST_F(BlockingAdapterTest,ReadHandlesTimeouts)341 TEST_F(BlockingAdapterTest, ReadHandlesTimeouts) {
342   // Call blocking TryReadExactlyFor on the work queue.
343   work_queue.CheckPushWork([this]() {
344     PW_LOG_DEBUG("Calling adapter.TryReadExactlyFor()...");
345     read_result = adapter.TryReadExactlyFor(read_buffer, 100ms);
346     blocking_action_complete.release();
347   });
348 
349   // Don't complete the transaction; let it time out.
350 
351   // Wait for the read to complete.
352   ASSERT_WAIT(blocking_action_complete);
353 
354   EXPECT_EQ(read_result.status(), Status::DeadlineExceeded());
355 }
356 
357 //
358 // Write
359 //
TEST_F(BlockingAdapterTest,WriteWorks)360 TEST_F(BlockingAdapterTest, WriteWorks) {
361   static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();
362 
363   // Call blocking Write on the work queue.
364   work_queue.CheckPushWork([this]() {
365     PW_LOG_DEBUG("Calling adapter.Write()...");
366     write_result = adapter.Write(kTxData);
367     blocking_action_complete.release();
368   });
369 
370   ConstByteSpan tx_buffer = underlying.WaitForWrite();
371   EXPECT_EQ(tx_buffer.size(), kTxData.size());
372   EXPECT_TRUE(std::equal(tx_buffer.begin(), tx_buffer.end(), kTxData.begin()));
373 
374   underlying.CompleteWrite(StatusWithSize(tx_buffer.size()));
375 
376   // Wait for the write to complete.
377   ASSERT_WAIT(blocking_action_complete);
378   PW_TEST_EXPECT_OK(write_result);
379 }
380 
TEST_F(BlockingAdapterTest,WriteHandlesTimeouts)381 TEST_F(BlockingAdapterTest, WriteHandlesTimeouts) {
382   static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();
383 
384   // Call blocking TryWriteFor on the work queue.
385   work_queue.CheckPushWork([this]() {
386     PW_LOG_DEBUG("Calling adapter.TryWriteFor()...");
387     write_result = adapter.TryWriteFor(kTxData, 100ms).status();
388     blocking_action_complete.release();
389   });
390 
391   // Don't complete the transaction; let it time out.
392 
393   // Wait for the write to complete.
394   ASSERT_WAIT(blocking_action_complete);
395   EXPECT_EQ(write_result, Status::DeadlineExceeded());
396 }
397 
398 //
399 // FlushOutput
400 //
TEST_F(BlockingAdapterTest,FlushOutputWorks)401 TEST_F(BlockingAdapterTest, FlushOutputWorks) {
402   // Call blocking FlushOutput on the work queue.
403   work_queue.CheckPushWork([this]() {
404     PW_LOG_DEBUG("Calling adapter.FlushOutput()...");
405     write_result = adapter.FlushOutput();
406     blocking_action_complete.release();
407   });
408 
409   underlying.WaitAndCompleteFlush(OkStatus());
410 
411   // Wait for the flush to complete.
412   ASSERT_WAIT(blocking_action_complete);
413   PW_TEST_EXPECT_OK(write_result);
414 }
415 
416 // FlushOutput does not provide a variant with timeout.
417 
418 }  // namespace
419 }  // namespace pw::uart
420