• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 //         __      ___   ___ _  _ ___ _  _  ___
17 //         \ \    / /_\ | _ \ \| |_ _| \| |/ __|
18 //          \ \/\/ / _ \|   / .` || || .` | (_ |
19 //           \_/\_/_/ \_\_|_\_|\_|___|_|\_|\___|
20 //  _____  _____ ___ ___ ___ __  __ ___ _  _ _____ _   _
21 // | __\ \/ / _ \ __| _ \_ _|  \/  | __| \| |_   _/_\ | |
22 // | _| >  <|  _/ _||   /| || |\/| | _|| .` | | |/ _ \| |__
23 // |___/_/\_\_| |___|_|_\___|_|  |_|___|_|\_| |_/_/ \_\____|
24 //
25 // This module is in an early, experimental state. The APIs are in flux and may
26 // change without notice. Please do not rely on it in production code, but feel
27 // free to explore and share feedback with the Pigweed team!
28 
#include <cstddef>
#include <cstdint>
#include <limits>
#include <utility>

#include "pw_async2/dispatcher.h"
#include "pw_async2/poll.h"
#include "pw_bytes/span.h"
#include "pw_multibuf/allocator.h"
#include "pw_multibuf/multibuf.h"
#include "pw_result/result.h"
#include "pw_status/status.h"
40 
41 namespace pw::channel {
42 
43 /// @defgroup pw_channel
44 /// @{
45 
/// Capabilities that a `Channel` may offer. Each property occupies its own
/// bit so that a set of properties can be stored as a bitmask.
///
/// A `Channel` type converts to any other `Channel` type whose required
/// properties it supports. For example, a channel that is both `kReadable`
/// and `kWritable` may be passed to an API that only requires `kReadable`.
enum Property : uint8_t {
  /// Data is guaranteed to arrive in order; if data is lost, the channel is
  /// closed.
  kReliable = 0b0001,

  /// Reading from the channel is supported.
  kReadable = 0b0010,

  /// Writing to the channel is supported.
  kWritable = 0b0100,

  /// Seeking (changing the read/write position) is supported.
  kSeekable = 0b1000,
};
64 
/// The kind of data exchanged in `Channel` read and write calls. In contrast
/// to `Property`, `Channel`s with different `DataType`s cannot be used
/// interchangeably.
enum class DataType : uint8_t {
  /// Data is exchanged as a stream of bytes.
  kByte = 0,

  /// Data is exchanged as a series of datagrams.
  kDatagram = 1,
};
69 
/// Reference points that a seek offset is measured from.
enum Whence : uint8_t {
  /// The offset is measured from the beginning of the channel, i.e. it is a
  /// direct offset into the data.
  kBeginning = 0,

  /// The offset is added to the current position in the channel. A negative
  /// offset seeks backwards.
  ///
  /// Implementations may only support seeking within a limited range from the
  /// current position.
  kCurrent = 1,

  /// The offset is added to the end position of the channel. A negative
  /// offset seeks backwards from the end.
  kEnd = 2,
};
87 
/// Represents a write operation. `WriteToken` can be used to track whether a
/// particular write has been flushed.
///
/// Tokens are copyable and are compared by their underlying 32-bit value.
/// Outside of `AnyChannel`, only the default (zero-valued) token can be
/// constructed directly.
class [[nodiscard]] WriteToken {
 public:
  /// Creates a token whose underlying value is 0.
  constexpr WriteToken() : token_(0) {}

  constexpr WriteToken(const WriteToken&) = default;
  constexpr WriteToken& operator=(const WriteToken&) = default;

  // All comparisons operate directly on the underlying token values.
  constexpr bool operator==(const WriteToken& other) const {
    return token_ == other.token_;
  }
  constexpr bool operator!=(const WriteToken& other) const {
    return token_ != other.token_;
  }
  constexpr bool operator<(const WriteToken& other) const {
    return token_ < other.token_;
  }
  constexpr bool operator>(const WriteToken& other) const {
    return token_ > other.token_;
  }
  constexpr bool operator<=(const WriteToken& other) const {
    return token_ <= other.token_;
  }
  constexpr bool operator>=(const WriteToken& other) const {
    return token_ >= other.token_;
  }

 private:
  // `AnyChannel` is the only class allowed to create tokens from raw values.
  friend class AnyChannel;

  // Explicit so that an integer is never silently converted to a
  // `WriteToken`, even within friend code.
  explicit constexpr WriteToken(uint32_t value) : token_(value) {}

  uint32_t token_;
};
123 
124 /// A generic data channel that may support reading or writing bytes or
125 /// datagrams.
126 ///
/// Note that this channel should be used from only one ``pw::async2::Task``
128 /// at a time, as the ``Pend`` methods are only required to remember the
129 /// latest ``pw::async2::Context`` that was provided.
130 class AnyChannel {
131  public:
132   virtual ~AnyChannel() = default;
133 
134   // Returned by Position() if getting the position is not supported.
135   // TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
136   // static constexpr size_t kUnknownPosition =
137   //     std::numeric_limits<size_t>::max();
138 
139   // Channel properties
140 
data_type()141   [[nodiscard]] constexpr DataType data_type() const { return data_type_; }
142 
reliable()143   [[nodiscard]] constexpr bool reliable() const {
144     return (properties_ & Property::kReliable) != 0;
145   }
146 
seekable()147   [[nodiscard]] constexpr bool seekable() const {
148     return (properties_ & Property::kSeekable) != 0;
149   }
150 
readable()151   [[nodiscard]] constexpr bool readable() const {
152     return (properties_ & Property::kReadable) != 0;
153   }
154 
writable()155   [[nodiscard]] constexpr bool writable() const {
156     return (properties_ & Property::kWritable) != 0;
157   }
158 
159   /// True if the channel is open for reading. Always false for write-only
160   /// channels.
is_read_open()161   [[nodiscard]] constexpr bool is_read_open() const { return read_open_; }
162 
163   /// True if the channel is open for writing. Always false for read-only
164   /// channels.
is_write_open()165   [[nodiscard]] constexpr bool is_write_open() const { return write_open_; }
166 
is_read_or_write_open()167   [[nodiscard]] constexpr bool is_read_or_write_open() const {
168     return read_open_ || write_open_;
169   }
170 
171   /// Read API
172 
173   /// Returns a `pw::multibuf::MultiBuf` with read data, if available. If data
174   /// is not available, invokes `cx.waker()` when it becomes available.
175   ///
176   /// For datagram channels, each successful read yields one complete
177   /// datagram, which may contain zero or more bytes. For byte stream channels,
178   /// each successful read yields one or more bytes.
179   ///
180   /// Channels only support one read operation / waker at a time.
181   ///
182   /// @returns @rst
183   ///
184   /// .. pw-status-codes::
185   ///
186   ///    OK: Data was read into a MultiBuf.
187   ///
188   ///    UNIMPLEMENTED: The channel does not support reading.
189   ///
190   ///    FAILED_PRECONDITION: The channel is closed.
191   ///
  ///    OUT_OF_RANGE: The end of the stream was reached. This may be thought
193   ///    of as reaching the end of a file. Future reads may succeed after
194   ///    ``Seek`` ing backwards, but no more new data will be produced. The
195   ///    channel is still open; writes and seeks may succeed.
196   ///
197   /// @endrst
PendRead(async2::Context & cx)198   async2::Poll<Result<multibuf::MultiBuf>> PendRead(async2::Context& cx) {
199     if (!is_read_open()) {
200       return Status::FailedPrecondition();
201     }
202     async2::Poll<Result<multibuf::MultiBuf>> result = DoPendRead(cx);
203     if (result.IsReady() && result->status().IsFailedPrecondition()) {
204       set_read_closed();
205     }
206     return result;
207   }
208 
209   /// Write API
210 
211   /// Checks whether a writeable channel is *currently* writeable.
212   ///
213   /// This should be called before attempting to ``Write``, and may be called
214   /// before allocating a write buffer if trying to reduce memory pressure.
215   ///
216   /// This method will return:
217   ///
218   /// * Ready(OK) - The channel is currently writeable, and a single caller
219   ///   may proceed to ``Write``.
220   /// * Ready(UNIMPLEMENTED) - The channel does not support writing.
221   /// * Ready(FAILED_PRECONDITION) - The channel is closed for writing.
222   /// * Pending - ``cx`` will be awoken when the channel becomes writeable
223   ///   again.
224   ///
225   /// Note: this method will always return ``Ready`` for non-writeable
226   /// channels.
PendReadyToWrite(pw::async2::Context & cx)227   async2::Poll<Status> PendReadyToWrite(pw::async2::Context& cx) {
228     if (!is_write_open()) {
229       return Status::FailedPrecondition();
230     }
231     async2::Poll<Status> result = DoPendReadyToWrite(cx);
232     if (result.IsReady() && result->IsFailedPrecondition()) {
233       set_write_closed();
234     }
235     return result;
236   }
237 
238   /// Gives access to an allocator for write buffers. The MultiBufAllocator
239   /// provides an asynchronous API for obtaining a buffer.
240   ///
241   /// This allocator must *only* be used to allocate the next argument to
242   /// ``Write``. The allocator must be used at most once per call to
243   /// ``Write``, and the returned ``MultiBuf`` must not be combined with
244   /// any other ``MultiBuf`` s or ``Chunk`` s.
245   ///
246   /// This method must not be called on channels which do not support writing.
GetWriteAllocator()247   multibuf::MultiBufAllocator& GetWriteAllocator() {
248     return DoGetWriteAllocator();
249   }
250 
251   /// Writes using a previously allocated MultiBuf. Returns a token that
252   /// refers to this write. These tokens are monotonically increasing, and
253   /// PendFlush() returns the value of the latest token it has flushed.
254   ///
255   /// The ``MultiBuf`` argument to ``Write`` may consist of either:
256   ///   (1) A single ``MultiBuf`` allocated by ``GetWriteAllocator()``
257   ///       that has not been combined with any other ``MultiBuf`` s
258   ///       or ``Chunk``s OR
259   ///   (2) A ``MultiBuf`` containing any combination of buffers from sources
260   ///       other than ``GetWriteAllocator``.
261   ///
262   /// This requirement allows for more efficient use of memory in case (1).
263   /// For example, a ring-buffer implementation of a ``Channel`` may
264   /// specialize ``GetWriteAllocator`` to return the next section of the
265   /// buffer available for writing.
266   ///
267   /// @returns @rst
268   /// May fail with the following error codes:
269   ///
270   /// .. pw-status-codes::
271   ///
272   ///    OK: Data was accepted by the channel.
273   ///
274   ///    UNIMPLEMENTED: The channel does not support writing.
275   ///
276   ///    UNAVAILABLE: The write failed due to a transient error (only applies
277   ///    to unreliable channels).
278   ///
279   ///    FAILED_PRECONDITION: The channel is closed.
280   ///
281   /// @endrst
Write(multibuf::MultiBuf && data)282   Result<WriteToken> Write(multibuf::MultiBuf&& data) {
283     if (!is_write_open()) {
284       return Status::FailedPrecondition();
285     }
286     Result<WriteToken> result = DoWrite(std::move(data));
287     if (result.status().IsFailedPrecondition()) {
288       set_write_closed();
289     }
290     return result;
291   }
292 
293   /// Flushes pending writes.
294   ///
295   /// Returns a ``async2::Poll`` indicating whether or not flushing has
296   /// completed.
297   ///
298   /// * Ready(OK) - All data has been successfully flushed.
299   /// * Ready(UNIMPLEMENTED) - The channel does not support writing.
300   /// * Ready(FAILED_PRECONDITION) - The channel is closed.
301   /// * Pending - Data remains to be flushed.
PendFlush(async2::Context & cx)302   async2::Poll<Result<WriteToken>> PendFlush(async2::Context& cx) {
303     if (!is_write_open()) {
304       return Status::FailedPrecondition();
305     }
306     async2::Poll<Result<WriteToken>> result = DoPendFlush(cx);
307     if (result.IsReady() && result->status().IsFailedPrecondition()) {
308       set_write_closed();
309     }
310     return result;
311   }
312 
313   /// Seek changes the position in the stream.
314   ///
315   /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
316   ///
317   /// Any ``PendRead`` or ``Write`` calls following a call to ``Seek`` will be
318   /// relative to the new position. Already-written data still being flushed
319   /// will be output relative to the old position.
320   ///
321   /// @returns @rst
322   ///
323   /// .. pw-status-codes::
324   ///
325   ///    OK: The current position was successfully changed.
326   ///
327   ///    UNIMPLEMENTED: The channel does not support seeking.
328   ///
329   ///    FAILED_PRECONDITION: The channel is closed.
330   ///
331   ///    NOT_FOUND: The seek was to a valid position, but the channel is no
332   ///    longer capable of seeking to this position (partially seekable
333   ///    channels only).
334   ///
335   ///    OUT_OF_RANGE: The seek went beyond the end of the stream.
336   ///
337   /// @endrst
338   Status Seek(async2::Context& cx, ptrdiff_t position, Whence whence);
339 
340   /// Returns the current position in the stream, or `kUnknownPosition` if
341   /// unsupported.
342   ///
343   /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
344   size_t Position() const;
345 
346   /// Closes the channel, flushing any data.
347   ///
348   /// @returns @rst
349   ///
350   /// .. pw-status-codes::
351   ///
352   ///    OK: The channel was closed and all data was sent successfully.
353   ///
354   ///    DATA_LOSS: The channel was closed, but not all previously written
355   ///    data was delivered.
356   ///
357   ///    FAILED_PRECONDITION: Channel was already closed, which can happen
358   ///    out-of-band due to errors.
359   ///
360   /// @endrst
PendClose(async2::Context & cx)361   async2::Poll<pw::Status> PendClose(async2::Context& cx) {
362     if (!is_read_or_write_open()) {
363       return Status::FailedPrecondition();
364     }
365     auto result = DoPendClose(cx);
366     if (result.IsReady()) {
367       set_read_closed();
368       set_write_closed();
369     }
370     return result;
371   }
372 
373  protected:
CreateWriteToken(uint32_t value)374   static constexpr WriteToken CreateWriteToken(uint32_t value) {
375     return WriteToken(value);
376   }
377 
378   // Marks the channel as closed for reading, but does nothing else.
379   //
380   // PendClose() always marks the channel closed when DoPendClose() returns
381   // Ready(), regardless of the status.
set_read_closed()382   void set_read_closed() { read_open_ = false; }
383 
384   // Marks the channel as closed for writing, but does nothing else.
385   //
386   // PendClose() always marks the channel closed when DoPendClose() returns
387   // Ready(), regardless of the status.
set_write_closed()388   void set_write_closed() { write_open_ = false; }
389 
390  private:
391   template <DataType, Property...>
392   friend class Channel;
393 
394   template <Property kLhs, Property kRhs, Property... kProperties>
PropertiesAreInOrderWithoutDuplicates()395   static constexpr bool PropertiesAreInOrderWithoutDuplicates() {
396     return (kLhs < kRhs) &&
397            PropertiesAreInOrderWithoutDuplicates<kRhs, kProperties...>();
398   }
399 
400   template <Property>
PropertiesAreInOrderWithoutDuplicates()401   static constexpr bool PropertiesAreInOrderWithoutDuplicates() {
402     return true;
403   }
404 
405   template <Property... kProperties>
GetProperties()406   static constexpr uint8_t GetProperties() {
407     return (static_cast<uint8_t>(kProperties) | ...);
408   }
409 
410   template <Property... kProperties>
PropertiesAreValid()411   static constexpr bool PropertiesAreValid() {
412     static_assert(((kProperties != kSeekable) && ...),
413                   "Seekable channels are not yet implemented; see b/323624921");
414 
415     static_assert(((kProperties == kReadable) || ...) ||
416                       ((kProperties == kWritable) || ...),
417                   "At least one of kReadable or kWritable must be provided");
418     static_assert(sizeof...(kProperties) <= 4,
419                   "Too many properties given; no more than 4 may be specified "
420                   "(kReliable, kReadable, kWritable, kSeekable)");
421     static_assert(
422         PropertiesAreInOrderWithoutDuplicates<kProperties...>(),
423         "Properties must be specified in the following order, without "
424         "duplicates: kReliable, kReadable, kWritable, kSeekable");
425     return true;
426   }
427 
428   // `AnyChannel` may only be constructed by deriving from `Channel`.
AnyChannel(DataType type,uint8_t properties)429   explicit constexpr AnyChannel(DataType type, uint8_t properties)
430       : data_type_(type),
431         properties_(properties),
432         read_open_(readable()),
433         write_open_(writable()) {}
434 
435   // Virtual interface
436 
437   // Read functions
438 
439   // The max_bytes argument is ignored for datagram-oriented channels.
440   virtual async2::Poll<Result<multibuf::MultiBuf>> DoPendRead(
441       async2::Context& cx) = 0;
442 
443   // Write functions
444 
445   virtual multibuf::MultiBufAllocator& DoGetWriteAllocator() = 0;
446 
447   virtual pw::async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) = 0;
448 
449   virtual Result<WriteToken> DoWrite(multibuf::MultiBuf&& buffer) = 0;
450 
451   virtual pw::async2::Poll<Result<WriteToken>> DoPendFlush(
452       async2::Context& cx) = 0;
453 
454   // Seek functions
455   /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented.
456 
457   // virtual Status DoSeek(ptrdiff_t position, Whence whence) = 0;
458 
459   // virtual size_t DoPosition() const = 0;
460 
461   // Common functions
462   virtual async2::Poll<Status> DoPendClose(async2::Context& cx) = 0;
463 
464   DataType data_type_;
465   uint8_t properties_;
466   bool read_open_;
467   bool write_open_;
468 };
469 
470 /// The basic `Channel` type. Unlike `AnyChannel`, the `Channel`'s properties
471 /// are expressed in template parameters and thus reflected in the type.
472 ///
/// Properties must be specified in order (`kReliable`, `kReadable`,
/// `kWritable`, `kSeekable`) and without duplicates.
475 template <DataType kDataType, Property... kProperties>
476 class Channel : public AnyChannel {
477   static_assert(PropertiesAreValid<kProperties...>());
478 };
479 
480 /// A `ByteChannel` exchanges data as a stream of bytes.
481 template <Property... kProperties>
482 using ByteChannel = Channel<DataType::kByte, kProperties...>;
483 
484 /// A `DatagramChannel` exchanges data as a series of datagrams.
485 template <Property... kProperties>
486 using DatagramChannel = Channel<DataType::kDatagram, kProperties...>;
487 
488 /// Unreliable byte-oriented `Channel` that supports reading.
489 using ByteReader = ByteChannel<kReadable>;
490 /// Unreliable byte-oriented `Channel` that supports writing.
491 using ByteWriter = ByteChannel<kWritable>;
492 /// Unreliable byte-oriented `Channel` that supports reading and writing.
493 using ByteReaderWriter = ByteChannel<kReadable, kWritable>;
494 
495 /// Reliable byte-oriented `Channel` that supports reading.
496 using ReliableByteReader = ByteChannel<kReliable, kReadable>;
497 /// Reliable byte-oriented `Channel` that supports writing.
498 using ReliableByteWriter = ByteChannel<kReliable, kWritable>;
499 /// Reliable byte-oriented `Channel` that supports reading and writing.
500 using ReliableByteReaderWriter = ByteChannel<kReliable, kReadable, kWritable>;
501 
502 /// Unreliable datagram-oriented `Channel` that supports reading.
503 using DatagramReader = DatagramChannel<kReadable>;
504 /// Unreliable datagram-oriented `Channel` that supports writing.
505 using DatagramWriter = DatagramChannel<kWritable>;
506 /// Unreliable datagram-oriented `Channel` that supports reading and writing.
507 using DatagramReaderWriter = DatagramChannel<kReadable, kWritable>;
508 
509 /// Reliable datagram-oriented `Channel` that supports reading.
510 using ReliableDatagramReader = DatagramChannel<kReliable, kReadable>;
511 /// Reliable datagram-oriented `Channel` that supports writing.
512 using ReliableDatagramWriter = DatagramChannel<kReliable, kWritable>;
513 /// Reliable datagram-oriented `Channel` that supports reading and writing.
514 using ReliableDatagramReaderWriter =
515     DatagramChannel<kReliable, kReadable, kWritable>;
516 
517 /// @}
518 
519 }  // namespace pw::channel
520 
521 // Include specializations for supported Channel types.
522 #include "pw_channel/internal/channel_specializations.h"
523