1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cassert> 17 #include <cstddef> 18 #include <span> 19 #include <utility> 20 21 #include "pw_containers/intrusive_list.h" 22 #include "pw_function/function.h" 23 #include "pw_rpc/internal/call_context.h" 24 #include "pw_rpc/internal/channel.h" 25 #include "pw_rpc/internal/lock.h" 26 #include "pw_rpc/internal/method.h" 27 #include "pw_rpc/internal/packet.h" 28 #include "pw_rpc/method_type.h" 29 #include "pw_rpc/service.h" 30 #include "pw_status/status.h" 31 #include "pw_sync/lock_annotations.h" 32 33 namespace pw::rpc { 34 35 class Writer; 36 37 namespace internal { 38 39 class Endpoint; 40 class Packet; 41 42 // Internal RPC Call class. The Call is used to respond to any type of RPC. 43 // Public classes like ServerWriters inherit from it with private inheritance 44 // and provide a public API for their use case. The Call's public API is used by 45 // the Server and Client classes. 46 // 47 // Private inheritance is used in place of composition or more complex 48 // inheritance hierarchy so that these objects all inherit from a common 49 // IntrusiveList::Item object. Private inheritance also gives the derived classs 50 // full control over their interfaces. 51 class Call : public IntrusiveList<Call>::Item { 52 public: 53 Call(const Call&) = delete; 54 55 // Move support is provided to derived classes through the MoveFrom function. 56 Call(Call&&) = delete; 57 58 Call& operator=(const Call&) = delete; 59 Call& operator=(Call&&) = delete; 60 61 // True if the Call is active and ready to send responses. active()62 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) { 63 LockGuard lock(rpc_lock()); 64 return active_locked(); 65 } 66 active_locked()67 [[nodiscard]] bool active_locked() const 68 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 69 return rpc_state_ == kActive; 70 } 71 id()72 uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; } 73 channel_id()74 uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) { 75 LockGuard lock(rpc_lock()); 76 return channel_id_locked(); 77 } channel_id_locked()78 uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 79 return channel_id_; 80 } service_id()81 uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 82 return service_id_; 83 } method_id()84 uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 85 return method_id_; 86 } 87 88 // Closes the Call and sends a RESPONSE packet, if it is active. Returns the 89 // status from sending the packet, or FAILED_PRECONDITION if the Call is not 90 // active. CloseAndSendResponse(ConstByteSpan response,Status status)91 Status CloseAndSendResponse(ConstByteSpan response, Status status) 92 PW_LOCKS_EXCLUDED(rpc_lock()) { 93 LockGuard lock(rpc_lock()); 94 return CloseAndSendResponseLocked(response, status); 95 } 96 CloseAndSendResponseLocked(ConstByteSpan response,Status status)97 Status CloseAndSendResponseLocked(ConstByteSpan response, Status status) 98 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 99 return CloseAndSendFinalPacketLocked( 100 PacketType::RESPONSE, response, status); 101 } 102 CloseAndSendResponse(Status status)103 Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) { 104 return CloseAndSendResponse({}, status); 105 } 106 CloseAndSendServerErrorLocked(Status error)107 Status CloseAndSendServerErrorLocked(Status error) 108 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 109 return CloseAndSendFinalPacketLocked(PacketType::SERVER_ERROR, {}, error); 110 } 111 112 // Public call that ends the client stream for a client call. CloseClientStream()113 Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) { 114 LockGuard lock(rpc_lock()); 115 return CloseClientStreamLocked(); 116 } 117 118 // Internal call that closes the client stream. CloseClientStreamLocked()119 Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 120 client_stream_state_ = kClientStreamInactive; 121 return SendPacket(PacketType::CLIENT_STREAM_END, {}, {}); 122 } 123 124 // Sends a payload in either a server or client stream packet. Write(ConstByteSpan payload)125 Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) { 126 LockGuard lock(rpc_lock()); 127 return WriteLocked(payload); 128 } 129 130 Status WriteLocked(ConstByteSpan payload) 131 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 132 133 // Sends the initial request for a client call. If the request fails, the call 134 // is closed. SendInitialClientRequest(ConstByteSpan payload)135 void SendInitialClientRequest(ConstByteSpan payload) 136 PW_UNLOCK_FUNCTION(rpc_lock()) { 137 // TODO(pwbug/597): Ensure the call object is locked before releasing the 138 // RPC mutex. 139 if (const Status status = SendPacket(PacketType::REQUEST, payload); 140 !status.ok()) { 141 HandleError(status); 142 } else { 143 rpc_lock().unlock(); 144 } 145 } 146 147 // Whenever a payload arrives (in a server/client stream or in a response), 148 // call the on_next_ callback. 149 // Precondition: rpc_lock() must be held. HandlePayload(ConstByteSpan message)150 void HandlePayload(ConstByteSpan message) const 151 PW_UNLOCK_FUNCTION(rpc_lock()) { 152 const bool invoke = on_next_ != nullptr; 153 // TODO(pwbug/597): Ensure on_next_ is properly guarded. 154 rpc_lock().unlock(); 155 156 if (invoke) { 157 on_next_(message); 158 } 159 } 160 161 // Handles an error condition for the call. This closes the call and calls the 162 // on_error callback, if set. HandleError(Status status)163 void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) { 164 UnregisterAndMarkClosed(); 165 CallOnError(status); 166 } 167 168 // Aborts the RPC because its channel was closed. Does NOT unregister the 169 // call! The calls are removed when iterating over the list in the endpoint. HandleChannelClose()170 void HandleChannelClose() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 171 // Locking here is problematic because CallOnError releases rpc_lock(). 172 // 173 // pwbug/597 must be addressed before the locking here can be cleaned up. 174 MarkClosed(); 175 176 CallOnError(Status::Aborted()); 177 178 // Re-lock rpc_lock(). 179 rpc_lock().lock(); 180 } 181 has_client_stream()182 bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 183 return HasClientStream(type_); 184 } 185 has_server_stream()186 bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 187 return HasServerStream(type_); 188 } 189 client_stream_open()190 bool client_stream_open() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 191 return client_stream_state_ == kClientStreamActive; 192 } 193 194 // Keep this public so the Nanopb implementation can set it from a helper 195 // function. set_on_next(Function<void (ConstByteSpan)> && on_next)196 void set_on_next(Function<void(ConstByteSpan)>&& on_next) 197 PW_LOCKS_EXCLUDED(rpc_lock()) { 198 LockGuard lock(rpc_lock()); 199 set_on_next_locked(std::move(on_next)); 200 } 201 202 protected: 203 // Creates an inactive Call. Call()204 constexpr Call() 205 : endpoint_{}, 206 channel_id_{}, 207 id_{}, 208 service_id_{}, 209 method_id_{}, 210 rpc_state_{}, 211 type_{}, 212 call_type_{}, 213 client_stream_state_ {} 214 {} 215 216 // Creates an active server-side Call. Call(const CallContext & context,MethodType type)217 Call(const CallContext& context, MethodType type) 218 : Call(context.server(), 219 context.call_id(), 220 context.channel_id(), 221 context.service().id(), 222 context.method().id(), 223 type, 224 kServerCall) {} 225 226 // Creates an active client-side Call. 227 Call(Endpoint& client, 228 uint32_t channel_id, 229 uint32_t service_id, 230 uint32_t method_id, 231 MethodType type); 232 233 // This call must be in a closed state when this is called. 234 void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 235 endpoint()236 Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 237 return *endpoint_; 238 } 239 set_on_next_locked(Function<void (ConstByteSpan)> && on_next)240 void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next) 241 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 242 on_next_ = std::move(on_next); 243 } 244 set_on_error(Function<void (Status)> && on_error)245 void set_on_error(Function<void(Status)>&& on_error) 246 PW_LOCKS_EXCLUDED(rpc_lock()) { 247 LockGuard lock(rpc_lock()); 248 set_on_error_locked(std::move(on_error)); 249 } 250 set_on_error_locked(Function<void (Status)> && on_error)251 void set_on_error_locked(Function<void(Status)>&& on_error) 252 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 253 on_error_ = std::move(on_error); 254 } 255 256 // Calls the on_error callback without closing the RPC. This is used when the 257 // call has already completed. CallOnError(Status error)258 void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) { 259 const bool invoke = on_error_ != nullptr; 260 261 // TODO(pwbug/597): Ensure on_error_ is properly guarded. 262 rpc_lock().unlock(); 263 if (invoke) { 264 on_error_(error); 265 } 266 } 267 MarkClientStreamCompleted()268 void MarkClientStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 269 client_stream_state_ = kClientStreamInactive; 270 } 271 CloseAndSendResponseLocked(Status status)272 Status CloseAndSendResponseLocked(Status status) 273 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 274 return CloseAndSendFinalPacketLocked(PacketType::RESPONSE, {}, status); 275 } 276 277 // Cancels an RPC. For client calls only. Cancel()278 Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) { 279 LockGuard lock(rpc_lock()); 280 return CloseAndSendFinalPacketLocked( 281 PacketType::CLIENT_ERROR, {}, Status::Cancelled()); 282 } 283 284 // Unregisters the RPC from the endpoint & marks as closed. The call may be 285 // active or inactive when this is called. 286 void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 287 288 // Define conversions to the generic server/client RPC writer class. These 289 // functions are defined in pw_rpc/writer.h after the Writer class is defined. 290 constexpr operator Writer&(); 291 constexpr operator const Writer&() const; 292 293 private: 294 enum CallType : bool { kServerCall, kClientCall }; 295 296 // Common constructor for server & client calls. 297 Call(Endpoint& endpoint, 298 uint32_t id, 299 uint32_t channel_id, 300 uint32_t service_id, 301 uint32_t method_id, 302 MethodType type, 303 CallType call_type); 304 305 Packet MakePacket(PacketType type, 306 ConstByteSpan payload, 307 Status status = OkStatus()) const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())308 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 309 return Packet(type, 310 channel_id_locked(), 311 service_id(), 312 method_id(), 313 id_, 314 payload, 315 status); 316 } 317 MarkClosed()318 void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 319 channel_id_ = Channel::kUnassignedChannelId; 320 rpc_state_ = kInactive; 321 client_stream_state_ = kClientStreamInactive; 322 } 323 324 // Sends a payload with the specified type. The payload may either be in a 325 // previously acquired buffer or in a standalone buffer. 326 // 327 // Returns FAILED_PRECONDITION if the call is not active(). 328 Status SendPacket(PacketType type, 329 ConstByteSpan payload, 330 Status status = OkStatus()) 331 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 332 333 Status CloseAndSendFinalPacketLocked(PacketType type, 334 ConstByteSpan response, 335 Status status) 336 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 337 338 internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock()); 339 uint32_t channel_id_ PW_GUARDED_BY(rpc_lock()); 340 uint32_t id_ PW_GUARDED_BY(rpc_lock()); 341 uint32_t service_id_ PW_GUARDED_BY(rpc_lock()); 342 uint32_t method_id_ PW_GUARDED_BY(rpc_lock()); 343 344 enum : bool { kInactive, kActive } rpc_state_ PW_GUARDED_BY(rpc_lock()); 345 MethodType type_ PW_GUARDED_BY(rpc_lock()); 346 CallType call_type_ PW_GUARDED_BY(rpc_lock()); 347 enum : bool { 348 kClientStreamInactive, 349 kClientStreamActive, 350 } client_stream_state_ PW_GUARDED_BY(rpc_lock()); 351 352 // Called when the RPC is terminated due to an error. 353 Function<void(Status error)> on_error_; 354 355 // Called when a request is received. Only used for RPCs with client streams. 356 // The raw payload buffer is passed to the callback. 357 Function<void(ConstByteSpan payload)> on_next_; 358 }; 359 360 } // namespace internal 361 } // namespace pw::rpc 362