• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/forwarding_channel.h"
16 
17 #include <algorithm>
18 #include <array>
19 
20 #include "pw_allocator/testing.h"
21 #include "pw_multibuf/header_chunk_region_tracker.h"
22 #include "pw_multibuf/simple_allocator.h"
23 #include "pw_string/string.h"
24 #include "pw_unit_test/framework.h"
25 
26 namespace {
27 
28 using ::pw::Result;
29 using ::pw::async2::Context;
30 using ::pw::async2::Pending;
31 using ::pw::async2::Poll;
32 using ::pw::async2::Ready;
33 using ::pw::async2::Task;
34 using ::pw::async2::Waker;
35 using ::pw::channel::ByteReader;
36 using ::pw::channel::DatagramReader;
37 using ::pw::multibuf::MultiBuf;
38 
39 // Creates and initializes a MultiBuf to the specified value.
40 class InitializedMultiBuf {
41  public:
InitializedMultiBuf(std::string_view contents)42   InitializedMultiBuf(std::string_view contents) {
43     std::optional<pw::multibuf::OwnedChunk> chunk =
44         pw::multibuf::HeaderChunkRegionTracker::AllocateRegionAsChunk(
45             allocator_, contents.size());
46     std::memcpy(chunk.value().data(), contents.data(), contents.size());
47     buf_.PushFrontChunk(std::move(*chunk));
48   }
49 
Take()50   pw::multibuf::MultiBuf Take() { return std::move(buf_); }
51 
52  private:
53   pw::allocator::test::AllocatorForTest<2048> allocator_;
54   pw::multibuf::MultiBuf buf_;
55 };
56 
// Copies every byte of `mb` into an inline string (capacity 128) so that test
// expectations can compare the contents against string literals.
pw::InlineString<128> CopyToString(const pw::multibuf::MultiBuf& mb) {
  // Pre-size the string with NUL characters; the bytes are then written over
  // them in place.
  pw::InlineString<128> text(mb.size(), '\0');
  auto* const destination = reinterpret_cast<std::byte*>(text.data());
  std::copy(mb.begin(), mb.end(), destination);
  return text;
}
63 
64 template <pw::channel::DataType kType,
65           size_t kDataSize = 128,
66           size_t kMetaSize = 128>
67 class TestChannelPair {
68  public:
TestChannelPair()69   TestChannelPair()
70       : simple_allocator_(data_area_, meta_alloc_), pair_(simple_allocator_) {}
71 
operator ->()72   pw::channel::ForwardingChannelPair<kType>* operator->() { return &pair_; }
73 
74  private:
75   std::array<std::byte, kDataSize> data_area_;
76   pw::allocator::test::AllocatorForTest<kMetaSize> meta_alloc_;
77   pw::multibuf::SimpleAllocator simple_allocator_;
78 
79   pw::channel::ForwardingChannelPair<kType> pair_;
80 };
81 
82 // TODO: b/330788671 - Have the test tasks run in multiple stages to ensure that
83 //     wakers are stored / woken properly by ForwardingChannel.
// Checks that an empty datagram written to one end of the pair is delivered as
// an empty MultiBuf on the other end, in both directions.
TEST(ForwardingDatagramChannel, ForwardsEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once DoPend has run through all of its expectations.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      auto& first = pair->first();
      auto& second = pair->second();

      // Nothing has been written, so neither end has anything to read.
      EXPECT_EQ(Pending(), first.PendRead(cx));
      EXPECT_EQ(Pending(), second.PendRead(cx));

      // first -> second: write an empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), first.PendReadyToWrite(cx));
      auto write_result = first.Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      // Until the datagram is read, `first` can neither write nor read.
      EXPECT_EQ(Pending(), first.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), first.PendRead(cx));

      auto read_result = second.PendRead(cx);
      EXPECT_TRUE(read_result.IsReady());
      EXPECT_TRUE(read_result->ok());
      EXPECT_EQ((*read_result)->size(), 0u);

      // The datagram was consumed; nothing further is queued.
      EXPECT_EQ(Pending(), second.PendRead(cx));

      // second -> first: write an empty datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), second.PendReadyToWrite(cx));
      write_result = second.Write({});
      EXPECT_EQ(pw::OkStatus(), write_result.status());

      EXPECT_EQ(Pending(), second.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), second.PendRead(cx));

      read_result = first.PendRead(cx);
      EXPECT_TRUE(read_result.IsReady());
      EXPECT_TRUE(read_result->ok());
      EXPECT_EQ((*read_result)->size(), 0u);

      EXPECT_EQ(Pending(), first.PendRead(cx));

      test_completed += 1;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
