1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 // __ ___ ___ _ _ ___ _ _ ___ 17 // \ \ / /_\ | _ \ \| |_ _| \| |/ __| 18 // \ \/\/ / _ \| / .` || || .` | (_ | 19 // \_/\_/_/ \_\_|_\_|\_|___|_|\_|\___| 20 // _____ _____ ___ ___ ___ __ __ ___ _ _ _____ _ _ 21 // | __\ \/ / _ \ __| _ \_ _| \/ | __| \| |_ _/_\ | | 22 // | _| > <| _/ _|| /| || |\/| | _|| .` | | |/ _ \| |__ 23 // |___/_/\_\_| |___|_|_\___|_| |_|___|_|\_| |_/_/ \_\____| 24 // 25 // This module is in an early, experimental state. The APIs are in flux and may 26 // change without notice. Please do not rely on it in production code, but feel 27 // free to explore and share feedback with the Pigweed team! 28 29 #include <cstddef> 30 #include <cstdint> 31 #include <limits> 32 33 #include "pw_async2/dispatcher.h" 34 #include "pw_async2/poll.h" 35 #include "pw_bytes/span.h" 36 #include "pw_multibuf/allocator.h" 37 #include "pw_multibuf/multibuf.h" 38 #include "pw_result/result.h" 39 #include "pw_status/status.h" 40 41 namespace pw::channel { 42 43 /// @defgroup pw_channel 44 /// @{ 45 46 /// Basic properties of a `Channel`. A `Channel` type can convert to any other 47 /// `Channel` for which it supports the required properties. For example, a 48 /// `kReadable` and `kWritable` channel may be passed to an API that only 49 /// requires `kReadable`. 50 enum Property : uint8_t { 51 /// All data is guaranteed to be delivered in order. The channel is closed if 52 /// data is lost. 53 kReliable = 1 << 0, 54 55 /// The channel supports reading. 56 kReadable = 1 << 1, 57 58 /// The channel supports writing. 59 kWritable = 1 << 2, 60 61 /// The channel supports seeking (changing the read/write position). 62 kSeekable = 1 << 3, 63 }; 64 65 /// The type of data exchanged in `Channel` read and write calls. Unlike 66 /// `Property`, `Channels` with different `DataType`s cannot be used 67 /// interchangeably. 68 enum class DataType : uint8_t { kByte = 0, kDatagram = 1 }; 69 70 /// Positions from which to seek. 71 enum Whence : uint8_t { 72 /// Seek from the beginning of the channel. The offset is a direct offset 73 /// into the data. 74 kBeginning, 75 76 /// Seek from the current position in the channel. The offset is added to 77 /// the current position. Use a negative offset to seek backwards. 78 /// 79 /// Implementations may only support seeking within a limited range from the 80 /// current position. 81 kCurrent, 82 83 /// Seek from the end of the channel. The offset is added to the end 84 /// position. Use a negative offset to seek backwards from the end. 85 kEnd, 86 }; 87 88 /// Represents a write operation. `WriteToken` can be used to track whether a 89 /// particular write has been flushed. 90 class [[nodiscard]] WriteToken { 91 public: WriteToken()92 constexpr WriteToken() : token_(0) {} 93 94 constexpr WriteToken(const WriteToken&) = default; 95 constexpr WriteToken& operator=(const WriteToken&) = default; 96 97 constexpr bool operator==(const WriteToken& other) const { 98 return token_ == other.token_; 99 } 100 constexpr bool operator!=(const WriteToken& other) const { 101 return token_ != other.token_; 102 } 103 constexpr bool operator<(const WriteToken& other) const { 104 return token_ < other.token_; 105 } 106 constexpr bool operator>(const WriteToken& other) const { 107 return token_ > other.token_; 108 } 109 constexpr bool operator<=(const WriteToken& other) const { 110 return token_ <= other.token_; 111 } 112 constexpr bool operator>=(const WriteToken& other) const { 113 return token_ >= other.token_; 114 } 115 116 private: 117 friend class AnyChannel; 118 WriteToken(uint32_t value)119 constexpr WriteToken(uint32_t value) : token_(value) {} 120 121 uint32_t token_; 122 }; 123 124 /// A generic data channel that may support reading or writing bytes or 125 /// datagrams. 126 /// 127 /// Note that this channel should be used from only one ``pw::async::Task`` 128 /// at a time, as the ``Pend`` methods are only required to remember the 129 /// latest ``pw::async2::Context`` that was provided. 130 class AnyChannel { 131 public: 132 virtual ~AnyChannel() = default; 133 134 // Returned by Position() if getting the position is not supported. 135 // TODO: b/323622630 - `Seek` and `Position` are not yet implemented. 136 // static constexpr size_t kUnknownPosition = 137 // std::numeric_limits<size_t>::max(); 138 139 // Channel properties 140 data_type()141 [[nodiscard]] constexpr DataType data_type() const { return data_type_; } 142 reliable()143 [[nodiscard]] constexpr bool reliable() const { 144 return (properties_ & Property::kReliable) != 0; 145 } 146 seekable()147 [[nodiscard]] constexpr bool seekable() const { 148 return (properties_ & Property::kSeekable) != 0; 149 } 150 readable()151 [[nodiscard]] constexpr bool readable() const { 152 return (properties_ & Property::kReadable) != 0; 153 } 154 writable()155 [[nodiscard]] constexpr bool writable() const { 156 return (properties_ & Property::kWritable) != 0; 157 } 158 159 /// True if the channel is open for reading. Always false for write-only 160 /// channels. is_read_open()161 [[nodiscard]] constexpr bool is_read_open() const { return read_open_; } 162 163 /// True if the channel is open for writing. Always false for read-only 164 /// channels. is_write_open()165 [[nodiscard]] constexpr bool is_write_open() const { return write_open_; } 166 is_read_or_write_open()167 [[nodiscard]] constexpr bool is_read_or_write_open() const { 168 return read_open_ || write_open_; 169 } 170 171 /// Read API 172 173 /// Returns a `pw::multibuf::MultiBuf` with read data, if available. If data 174 /// is not available, invokes `cx.waker()` when it becomes available. 175 /// 176 /// For datagram channels, each successful read yields one complete 177 /// datagram, which may contain zero or more bytes. For byte stream channels, 178 /// each successful read yields one or more bytes. 179 /// 180 /// Channels only support one read operation / waker at a time. 181 /// 182 /// @returns @rst 183 /// 184 /// .. pw-status-codes:: 185 /// 186 /// OK: Data was read into a MultiBuf. 187 /// 188 /// UNIMPLEMENTED: The channel does not support reading. 189 /// 190 /// FAILED_PRECONDITION: The channel is closed. 191 /// 192 /// OUT_OF_RANGE: The end of the stream was reached. This may be though 193 /// of as reaching the end of a file. Future reads may succeed after 194 /// ``Seek`` ing backwards, but no more new data will be produced. The 195 /// channel is still open; writes and seeks may succeed. 196 /// 197 /// @endrst PendRead(async2::Context & cx)198 async2::Poll<Result<multibuf::MultiBuf>> PendRead(async2::Context& cx) { 199 if (!is_read_open()) { 200 return Status::FailedPrecondition(); 201 } 202 async2::Poll<Result<multibuf::MultiBuf>> result = DoPendRead(cx); 203 if (result.IsReady() && result->status().IsFailedPrecondition()) { 204 set_read_closed(); 205 } 206 return result; 207 } 208 209 /// Write API 210 211 /// Checks whether a writeable channel is *currently* writeable. 212 /// 213 /// This should be called before attempting to ``Write``, and may be called 214 /// before allocating a write buffer if trying to reduce memory pressure. 215 /// 216 /// This method will return: 217 /// 218 /// * Ready(OK) - The channel is currently writeable, and a single caller 219 /// may proceed to ``Write``. 220 /// * Ready(UNIMPLEMENTED) - The channel does not support writing. 221 /// * Ready(FAILED_PRECONDITION) - The channel is closed for writing. 222 /// * Pending - ``cx`` will be awoken when the channel becomes writeable 223 /// again. 224 /// 225 /// Note: this method will always return ``Ready`` for non-writeable 226 /// channels. PendReadyToWrite(pw::async2::Context & cx)227 async2::Poll<Status> PendReadyToWrite(pw::async2::Context& cx) { 228 if (!is_write_open()) { 229 return Status::FailedPrecondition(); 230 } 231 async2::Poll<Status> result = DoPendReadyToWrite(cx); 232 if (result.IsReady() && result->IsFailedPrecondition()) { 233 set_write_closed(); 234 } 235 return result; 236 } 237 238 /// Gives access to an allocator for write buffers. The MultiBufAllocator 239 /// provides an asynchronous API for obtaining a buffer. 240 /// 241 /// This allocator must *only* be used to allocate the next argument to 242 /// ``Write``. The allocator must be used at most once per call to 243 /// ``Write``, and the returned ``MultiBuf`` must not be combined with 244 /// any other ``MultiBuf`` s or ``Chunk`` s. 245 /// 246 /// This method must not be called on channels which do not support writing. GetWriteAllocator()247 multibuf::MultiBufAllocator& GetWriteAllocator() { 248 return DoGetWriteAllocator(); 249 } 250 251 /// Writes using a previously allocated MultiBuf. Returns a token that 252 /// refers to this write. These tokens are monotonically increasing, and 253 /// PendFlush() returns the value of the latest token it has flushed. 254 /// 255 /// The ``MultiBuf`` argument to ``Write`` may consist of either: 256 /// (1) A single ``MultiBuf`` allocated by ``GetWriteAllocator()`` 257 /// that has not been combined with any other ``MultiBuf`` s 258 /// or ``Chunk``s OR 259 /// (2) A ``MultiBuf`` containing any combination of buffers from sources 260 /// other than ``GetWriteAllocator``. 261 /// 262 /// This requirement allows for more efficient use of memory in case (1). 263 /// For example, a ring-buffer implementation of a ``Channel`` may 264 /// specialize ``GetWriteAllocator`` to return the next section of the 265 /// buffer available for writing. 266 /// 267 /// @returns @rst 268 /// May fail with the following error codes: 269 /// 270 /// .. pw-status-codes:: 271 /// 272 /// OK: Data was accepted by the channel. 273 /// 274 /// UNIMPLEMENTED: The channel does not support writing. 275 /// 276 /// UNAVAILABLE: The write failed due to a transient error (only applies 277 /// to unreliable channels). 278 /// 279 /// FAILED_PRECONDITION: The channel is closed. 280 /// 281 /// @endrst Write(multibuf::MultiBuf && data)282 Result<WriteToken> Write(multibuf::MultiBuf&& data) { 283 if (!is_write_open()) { 284 return Status::FailedPrecondition(); 285 } 286 Result<WriteToken> result = DoWrite(std::move(data)); 287 if (result.status().IsFailedPrecondition()) { 288 set_write_closed(); 289 } 290 return result; 291 } 292 293 /// Flushes pending writes. 294 /// 295 /// Returns a ``async2::Poll`` indicating whether or not flushing has 296 /// completed. 297 /// 298 /// * Ready(OK) - All data has been successfully flushed. 299 /// * Ready(UNIMPLEMENTED) - The channel does not support writing. 300 /// * Ready(FAILED_PRECONDITION) - The channel is closed. 301 /// * Pending - Data remains to be flushed. PendFlush(async2::Context & cx)302 async2::Poll<Result<WriteToken>> PendFlush(async2::Context& cx) { 303 if (!is_write_open()) { 304 return Status::FailedPrecondition(); 305 } 306 async2::Poll<Result<WriteToken>> result = DoPendFlush(cx); 307 if (result.IsReady() && result->status().IsFailedPrecondition()) { 308 set_write_closed(); 309 } 310 return result; 311 } 312 313 /// Seek changes the position in the stream. 314 /// 315 /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented. 316 /// 317 /// Any ``PendRead`` or ``Write`` calls following a call to ``Seek`` will be 318 /// relative to the new position. Already-written data still being flushed 319 /// will be output relative to the old position. 320 /// 321 /// @returns @rst 322 /// 323 /// .. pw-status-codes:: 324 /// 325 /// OK: The current position was successfully changed. 326 /// 327 /// UNIMPLEMENTED: The channel does not support seeking. 328 /// 329 /// FAILED_PRECONDITION: The channel is closed. 330 /// 331 /// NOT_FOUND: The seek was to a valid position, but the channel is no 332 /// longer capable of seeking to this position (partially seekable 333 /// channels only). 334 /// 335 /// OUT_OF_RANGE: The seek went beyond the end of the stream. 336 /// 337 /// @endrst 338 Status Seek(async2::Context& cx, ptrdiff_t position, Whence whence); 339 340 /// Returns the current position in the stream, or `kUnknownPosition` if 341 /// unsupported. 342 /// 343 /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented. 344 size_t Position() const; 345 346 /// Closes the channel, flushing any data. 347 /// 348 /// @returns @rst 349 /// 350 /// .. pw-status-codes:: 351 /// 352 /// OK: The channel was closed and all data was sent successfully. 353 /// 354 /// DATA_LOSS: The channel was closed, but not all previously written 355 /// data was delivered. 356 /// 357 /// FAILED_PRECONDITION: Channel was already closed, which can happen 358 /// out-of-band due to errors. 359 /// 360 /// @endrst PendClose(async2::Context & cx)361 async2::Poll<pw::Status> PendClose(async2::Context& cx) { 362 if (!is_read_or_write_open()) { 363 return Status::FailedPrecondition(); 364 } 365 auto result = DoPendClose(cx); 366 if (result.IsReady()) { 367 set_read_closed(); 368 set_write_closed(); 369 } 370 return result; 371 } 372 373 protected: CreateWriteToken(uint32_t value)374 static constexpr WriteToken CreateWriteToken(uint32_t value) { 375 return WriteToken(value); 376 } 377 378 // Marks the channel as closed for reading, but does nothing else. 379 // 380 // PendClose() always marks the channel closed when DoPendClose() returns 381 // Ready(), regardless of the status. set_read_closed()382 void set_read_closed() { read_open_ = false; } 383 384 // Marks the channel as closed for writing, but does nothing else. 385 // 386 // PendClose() always marks the channel closed when DoPendClose() returns 387 // Ready(), regardless of the status. set_write_closed()388 void set_write_closed() { write_open_ = false; } 389 390 private: 391 template <DataType, Property...> 392 friend class Channel; 393 394 template <Property kLhs, Property kRhs, Property... kProperties> PropertiesAreInOrderWithoutDuplicates()395 static constexpr bool PropertiesAreInOrderWithoutDuplicates() { 396 return (kLhs < kRhs) && 397 PropertiesAreInOrderWithoutDuplicates<kRhs, kProperties...>(); 398 } 399 400 template <Property> PropertiesAreInOrderWithoutDuplicates()401 static constexpr bool PropertiesAreInOrderWithoutDuplicates() { 402 return true; 403 } 404 405 template <Property... kProperties> GetProperties()406 static constexpr uint8_t GetProperties() { 407 return (static_cast<uint8_t>(kProperties) | ...); 408 } 409 410 template <Property... kProperties> PropertiesAreValid()411 static constexpr bool PropertiesAreValid() { 412 static_assert(((kProperties != kSeekable) && ...), 413 "Seekable channels are not yet implemented; see b/323624921"); 414 415 static_assert(((kProperties == kReadable) || ...) || 416 ((kProperties == kWritable) || ...), 417 "At least one of kReadable or kWritable must be provided"); 418 static_assert(sizeof...(kProperties) <= 4, 419 "Too many properties given; no more than 4 may be specified " 420 "(kReliable, kReadable, kWritable, kSeekable)"); 421 static_assert( 422 PropertiesAreInOrderWithoutDuplicates<kProperties...>(), 423 "Properties must be specified in the following order, without " 424 "duplicates: kReliable, kReadable, kWritable, kSeekable"); 425 return true; 426 } 427 428 // `AnyChannel` may only be constructed by deriving from `Channel`. AnyChannel(DataType type,uint8_t properties)429 explicit constexpr AnyChannel(DataType type, uint8_t properties) 430 : data_type_(type), 431 properties_(properties), 432 read_open_(readable()), 433 write_open_(writable()) {} 434 435 // Virtual interface 436 437 // Read functions 438 439 // The max_bytes argument is ignored for datagram-oriented channels. 440 virtual async2::Poll<Result<multibuf::MultiBuf>> DoPendRead( 441 async2::Context& cx) = 0; 442 443 // Write functions 444 445 virtual multibuf::MultiBufAllocator& DoGetWriteAllocator() = 0; 446 447 virtual pw::async2::Poll<Status> DoPendReadyToWrite(async2::Context& cx) = 0; 448 449 virtual Result<WriteToken> DoWrite(multibuf::MultiBuf&& buffer) = 0; 450 451 virtual pw::async2::Poll<Result<WriteToken>> DoPendFlush( 452 async2::Context& cx) = 0; 453 454 // Seek functions 455 /// TODO: b/323622630 - `Seek` and `Position` are not yet implemented. 456 457 // virtual Status DoSeek(ptrdiff_t position, Whence whence) = 0; 458 459 // virtual size_t DoPosition() const = 0; 460 461 // Common functions 462 virtual async2::Poll<Status> DoPendClose(async2::Context& cx) = 0; 463 464 DataType data_type_; 465 uint8_t properties_; 466 bool read_open_; 467 bool write_open_; 468 }; 469 470 /// The basic `Channel` type. Unlike `AnyChannel`, the `Channel`'s properties 471 /// are expressed in template parameters and thus reflected in the type. 472 /// 473 /// Properties must be specified in order (`kDatagram`, `kReliable`, 474 /// `kReadable`, `kWritable`, `kSeekable`) and without duplicates. 475 template <DataType kDataType, Property... kProperties> 476 class Channel : public AnyChannel { 477 static_assert(PropertiesAreValid<kProperties...>()); 478 }; 479 480 /// A `ByteChannel` exchanges data as a stream of bytes. 481 template <Property... kProperties> 482 using ByteChannel = Channel<DataType::kByte, kProperties...>; 483 484 /// A `DatagramChannel` exchanges data as a series of datagrams. 485 template <Property... kProperties> 486 using DatagramChannel = Channel<DataType::kDatagram, kProperties...>; 487 488 /// Unreliable byte-oriented `Channel` that supports reading. 489 using ByteReader = ByteChannel<kReadable>; 490 /// Unreliable byte-oriented `Channel` that supports writing. 491 using ByteWriter = ByteChannel<kWritable>; 492 /// Unreliable byte-oriented `Channel` that supports reading and writing. 493 using ByteReaderWriter = ByteChannel<kReadable, kWritable>; 494 495 /// Reliable byte-oriented `Channel` that supports reading. 496 using ReliableByteReader = ByteChannel<kReliable, kReadable>; 497 /// Reliable byte-oriented `Channel` that supports writing. 498 using ReliableByteWriter = ByteChannel<kReliable, kWritable>; 499 /// Reliable byte-oriented `Channel` that supports reading and writing. 500 using ReliableByteReaderWriter = ByteChannel<kReliable, kReadable, kWritable>; 501 502 /// Unreliable datagram-oriented `Channel` that supports reading. 503 using DatagramReader = DatagramChannel<kReadable>; 504 /// Unreliable datagram-oriented `Channel` that supports writing. 505 using DatagramWriter = DatagramChannel<kWritable>; 506 /// Unreliable datagram-oriented `Channel` that supports reading and writing. 507 using DatagramReaderWriter = DatagramChannel<kReadable, kWritable>; 508 509 /// Reliable datagram-oriented `Channel` that supports reading. 510 using ReliableDatagramReader = DatagramChannel<kReliable, kReadable>; 511 /// Reliable datagram-oriented `Channel` that supports writing. 512 using ReliableDatagramWriter = DatagramChannel<kReliable, kWritable>; 513 /// Reliable datagram-oriented `Channel` that supports reading and writing. 514 using ReliableDatagramReaderWriter = 515 DatagramChannel<kReliable, kReadable, kWritable>; 516 517 /// @} 518 519 } // namespace pw::channel 520 521 // Include specializations for supported Channel types. 522 #include "pw_channel/internal/channel_specializations.h" 523