1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_uart/blocking_adapter.h"
16
#include <algorithm>
#include <array>
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
22
23 #include "pw_assert/check.h"
24 #include "pw_bytes/array.h"
25 #include "pw_log/log.h"
26 #include "pw_sync/lock_annotations.h"
27 #include "pw_sync/mutex.h"
28 #include "pw_sync/timed_thread_notification.h"
29 #include "pw_thread/test_thread_context.h"
30 #include "pw_thread/thread.h"
31 #include "pw_unit_test/framework.h"
32 #include "pw_work_queue/work_queue.h"
33
// Waits for something critical for test execution.
//
// `waitable` may be any expression whose value provides
// try_acquire_for(duration), e.g. a sync::TimedThreadNotification. The
// argument is parenthesized so that non-trivial expressions such as `*ptr`
// expand correctly.
//
// We use PW_CHECK to ensure we crash on timeout instead of hanging forever.
// This is a macro so the crash points to the invocation site.
//
// Note: the `1000ms` literal is resolved where the macro is expanded, so
// std::chrono_literals must be visible at every use site.
#define ASSERT_WAIT(waitable) PW_CHECK((waitable).try_acquire_for(1000ms))
38
39 namespace pw::uart {
40 namespace {
41
42 using namespace std::chrono_literals;
43
44 // A mock UartNonBlocking for testing the blocking adapter.
45 class UartNonBlockingMock : public UartNonBlocking {
46 public:
enabled() const47 bool enabled() const { return enabled_; }
48
WaitAndCompleteRead(Status status,ConstByteSpan data)49 void WaitAndCompleteRead(Status status, ConstByteSpan data) {
50 // Wait for a read to start.
51 ASSERT_WAIT(read_started_);
52
53 std::optional<ReadTransaction> read = ConsumeCurrentRead();
54 PW_CHECK(read.has_value());
55
56 // Copy data into rx buffer;
57 PW_CHECK_UINT_GE(read->rx_buffer.size(), data.size());
58 std::copy(data.begin(), data.end(), read->rx_buffer.begin());
59
60 read->Complete(status, data.size());
61 }
62
WaitForWrite()63 ConstByteSpan WaitForWrite() PW_LOCKS_EXCLUDED(mutex_) {
64 // Wait for a write to start.
65 ASSERT_WAIT(write_started_);
66
67 std::lock_guard lock(mutex_);
68 PW_CHECK(current_write_.has_value());
69 return current_write_->tx_buffer;
70 }
71
CompleteWrite(StatusWithSize status_size)72 void CompleteWrite(StatusWithSize status_size) {
73 std::optional<WriteTransaction> write = ConsumeCurrentWrite();
74 PW_CHECK(write.has_value());
75 write->Complete(status_size);
76 }
77
WaitAndCompleteFlush(Status status)78 void WaitAndCompleteFlush(Status status) {
79 // Wait for a flush to start.
80 ASSERT_WAIT(flush_started_);
81
82 std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
83 PW_CHECK(flush.has_value());
84
85 flush->Complete(status);
86 }
87
88 private:
89 sync::Mutex mutex_;
90 bool enabled_ = false;
91
92 //
93 // UartNonBlocking impl.
94 //
DoEnable(bool enabled)95 Status DoEnable(bool enabled) override {
96 enabled_ = enabled;
97 return OkStatus();
98 }
99
DoSetBaudRate(uint32_t)100 Status DoSetBaudRate(uint32_t) override { return OkStatus(); }
DoConservativeReadAvailable()101 size_t DoConservativeReadAvailable() override { return 0; }
DoClearPendingReceiveBytes()102 Status DoClearPendingReceiveBytes() override { return OkStatus(); }
103
104 // Read
105 struct ReadTransaction {
106 ByteSpan rx_buffer;
107 size_t min_bytes;
108 Function<void(Status, ConstByteSpan buffer)> callback;
109
Completepw::uart::__anon30c296480111::UartNonBlockingMock::ReadTransaction110 void Complete(Status status, size_t num_bytes) {
111 callback(status, rx_buffer.first(num_bytes));
112 }
113 };
114 std::optional<ReadTransaction> current_read_ PW_GUARDED_BY(mutex_);
115 sync::TimedThreadNotification read_started_;
116
ConsumeCurrentRead()117 std::optional<ReadTransaction> ConsumeCurrentRead()
118 PW_LOCKS_EXCLUDED(mutex_) {
119 std::lock_guard lock(mutex_);
120 return std::exchange(current_read_, std::nullopt);
121 }
122
DoRead(ByteSpan rx_buffer,size_t min_bytes,Function<void (Status,ConstByteSpan buffer)> && callback)123 Status DoRead(ByteSpan rx_buffer,
124 size_t min_bytes,
125 Function<void(Status, ConstByteSpan buffer)>&& callback)
126 override PW_LOCKS_EXCLUDED(mutex_) {
127 {
128 std::lock_guard lock(mutex_);
129
130 if (current_read_) {
131 return Status::Unavailable();
132 }
133
134 current_read_.emplace(ReadTransaction{
135 .rx_buffer = rx_buffer,
136 .min_bytes = min_bytes,
137 .callback = std::move(callback),
138 });
139 }
140
141 read_started_.release();
142 return OkStatus();
143 }
144
DoCancelRead()145 bool DoCancelRead() override {
146 std::optional<ReadTransaction> read = ConsumeCurrentRead();
147 if (!read.has_value()) {
148 return false;
149 }
150 read->Complete(Status::Cancelled(), 0);
151 return true;
152 }
153
154 // Write
155 struct WriteTransaction {
156 ConstByteSpan tx_buffer;
157 Function<void(StatusWithSize)> callback;
158
Completepw::uart::__anon30c296480111::UartNonBlockingMock::WriteTransaction159 void Complete(StatusWithSize status_size) { callback(status_size); }
160 };
161 std::optional<WriteTransaction> current_write_ PW_GUARDED_BY(mutex_);
162 sync::TimedThreadNotification write_started_;
163
ConsumeCurrentWrite()164 std::optional<WriteTransaction> ConsumeCurrentWrite()
165 PW_LOCKS_EXCLUDED(mutex_) {
166 std::lock_guard lock(mutex_);
167 return std::exchange(current_write_, std::nullopt);
168 }
169
DoWrite(ConstByteSpan tx_buffer,Function<void (StatusWithSize status)> && callback)170 Status DoWrite(ConstByteSpan tx_buffer,
171 Function<void(StatusWithSize status)>&& callback) override
172 PW_LOCKS_EXCLUDED(mutex_) {
173 {
174 std::lock_guard lock(mutex_);
175
176 if (current_write_) {
177 return Status::Unavailable();
178 }
179
180 current_write_.emplace(WriteTransaction{
181 .tx_buffer = tx_buffer,
182 .callback = std::move(callback),
183 });
184 }
185
186 write_started_.release();
187 return OkStatus();
188 }
189
DoCancelWrite()190 bool DoCancelWrite() override {
191 std::optional<WriteTransaction> write = ConsumeCurrentWrite();
192 if (!write.has_value()) {
193 return false;
194 }
195 write->Complete(StatusWithSize::Cancelled(0));
196 return true;
197 }
198
199 // Flush
200 struct FlushTransaction {
201 Function<void(Status)> callback;
202
Completepw::uart::__anon30c296480111::UartNonBlockingMock::FlushTransaction203 void Complete(Status status) { callback(status); }
204 };
205 std::optional<FlushTransaction> current_flush_ PW_GUARDED_BY(mutex_);
206 sync::TimedThreadNotification flush_started_;
207
ConsumeCurrentFlush()208 std::optional<FlushTransaction> ConsumeCurrentFlush()
209 PW_LOCKS_EXCLUDED(mutex_) {
210 std::lock_guard lock(mutex_);
211 return std::exchange(current_flush_, std::nullopt);
212 }
213
DoFlushOutput(Function<void (Status status)> && callback)214 Status DoFlushOutput(Function<void(Status status)>&& callback) override
215 PW_LOCKS_EXCLUDED(mutex_) {
216 {
217 std::lock_guard lock(mutex_);
218
219 if (current_flush_) {
220 return Status::Unavailable();
221 }
222
223 current_flush_.emplace(FlushTransaction{
224 .callback = std::move(callback),
225 });
226 }
227
228 flush_started_.release();
229 return OkStatus();
230 }
231
DoCancelFlushOutput()232 bool DoCancelFlushOutput() override {
233 std::optional<FlushTransaction> flush = ConsumeCurrentFlush();
234 if (!flush.has_value()) {
235 return false;
236 }
237 flush->Complete(Status::Cancelled());
238 return true;
239 }
240 };
241
242 // Test fixture
243 class BlockingAdapterTest : public ::testing::Test {
244 protected:
BlockingAdapterTest()245 BlockingAdapterTest() : adapter(underlying) {}
246
247 UartNonBlockingMock underlying;
248 UartBlockingAdapter adapter;
249
250 work_queue::WorkQueueWithBuffer<2> work_queue;
251
252 // State used by tests.
253 // Ideally these would be locals, but that would require capturing more than
254 // one pointer worth of data, exceeding PW_FUNCTION_INLINE_CALLABLE_SIZE.
255 sync::TimedThreadNotification blocking_action_complete;
256 static constexpr auto kReadBufferSize = 16;
257 std::array<std::byte, kReadBufferSize> read_buffer;
258 StatusWithSize read_result;
259 Status write_result;
260
SetUp()261 void SetUp() override { StartWorkQueueThread(); }
262
TearDown()263 void TearDown() override { StopWorkQueueThread(); }
264
StartWorkQueueThread()265 void StartWorkQueueThread() {
266 PW_CHECK(!work_queue_thread_, "WorkQueue thread already started");
267 work_queue_thread_context_ =
268 std::make_unique<thread::test::TestThreadContext>();
269 work_queue_thread_.emplace(work_queue_thread_context_->options(),
270 work_queue);
271 }
272
StopWorkQueueThread()273 void StopWorkQueueThread() {
274 if (work_queue_thread_) {
275 PW_LOG_DEBUG("Stopping work queue...");
276 work_queue.RequestStop();
277 #if PW_THREAD_JOINING_ENABLED
278 work_queue_thread_->join();
279 #else
280 work_queue_thread_->detach();
281 #endif
282 // Once stopped, the WorkQueue cannot be started again (stop_requested_
283 // latches), so we don't set work_queue_thread_ to std::nullopt here.
284 // work_queue_thread_ = std::nullopt;
285 }
286 }
287
288 private:
289 std::unique_ptr<thread::test::TestThreadContext> work_queue_thread_context_;
290 std::optional<thread::Thread> work_queue_thread_;
291 };
292
293 //
294 // Enable
295 //
296
// Verifies that Enable() on the adapter reaches the underlying UART: the mock
// reports disabled beforehand and enabled afterwards.
TEST_F(BlockingAdapterTest, EnableWorks) {
  // Start out disabled
  ASSERT_FALSE(underlying.enabled());

  // Can enable
  PW_TEST_EXPECT_OK(adapter.Enable());
  EXPECT_TRUE(underlying.enabled());
}
305
// Verifies that Disable() on the adapter reaches the underlying UART: the mock
// is enabled directly first, then must report disabled after the adapter call.
TEST_F(BlockingAdapterTest, DisableWorks) {
  // Start out enabled
  PW_TEST_ASSERT_OK(underlying.Enable());
  ASSERT_TRUE(underlying.enabled());

  // Can disable
  PW_TEST_EXPECT_OK(adapter.Disable());
  EXPECT_FALSE(underlying.enabled());
}
315
316 //
317 // Read
318 //
319
// Verifies the successful read path: a blocking ReadExactly() issued on the
// work queue thread returns once the mock completes the underlying read, and
// reports the status, byte count and data supplied by the mock.
TEST_F(BlockingAdapterTest, ReadWorks) {
  // Call blocking ReadExactly on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.ReadExactly()...");
    read_result = adapter.ReadExactly(read_buffer);
    // Tell the test thread that the blocking call has returned.
    blocking_action_complete.release();
  });

  constexpr auto kRxData = bytes::Array<0x12, 0x34, 0x56>();
  // The mock copies kRxData into read_buffer, so it has to fit.
  static_assert(kRxData.size() <= kReadBufferSize);

  // Blocks until the adapter has started the underlying read, then completes
  // it with kRxData.
  underlying.WaitAndCompleteRead(OkStatus(), kRxData);

  // Wait for the read to complete.
  ASSERT_WAIT(blocking_action_complete);

  PW_TEST_EXPECT_OK(read_result.status());
  EXPECT_EQ(read_result.size(), kRxData.size());
  // Only the first kRxData.size() bytes of read_buffer are compared.
  EXPECT_TRUE(std::equal(kRxData.begin(), kRxData.end(), read_buffer.begin()));
}
340
// Verifies the read timeout path: TryReadExactlyFor() with a 100ms timeout
// reports DeadlineExceeded when the underlying read is never completed.
TEST_F(BlockingAdapterTest, ReadHandlesTimeouts) {
  // Call blocking TryReadExactlyFor on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.TryReadExactlyFor()...");
    read_result = adapter.TryReadExactlyFor(read_buffer, 100ms);
    // Tell the test thread that the blocking call has returned.
    blocking_action_complete.release();
  });

  // Don't complete the transaction; let it time out.

  // Wait for the read to complete.
  // (ASSERT_WAIT allows 1000ms, longer than the 100ms read timeout.)
  ASSERT_WAIT(blocking_action_complete);

  EXPECT_EQ(read_result.status(), Status::DeadlineExceeded());
}
356
357 //
358 // Write
359 //
// Verifies the successful write path: a blocking Write() issued on the work
// queue thread passes the caller's bytes to the underlying UART and returns OK
// once the mock completes the underlying write.
TEST_F(BlockingAdapterTest, WriteWorks) {
  static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();

  // Call blocking Write on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.Write()...");
    write_result = adapter.Write(kTxData);
    // Tell the test thread that the blocking call has returned.
    blocking_action_complete.release();
  });

  // Blocks until the adapter has started the underlying write.
  ConstByteSpan tx_buffer = underlying.WaitForWrite();
  EXPECT_EQ(tx_buffer.size(), kTxData.size());
  // Use the bounded (four-iterator) overload: the size check above is
  // non-fatal, so if tx_buffer were longer than kTxData the unbounded overload
  // would read past the end of kTxData.
  EXPECT_TRUE(std::equal(
      tx_buffer.begin(), tx_buffer.end(), kTxData.begin(), kTxData.end()));

  underlying.CompleteWrite(StatusWithSize(tx_buffer.size()));

  // Wait for the write to complete.
  ASSERT_WAIT(blocking_action_complete);
  PW_TEST_EXPECT_OK(write_result);
}
380
// Verifies the write timeout path: TryWriteFor() with a 100ms timeout reports
// DeadlineExceeded when the underlying write is never completed.
TEST_F(BlockingAdapterTest, WriteHandlesTimeouts) {
  static constexpr auto kTxData = bytes::Array<0x12, 0x34, 0x56>();

  // Call blocking TryWriteFor on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.TryWriteFor()...");
    // Only the status of the result is kept.
    write_result = adapter.TryWriteFor(kTxData, 100ms).status();
    // Tell the test thread that the blocking call has returned.
    blocking_action_complete.release();
  });

  // Don't complete the transaction; let it time out.

  // Wait for the write to complete.
  // (ASSERT_WAIT allows 1000ms, longer than the 100ms write timeout.)
  ASSERT_WAIT(blocking_action_complete);
  EXPECT_EQ(write_result, Status::DeadlineExceeded());
}
397
398 //
399 // FlushOutput
400 //
// Verifies the successful flush path: a blocking FlushOutput() issued on the
// work queue thread returns OK once the mock completes the underlying flush.
TEST_F(BlockingAdapterTest, FlushOutputWorks) {
  // Call blocking FlushOutput on the work queue.
  work_queue.CheckPushWork([this]() {
    PW_LOG_DEBUG("Calling adapter.FlushOutput()...");
    // The fixture's write_result member is reused to hold the flush status.
    write_result = adapter.FlushOutput();
    // Tell the test thread that the blocking call has returned.
    blocking_action_complete.release();
  });

  // Blocks until the adapter has started the underlying flush, then completes
  // it with OK.
  underlying.WaitAndCompleteFlush(OkStatus());

  // Wait for the flush to complete.
  ASSERT_WAIT(blocking_action_complete);
  PW_TEST_EXPECT_OK(write_result);
}
415
416 // FlushOutput does not provide a variant with timeout.
417
418 } // namespace
419 } // namespace pw::uart
420