141 
// Checks that two non-empty datagrams written first->second are each delivered
// intact, and that the writer is only ready again once the previous datagram
// has been read.
TEST(ForwardingDatagramChannel, ForwardsNonEmptyDatagrams) {
  pw::async2::Dispatcher dispatcher;

  class : public Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once DoPend has run through all of its expectations.
    int test_completed = 0;

   private:
    Poll<> DoPend(Context& cx) override {
      InitializedMultiBuf hello("Hello");
      InitializedMultiBuf world("world!");

      auto& writer = pair->first();
      auto& reader = pair->second();

      // Write the first datagram.
      EXPECT_EQ(Ready(pw::OkStatus()), writer.PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), writer.Write(hello.Take()).status());

      // The writer is blocked while the datagram is still unread.
      EXPECT_EQ(Pending(), writer.PendReadyToWrite(cx));

      EXPECT_EQ(CopyToString(reader.PendRead(cx).value().value()), "Hello");

      // Reading unblocks the writer and leaves nothing queued.
      EXPECT_EQ(Ready(pw::OkStatus()), writer.PendReadyToWrite(cx));
      EXPECT_EQ(Pending(), reader.PendRead(cx));

      // Write and read the second datagram.
      EXPECT_EQ(pw::OkStatus(), writer.Write(world.Take()).status());
      EXPECT_EQ(CopyToString(reader.PendRead(cx).value().value()), "world!");

      EXPECT_EQ(Pending(), reader.PendRead(cx));
      EXPECT_EQ(Ready(pw::OkStatus()), writer.PendReadyToWrite(cx));

      test_completed += 1;
      return Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
188 
// Checks that non-empty datagrams are forwarded intact in BOTH directions.
//
// This test previously repeated the body of ForwardsEmptyDatagrams verbatim
// and therefore added no coverage. It now sends one non-empty datagram
// first->second and one second->first, which no other test in this file does.
TEST(ForwardingDatagramChannel, ForwardsDatagrams) {
  pw::async2::Dispatcher dispatcher;

  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kDatagram> pair;

    // Incremented once DoPend has run through all of its expectations.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // Declared first so that they outlive every MultiBuf read back below.
      InitializedMultiBuf ping("ping");
      InitializedMultiBuf pong("pong!");

      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send datagram first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(ping.Take()).status());

      // The datagram is still unread: first cannot write again, and nothing
      // has been sent towards first.
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));

      EXPECT_EQ(CopyToString(pair->second().PendRead(cx).value().value()),
                "ping");

      // The datagram was consumed, which unblocks the writer.
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));

      // Send datagram second->first
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().Write(pong.Take()).status());

      EXPECT_EQ(pw::async2::Pending(), pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      EXPECT_EQ(CopyToString(pair->first().PendRead(cx).value().value()),
                "pong!");

      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
246 
// Checks that closing one end of a datagram pair wakes a task that is blocked
// reading from the other end, and that the read then reports
// FAILED_PRECONDITION.
//
// The suite name is spelled `ForwardingDatagramChannel` so that this test is
// grouped with the other datagram tests in this file.
TEST(ForwardingDatagramChannel, PendCloseAwakensAndClosesPeer) {
  // Task that keeps pending on a read until the read completes, and expects
  // that completion to carry a FAILED_PRECONDITION status.
  class TryToReadUntilClosed : public Task {
   public:
    explicit TryToReadUntilClosed(DatagramReader& reader) : reader_(reader) {}

   private:
    pw::async2::Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
      if (read.IsPending()) {
        return Pending();
      }
      EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
      return Ready();
    }
    DatagramReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kDatagram> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // Run twice: the task must remain pending while nothing has happened.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());

  // Close the peer from outside any task, using a standalone context.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);
  EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));

  // The close lets the reading task run to completion.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
