1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cassert> 17 #include <cstddef> 18 #include <limits> 19 #include <utility> 20 21 #include "pw_containers/intrusive_list.h" 22 #include "pw_function/function.h" 23 #include "pw_rpc/internal/call_context.h" 24 #include "pw_rpc/internal/channel.h" 25 #include "pw_rpc/internal/lock.h" 26 #include "pw_rpc/internal/method.h" 27 #include "pw_rpc/internal/packet.h" 28 #include "pw_rpc/method_type.h" 29 #include "pw_rpc/service.h" 30 #include "pw_span/span.h" 31 #include "pw_status/status.h" 32 #include "pw_sync/lock_annotations.h" 33 34 namespace pw::rpc { 35 36 class Writer; 37 38 namespace internal { 39 40 class Endpoint; 41 class LockedEndpoint; 42 class Packet; 43 44 // Whether a call object is associated with a server or a client. 45 enum CallType : bool { kServerCall, kClientCall }; 46 47 // Whether callbacks that take a proto use the raw data directly or decode it 48 // to a struct. The RPC lock is held when invoking callbacks that decode to a 49 // struct. 50 enum CallbackProtoType : bool { kRawProto, kProtoStruct }; 51 52 // Immutable properties of a call object. These do not change after an active 53 // call is initialized. 54 // 55 // Bits 56 // 0-1: MethodType 57 // 2: CallType 58 // 3: Bool for whether callbacks decode to proto structs 59 // 60 class CallProperties { 61 public: CallProperties()62 constexpr CallProperties() : bits_(0u) {} 63 CallProperties(MethodType method_type,CallType call_type,CallbackProtoType callback_proto_type)64 constexpr CallProperties(MethodType method_type, 65 CallType call_type, 66 CallbackProtoType callback_proto_type) 67 : bits_((static_cast<uint8_t>(method_type) << 0) | 68 (static_cast<uint8_t>(call_type) << 2) | 69 (static_cast<uint8_t>(callback_proto_type) << 3)) {} 70 71 constexpr CallProperties(const CallProperties&) = default; 72 73 constexpr CallProperties& operator=(const CallProperties&) = default; 74 method_type()75 constexpr MethodType method_type() const { 76 return static_cast<MethodType>(bits_ & 0b0011u); 77 } 78 call_type()79 constexpr CallType call_type() const { 80 return static_cast<CallType>((bits_ & 0b0100u) >> 2); 81 } 82 callback_proto_type()83 constexpr CallbackProtoType callback_proto_type() const { 84 return static_cast<CallbackProtoType>((bits_ & 0b1000u) >> 3); 85 } 86 87 private: 88 uint8_t bits_; 89 }; 90 91 // Unrequested RPCs always use this call ID. When a subsequent request 92 // or response is sent with a matching channel + service + method, 93 // it will match a calls with this ID if one exists. 94 inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max(); 95 96 // Internal RPC Call class. The Call is used to respond to any type of RPC. 97 // Public classes like ServerWriters inherit from it with private inheritance 98 // and provide a public API for their use case. The Call's public API is used by 99 // the Server and Client classes. 100 // 101 // Private inheritance is used in place of composition or more complex 102 // inheritance hierarchy so that these objects all inherit from a common 103 // IntrusiveList::Item object. Private inheritance also gives the derived classs 104 // full control over their interfaces. 105 class Call : public IntrusiveList<Call>::Item { 106 public: 107 Call(const Call&) = delete; 108 109 // Move support is provided to derived classes through the MoveFrom function. 110 Call(Call&&) = delete; 111 112 Call& operator=(const Call&) = delete; 113 Call& operator=(Call&&) = delete; 114 115 ~Call() PW_LOCKS_EXCLUDED(rpc_lock()); 116 117 // True if the Call is active and ready to send responses. active()118 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) { 119 RpcLockGuard lock; 120 return active_locked(); 121 } 122 active_locked()123 [[nodiscard]] bool active_locked() const 124 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 125 return (state_ & kActive) != 0; 126 } 127 awaiting_cleanup()128 [[nodiscard]] bool awaiting_cleanup() const 129 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 130 return awaiting_cleanup_ != OkStatus().code(); 131 } 132 id()133 uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; } 134 set_id(uint32_t id)135 void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; } 136 137 // Public function for accessing the channel ID of this call. Set to 0 when 138 // the call is closed. channel_id()139 uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) { 140 RpcLockGuard lock; 141 return channel_id_locked(); 142 } 143 channel_id_locked()144 uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 145 return channel_id_; 146 } 147 service_id()148 uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 149 return service_id_; 150 } 151 method_id()152 uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 153 return method_id_; 154 } 155 156 // Return whether this is a server or client call. type()157 CallType type() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 158 return properties_.call_type(); 159 } 160 161 // Closes the Call and sends a RESPONSE packet, if it is active. Returns the 162 // status from sending the packet, or FAILED_PRECONDITION if the Call is not 163 // active. CloseAndSendResponse(ConstByteSpan response,Status status)164 Status CloseAndSendResponse(ConstByteSpan response, Status status) 165 PW_LOCKS_EXCLUDED(rpc_lock()) { 166 RpcLockGuard lock; 167 return CloseAndSendResponseLocked(response, status); 168 } 169 CloseAndSendResponseLocked(ConstByteSpan response,Status status)170 Status CloseAndSendResponseLocked(ConstByteSpan response, Status status) 171 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 172 return CloseAndSendFinalPacketLocked( 173 pwpb::PacketType::RESPONSE, response, status); 174 } 175 CloseAndSendResponse(Status status)176 Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) { 177 return CloseAndSendResponse({}, status); 178 } 179 CloseAndSendServerErrorLocked(Status error)180 Status CloseAndSendServerErrorLocked(Status error) 181 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 182 return CloseAndSendFinalPacketLocked( 183 pwpb::PacketType::SERVER_ERROR, {}, error); 184 } 185 186 // Public function that ends the client stream for a client call. CloseClientStream()187 Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) { 188 RpcLockGuard lock; 189 return CloseClientStreamLocked(); 190 } 191 192 // Internal function that closes the client stream. CloseClientStreamLocked()193 Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 194 MarkClientStreamCompleted(); 195 return SendPacket(pwpb::PacketType::CLIENT_STREAM_END, {}, {}); 196 } 197 198 // Sends a payload in either a server or client stream packet. Write(ConstByteSpan payload)199 Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) { 200 RpcLockGuard lock; 201 return WriteLocked(payload); 202 } 203 204 Status WriteLocked(ConstByteSpan payload) 205 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 206 207 // Sends the initial request for a client call. If the request fails, the call 208 // is closed. SendInitialClientRequest(ConstByteSpan payload)209 void SendInitialClientRequest(ConstByteSpan payload) 210 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 211 if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload); 212 !status.ok()) { 213 CloseAndMarkForCleanup(status); 214 } 215 } 216 217 void CloseAndMarkForCleanup(Status error) 218 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 219 220 // Whenever a payload arrives (in a server/client stream or in a response), 221 // call the on_next_ callback. 222 // Precondition: rpc_lock() must be held. 223 void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock()); 224 225 // Handles an error condition for the call. This closes the call and calls the 226 // on_error callback, if set. HandleError(Status status)227 void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) { 228 UnregisterAndMarkClosed(); 229 CallOnError(status); 230 } 231 232 // Closes the RPC, but does NOT unregister the call or call on_error. The 233 // call must be moved to the endpoint's to_cleanup_ list and have its 234 // CleanUp() method called at a later time. Only for use by the Endpoint. CloseAndMarkForCleanupFromEndpoint(Status error)235 void CloseAndMarkForCleanupFromEndpoint(Status error) 236 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 237 MarkClosed(); 238 awaiting_cleanup_ = error.code(); 239 } 240 241 // Clears the awaiting_cleanup_ variable and calls the on_error callback. Only 242 // for use by the Endpoint, which will unlist the call. CleanUpFromEndpoint()243 void CleanUpFromEndpoint() PW_UNLOCK_FUNCTION(rpc_lock()) { 244 const Status status(static_cast<Status::Code>(awaiting_cleanup_)); 245 awaiting_cleanup_ = OkStatus().code(); 246 CallOnError(status); 247 } 248 has_client_stream()249 bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 250 return HasClientStream(properties_.method_type()); 251 } 252 has_server_stream()253 bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 254 return HasServerStream(properties_.method_type()); 255 } 256 client_stream_open()257 bool client_stream_open() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 258 return (state_ & kClientStreamActive) != 0; 259 } 260 261 // Closes a call without doing anything else. Called from the Endpoint 262 // destructor. CloseFromDeletedEndpoint()263 void CloseFromDeletedEndpoint() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 264 MarkClosed(); 265 awaiting_cleanup_ = OkStatus().code(); 266 endpoint_ = nullptr; 267 } 268 269 protected: 270 // Creates an inactive Call. Call()271 constexpr Call() 272 : endpoint_{}, 273 channel_id_{}, 274 id_{}, 275 service_id_{}, 276 method_id_{}, 277 state_{}, 278 awaiting_cleanup_{}, 279 callbacks_executing_{}, 280 properties_{} {} 281 282 // Creates an active server-side Call. 283 Call(const LockedCallContext& context, CallProperties properties) 284 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 285 286 // Creates an active client-side Call. 287 Call(LockedEndpoint& client, 288 uint32_t channel_id, 289 uint32_t service_id, 290 uint32_t method_id, 291 CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 292 CallbackStarted()293 void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 294 callbacks_executing_ += 1; 295 } 296 CallbackFinished()297 void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 298 callbacks_executing_ -= 1; 299 } 300 301 // This call must be in a closed state when this is called. 302 void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 303 endpoint()304 Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 305 return *endpoint_; 306 } 307 308 // Public function that sets the on_next function in the raw API. set_on_next(Function<void (ConstByteSpan)> && on_next)309 void set_on_next(Function<void(ConstByteSpan)>&& on_next) 310 PW_LOCKS_EXCLUDED(rpc_lock()) { 311 RpcLockGuard lock; 312 set_on_next_locked(std::move(on_next)); 313 } 314 315 // Internal function that sets on_next. set_on_next_locked(Function<void (ConstByteSpan)> && on_next)316 void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next) 317 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 318 on_next_ = std::move(on_next); 319 } 320 321 // Public function that sets the on_error callback. set_on_error(Function<void (Status)> && on_error)322 void set_on_error(Function<void(Status)>&& on_error) 323 PW_LOCKS_EXCLUDED(rpc_lock()) { 324 RpcLockGuard lock; 325 set_on_error_locked(std::move(on_error)); 326 } 327 328 // Internal function that sets on_error. set_on_error_locked(Function<void (Status)> && on_error)329 void set_on_error_locked(Function<void(Status)>&& on_error) 330 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 331 on_error_ = std::move(on_error); 332 } 333 MarkClientStreamCompleted()334 void MarkClientStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 335 state_ &= ~kClientStreamActive; 336 } 337 CloseAndSendResponseLocked(Status status)338 Status CloseAndSendResponseLocked(Status status) 339 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 340 return CloseAndSendFinalPacketLocked( 341 pwpb::PacketType::RESPONSE, {}, status); 342 } 343 344 // Cancels an RPC. Public function for client calls only. Cancel()345 Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) { 346 RpcLockGuard lock; 347 return CloseAndSendFinalPacketLocked( 348 pwpb::PacketType::CLIENT_ERROR, {}, Status::Cancelled()); 349 } 350 351 // Unregisters the RPC from the endpoint & marks as closed. The call may be 352 // active or inactive when this is called. 353 void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 354 355 // Define conversions to the generic server/client RPC writer class. These 356 // functions are defined in pw_rpc/writer.h after the Writer class is defined. 357 constexpr operator Writer&(); 358 constexpr operator const Writer&() const; 359 360 // Indicates if the on_next and unary on_completed callbacks are internal 361 // wrappers that decode the raw proto before invoking the user's callback. If 362 // they are, the lock must be held when they are invoked. hold_lock_while_invoking_callback_with_payload()363 bool hold_lock_while_invoking_callback_with_payload() const 364 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 365 return properties_.callback_proto_type() == kProtoStruct; 366 } 367 368 // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the 369 // pwpb or Nanopb version of the on_next callback. 370 // 371 // This must ONLY be called from derived classes the wrap the on_next 372 // callback. These classes MUST indicate that they call calls in their 373 // constructor. 374 template <typename Decoder, typename ProtoStruct> DecodeToStructAndInvokeOnNext(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &)> & proto_on_next)375 void DecodeToStructAndInvokeOnNext( 376 ConstByteSpan payload, 377 const Decoder& decoder, 378 Function<void(const ProtoStruct&)>& proto_on_next) 379 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 380 if (proto_on_next == nullptr) { 381 return; 382 } 383 384 ProtoStruct proto_struct{}; 385 386 if (!decoder.Decode(payload, proto_struct).ok()) { 387 CloseAndMarkForCleanup(Status::DataLoss()); 388 return; 389 } 390 391 const uint32_t original_id = id(); 392 auto proto_on_next_local = std::move(proto_on_next); 393 394 rpc_lock().unlock(); 395 proto_on_next_local(proto_struct); 396 rpc_lock().lock(); 397 398 // Restore the original callback if the original call is still active and 399 // the callback has not been replaced. 400 // NOLINTNEXTLINE(bugprone-use-after-move) 401 if (active_locked() && id() == original_id && proto_on_next == nullptr) { 402 proto_on_next = std::move(proto_on_next_local); 403 } 404 } 405 406 // The call is already unregistered and closed. 407 template <typename Decoder, typename ProtoStruct> DecodeToStructAndInvokeOnCompleted(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &,Status)> & proto_on_completed,Status status)408 void DecodeToStructAndInvokeOnCompleted( 409 ConstByteSpan payload, 410 const Decoder& decoder, 411 Function<void(const ProtoStruct&, Status)>& proto_on_completed, 412 Status status) PW_UNLOCK_FUNCTION(rpc_lock()) { 413 // Always move proto_on_completed so it goes out of scope in this function. 414 auto proto_on_completed_local = std::move(proto_on_completed); 415 416 // Move on_error in case an error occurs. 417 auto on_error_local = std::move(on_error_); 418 419 // Release the lock before decoding, since decoder is a global. 420 rpc_lock().unlock(); 421 422 if (proto_on_completed_local == nullptr) { 423 return; 424 } 425 426 ProtoStruct proto_struct{}; 427 if (decoder.Decode(payload, proto_struct).ok()) { 428 proto_on_completed_local(proto_struct, status); 429 } else if (on_error_local != nullptr) { 430 on_error_local(Status::DataLoss()); 431 } 432 } 433 434 // An active call cannot be moved if its callbacks are running. This function 435 // must be called on the call being moved before updating any state. 436 static void WaitUntilReadyForMove(Call& destination, Call& source) 437 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 438 439 private: 440 enum State : uint8_t { 441 kActive = 0b01, 442 kClientStreamActive = 0b10, 443 }; 444 445 // Common constructor for server & client calls. 446 Call(LockedEndpoint& endpoint, 447 uint32_t id, 448 uint32_t channel_id, 449 uint32_t service_id, 450 uint32_t method_id, 451 CallProperties properties); 452 453 Packet MakePacket(pwpb::PacketType type, 454 ConstByteSpan payload, 455 Status status = OkStatus()) const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())456 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 457 return Packet(type, 458 channel_id_locked(), 459 service_id(), 460 method_id(), 461 id_, 462 payload, 463 status); 464 } 465 466 // Marks a call object closed without doing anything else. The call is not 467 // removed from the calls list and no callbacks are called. MarkClosed()468 void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 469 channel_id_ = Channel::kUnassignedChannelId; 470 id_ = 0; 471 state_ = 0; 472 } 473 474 // Calls the on_error callback without closing the RPC. This is used when the 475 // call has already completed. 476 void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()); 477 478 // If required, removes this call from the endpoint's to_cleanup_ list and 479 // calls CleanUp(). Returns true if cleanup was required, which means the lock 480 // was released. 481 bool CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 482 483 // Sends a payload with the specified type. The payload may either be in a 484 // previously acquired buffer or in a standalone buffer. 485 // 486 // Returns FAILED_PRECONDITION if the call is not active(). 487 Status SendPacket(pwpb::PacketType type, 488 ConstByteSpan payload, 489 Status status = OkStatus()) 490 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 491 492 Status CloseAndSendFinalPacketLocked(pwpb::PacketType type, 493 ConstByteSpan response, 494 Status status) 495 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 496 CallbacksAreRunning()497 bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 498 return callbacks_executing_ != 0u; 499 } 500 501 Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock()); 502 uint32_t channel_id_ PW_GUARDED_BY(rpc_lock()); 503 uint32_t id_ PW_GUARDED_BY(rpc_lock()); 504 uint32_t service_id_ PW_GUARDED_BY(rpc_lock()); 505 uint32_t method_id_ PW_GUARDED_BY(rpc_lock()); 506 507 // State of call and client stream. 508 // 509 // bit 0: call is active 510 // bit 1: client stream is active 511 // 512 uint8_t state_ PW_GUARDED_BY(rpc_lock()); 513 514 // If non-OK, indicates that the call was closed and needs to have its 515 // on_error called with this Status code. Uses a uint8_t for compactness. 516 uint8_t awaiting_cleanup_ PW_GUARDED_BY(rpc_lock()); 517 518 // Tracks how many of this call's callbacks are running. Must be 0 for the 519 // call to be destroyed. 520 uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock()); 521 522 CallProperties properties_ PW_GUARDED_BY(rpc_lock()); 523 524 // Called when the RPC is terminated due to an error. 525 Function<void(Status error)> on_error_ PW_GUARDED_BY(rpc_lock()); 526 527 // Called when a request is received. Only used for RPCs with client streams. 528 // The raw payload buffer is passed to the callback. 529 Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock()); 530 }; 531 532 } // namespace internal 533 } // namespace pw::rpc 534