1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/forwarding_channel.h"
16
17 #include <algorithm>
18 #include <array>
19
20 #include "pw_allocator/testing.h"
21 #include "pw_multibuf/header_chunk_region_tracker.h"
22 #include "pw_multibuf/simple_allocator.h"
23 #include "pw_string/string.h"
24 #include "pw_unit_test/framework.h"
25
26 namespace {
27
28 using ::pw::Result;
29 using ::pw::async2::Context;
30 using ::pw::async2::Pending;
31 using ::pw::async2::Poll;
32 using ::pw::async2::Ready;
33 using ::pw::async2::Task;
34 using ::pw::async2::Waker;
35 using ::pw::channel::ByteReader;
36 using ::pw::channel::DatagramReader;
37 using ::pw::multibuf::MultiBuf;
38
39 // Creates and initializes a MultiBuf to the specified value.
40 class InitializedMultiBuf {
41 public:
InitializedMultiBuf(std::string_view contents)42 InitializedMultiBuf(std::string_view contents) {
43 std::optional<pw::multibuf::OwnedChunk> chunk =
44 pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
45 allocator_, contents.size());
46 std::memcpy(chunk.value().data(), contents.data(), contents.size());
47 buf_.PushFrontChunk(std::move(*chunk));
48 }
49
Take()50 pw::multibuf::MultiBuf Take() { return std::move(buf_); }
51
52 private:
53 pw::allocator::test::AllocatorForTest<2048> allocator_;
54 pw::multibuf::MultiBuf buf_;
55 };
56
// Copies every byte of `mb` into a fixed-capacity string and returns it.
//
// The string is first sized to `mb.size()` (filled with '\0') and then
// overwritten byte by byte from the MultiBuf's iterator range.
pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
  pw::InlineString<128> text(mb.size(), '\0');
  // The MultiBuf iterates over std::byte, so write through a byte view of the
  // string's character storage.
  std::byte* const destination = reinterpret_cast<std::byte*>(text.data());
  std::copy(mb.begin(), mb.end(), destination);
  return text;
}
63
64 template <pw::channel::DataType kType,
65 size_t kDataSize = 128,
66 size_t kMetaSize = 128>
67 class TestChannelPair {
68 public:
TestChannelPair()69 TestChannelPair()
70 : simple_allocator_(data_area_, meta_alloc_), pair_(simple_allocator_) {}
71
operator ->()72 pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
73
74 private:
75 std::array<std::byte, kDataSize> data_area_;
76 pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
77 pw::multibuf::SimpleAllocator simple_allocator_;
78
79 pw::channel::ForwardingChannelPair<kType> pair_;
80 };
81
82 // TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
83 // wakers are stored / woken properly by ForwardingChannel.
TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // All channel operations run inside a single DoPend() call, where a Context
  // is available to pass to the Pend* functions.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once when DoPend() runs to its end.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      // Nothing has been written, so neither side has anything to read.
      EXPECT_EQ(Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // Direction first -> second: write one empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), pair->first().PendReadyToWrite(cx));
      auto write_result = pair->first().Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      // Until the datagram is read, first is expected to be unable to write
      // again, and it still has nothing to read itself.
      EXPECT_EQ(Pending(), pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), pair->first().PendRead(cx));

      // second receives the datagram, which must be empty.
      auto received = pair->second().PendRead(cx);
      EXPECT_TRUE(received.IsReady());
      EXPECT_TRUE(received->ok());
      EXPECT_EQ((*received)->size(), 0u);

      // The datagram was consumed; a second read finds nothing.
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // Direction second -> first: write one empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), pair->second().PendReadyToWrite(cx));
      write_result = pair->second().Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      EXPECT_EQ(Pending(), pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // first receives the datagram, which must be empty.
      received = pair->first().PendRead(cx);
      EXPECT_TRUE(received.IsReady());
      EXPECT_TRUE(received->ok());
      EXPECT_EQ((*received)->size(), 0u);

      EXPECT_EQ(Pending(), pair->first().PendRead(cx));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task finishes in one pass, and its body ran exactly once.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
141
TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // Sends two datagrams with payloads from first to second and checks that
  // each arrives intact, all within a single DoPend() call.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once when DoPend() runs to its end.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      InitializedMultiBuf hello("Hello");
      InitializedMultiBuf world("world!");

      // first -> second: write "Hello".
      EXPECT_EQ(Ready(pw::OkStatus()), pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(hello.Take()).status());

      // While "Hello" is unread, first is expected not to be ready to write.
      EXPECT_EQ(Pending(), pair->first().PendReadyToWrite(cx));

      EXPECT_EQ(CopyToString(pair->second().PendRead(cx).value().value()),
                "Hello");

      // After the read, first may write again and second has nothing queued.
      EXPECT_EQ(Ready(pw::OkStatus()), pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // first -> second: write "world!" and read it back.
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(world.Take()).status());
      EXPECT_EQ(CopyToString(pair->second().PendRead(cx).value().value()),
                "world!");

      EXPECT_EQ(Pending(), pair->second().PendRead(cx));
      EXPECT_EQ(Ready(pw::OkStatus()), pair->first().PendReadyToWrite(cx));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task finishes in one pass, and its body ran exactly once.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
188
// NOTE(review): this test performs the same sequence of operations as
// ForwardsEmptyDatagrams (empty datagrams in both directions) -- confirm
// whether it was meant to cover something different.
TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
  pw::async2::Dispatcher dispatcher;

  // All channel operations run inside a single DoPend() call, where a Context
  // is available to pass to the Pend* functions.
  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once when DoPend() runs to its end.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      // Before any write, both endpoints report no readable data.
      EXPECT_EQ(Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // first -> second: one empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), pair->first().PendReadyToWrite(cx));
      auto write_result = pair->first().Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      // The writer is expected to stay blocked until the datagram is read.
      EXPECT_EQ(Pending(), pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), pair->first().PendRead(cx));

      auto datagram = pair->second().PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      // second -> first: one empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), pair->second().PendReadyToWrite(cx));
      write_result = pair->second().Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      // The writer is expected to stay blocked until the datagram is read.
      EXPECT_EQ(Pending(), pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), pair->second().PendRead(cx));

      datagram = pair->first().PendRead(cx);
      EXPECT_TRUE(datagram.IsReady());
      EXPECT_TRUE(datagram->ok());
      EXPECT_EQ((*datagram)->size(), 0u);

      EXPECT_EQ(Pending(), pair->first().PendRead(cx));

      ++test_completed;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task finishes in one pass, and its body ran exactly once.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