278 
// Checks that writing an empty MultiBuf to either end of a byte channel pair
// succeeds but makes no data available to read on either end.
TEST(ForwardingByteChannel, IgnoresEmptyWrites) {
  pw::async2::Dispatcher dispatcher;

  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once DoPend has run through all of its expectations.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing first->second
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      // Send nothing second->first. This must go through second(); writing
      // through first() again would leave the reverse direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
322 
// Checks that several writes to a byte channel are delivered to the peer as a
// single read containing all of the bytes in order, and that an empty write in
// the opposite direction produces no data.
TEST(ForwardingByteChannel, WriteData) {
  pw::async2::Dispatcher dispatcher;

  class : public pw::async2::Task {
   public:
    TestChannelPair<pw::channel::DataType::kByte> pair;

    // Incremented once DoPend has run through all of its expectations.
    int test_completed = 0;

   private:
    pw::async2::Poll<> DoPend(pw::async2::Context& cx) override {
      // No data yet
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      InitializedMultiBuf b1("hello");
      InitializedMultiBuf b2(" ");
      InitializedMultiBuf b3("world");

      // Send "hello world" first->second, as three separate writes. The
      // writer is ready again after each write without any intervening read.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b1.Take()).status());
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b2.Take()).status());
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->first().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->first().Write(b3.Take()).status());

      // Nothing was sent towards first.
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));

      // A single read returns the concatenation of the three writes.
      auto hello_world_result = pair->second().PendRead(cx);
      EXPECT_TRUE(hello_world_result.IsReady());

      EXPECT_EQ(CopyToString(hello_world_result->value()), "hello world");

      // Send nothing second->first. This must go through second(); writing
      // through first() again would leave the reverse direction untested.
      EXPECT_EQ(pw::async2::Ready(pw::OkStatus()),
                pair->second().PendReadyToWrite(cx));
      EXPECT_EQ(pw::OkStatus(), pair->second().Write({}).status());

      // Still no data
      EXPECT_EQ(pw::async2::Pending(), pair->first().PendRead(cx));
      EXPECT_EQ(pw::async2::Pending(), pair->second().PendRead(cx));

      test_completed += 1;
      return pw::async2::Ready();
    }
  } test_task;

  dispatcher.Post(test_task);

  EXPECT_TRUE(dispatcher.RunUntilStalled().IsReady());
  EXPECT_EQ(test_task.test_completed, 1);
}
379 
// Checks that closing one end of a byte channel pair wakes a task that is
// blocked reading from the other end, and that the read then reports
// FAILED_PRECONDITION.
TEST(ForwardingByteChannel, PendCloseAwakensAndClosesPeer) {
  // Task that keeps pending on a read until the read completes, and expects
  // that completion to carry a FAILED_PRECONDITION status.
  class TryToReadUntilClosed : public Task {
   public:
    explicit TryToReadUntilClosed(ByteReader& reader) : reader_(reader) {}

   private:
    pw::async2::Poll<> DoPend(Context& cx) final {
      Poll<Result<MultiBuf>> read = reader_.PendRead(cx);
      if (read.IsPending()) {
        return Pending();
      }
      EXPECT_EQ(read->status(), pw::Status::FailedPrecondition());
      return Ready();
    }
    ByteReader& reader_;
  };

  pw::async2::Dispatcher dispatcher;
  TestChannelPair<pw::channel::DataType::kByte> pair;
  TryToReadUntilClosed read_task(pair->first());
  dispatcher.Post(read_task);

  // Run twice: the task must remain pending while nothing has happened.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());

  // Close the peer from outside any task, using a standalone context.
  Waker empty_waker;
  Context empty_cx(dispatcher, empty_waker);
  EXPECT_EQ(pair->second().PendClose(empty_cx), Ready(pw::OkStatus()));

  // The close lets the reading task run to completion.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
}
411 
412 }  // namespace
413