246
// Verifies that closing one side of a datagram pair wakes a task that is
// blocked reading from the other side, and that the read then reports
// FAILED_PRECONDITION.
//
// The suite name is spelled `ForwardingDatagramChannel` (capital "C") so this
// test is grouped and filtered together with the other datagram tests.
TEST(ForwardingDatagramChannel, PendCloseAwakensAndClosesPeer) {
  // Task that polls `reader` once per DoPend() call. It stays pending while
  // the read is pending, and completes once the read yields a result, which
  // is expected to be FAILED_PRECONDITION.
  class TryToReadUntilClosed : public Task {
   public:
    explicit TryToReadUntilClosed(DatagramReader& reader) : reader_(reader) {}

   private:
    pw::async2::Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
      if (read.IsPending()) {
        return Pending();
      }
      EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
      return Ready();
    }

    DatagramReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kDatagram> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // With no data and no close, the reader remains pending however often the
  // dispatcher is run.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());

  // Close the peer from outside any task, using a context built from a
  // default-constructed waker.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);
  EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));

  // The close must have woken the reader, which now runs to completion.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
278
// Verifies that writing an empty MultiBuf to either side of a byte channel
// pair succeeds but leaves nothing to read on either side.
TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
  pw::async2::Dispatcher dispatcher;

  // All channel operations run inside a single DoPend() call, where a Context
  // is available to pass to the Pend* functions.
  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once when DoPend() runs to its end.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing second->first. This step must go through second();
      // writing via first() again would leave this direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task finishes in one pass, and its body ran exactly once.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
322
// Verifies that several consecutive writes on a byte channel are delivered to
// the peer as one combined read, and that an empty write in the opposite
// direction delivers nothing.
TEST(ForwardingByteChannel, WriteData) {
  pw::async2::Dispatcher dispatcher;

  // All channel operations run inside a single DoPend() call, where a Context
  // is available to pass to the Pend* functions.
  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once when DoPend() runs to its end.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      InitializedMultiBuf b1("hello");
      InitializedMultiBuf b2(" ");
      InitializedMultiBuf b3("world");

      // Send "hello world" first->second as three separate writes. The writer
      // is expected to be ready before each one, without any read in between.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b3.Take()).status());

      // Data only flows towards second; first still has nothing to read.
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));

      // A single read on second yields all three writes concatenated.
      auto hello_world_result = pair->second().PendRead(cx);
      EXPECT_TRUE(hello_world_result.IsReady());

      EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");

      // Send nothing second->first. This step must go through second();
      // writing via first() again would leave this direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  // The task finishes in one pass, and its body ran exactly once.
  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
379
// Verifies that closing one side of a byte channel pair wakes a task that is
// blocked reading from the other side, and that the read then reports
// FAILED_PRECONDITION.
TEST(ForwardingByteChannel, PendCloseAwakensAndClosesPeer) {
  // Task that polls `reader` once per DoPend() call. It stays pending while
  // the read is pending, and completes once the read yields a result, which
  // is expected to be FAILED_PRECONDITION.
  class TryToReadUntilClosed : public Task {
   public:
    // `explicit` so that a reader is never implicitly converted into a task.
    explicit TryToReadUntilClosed(ByteReader& reader) : reader_(reader) {}

   private:
    pw::async2::Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
      if (read.IsPending()) {
        return Pending();
      }
      EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
      return Ready();
    }

    ByteReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kByte> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // With no data and no close, the reader remains pending however often the
  // dispatcher is run.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());

  // Close the peer from outside any task, using a context built from a
  // default-constructed waker.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);
  EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));

  // The close must have woken the reader, which now runs to completion.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
411
412 } // namespace
413