1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 // This file defines the ClientReaderWriter, ClientReader, ClientWriter, 16 // and UnaryReceiver classes for the pw_protobuf RPC interface. These classes 17 // are used for bidirectional, client, and server streaming, and unary RPCs. 18 #pragma once 19 20 #include "pw_bytes/span.h" 21 #include "pw_function/function.h" 22 #include "pw_rpc/channel.h" 23 #include "pw_rpc/internal/client_call.h" 24 #include "pw_rpc/pwpb/internal/common.h" 25 26 namespace pw::rpc { 27 namespace internal { 28 29 // internal::PwpbUnaryResponseClientCall extends 30 // internal::UnaryResponseClientCall by adding a method serializer/deserializer 31 // passed in to Start(), typed request messages to the Start() call, and an 32 // on_completed callback templated on the response type. 33 template <typename Response> 34 class PwpbUnaryResponseClientCall : public UnaryResponseClientCall { 35 public: 36 // Start() can be called with zero or one request objects. 37 template <typename CallType, typename... Request> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde,Function<void (const Response &,Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)38 static CallType Start(Endpoint& client, 39 uint32_t channel_id, 40 uint32_t service_id, 41 uint32_t method_id, 42 const PwpbMethodSerde& serde, 43 Function<void(const Response&, Status)>&& on_completed, 44 Function<void(Status)>&& on_error, 45 const Request&... request) 46 PW_LOCKS_EXCLUDED(rpc_lock()) { 47 rpc_lock().lock(); 48 CallType call( 49 client.ClaimLocked(), channel_id, service_id, method_id, serde); 50 51 call.set_pwpb_on_completed_locked(std::move(on_completed)); 52 call.set_on_error_locked(std::move(on_error)); 53 54 if constexpr (sizeof...(Request) == 0u) { 55 call.SendInitialClientRequest({}); 56 } else { 57 PwpbSendInitialRequest(call, serde.request(), request...); 58 } 59 60 client.CleanUpCalls(); 61 return call; 62 } 63 64 protected: 65 // Derived classes allow default construction so that users can declare a 66 // variable into which to move client reader/writers from RPC calls. 67 constexpr PwpbUnaryResponseClientCall() = default; 68 PwpbUnaryResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const PwpbMethodSerde & serde)69 PwpbUnaryResponseClientCall(LockedEndpoint& client, 70 uint32_t channel_id, 71 uint32_t service_id, 72 uint32_t method_id, 73 MethodType type, 74 const PwpbMethodSerde& serde) 75 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 76 : UnaryResponseClientCall( 77 client, channel_id, service_id, method_id, StructCallProps(type)), 78 serde_(&serde) {} 79 80 // Allow derived classes to be constructed moving another instance. 81 PwpbUnaryResponseClientCall(PwpbUnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())82 PW_LOCKS_EXCLUDED(rpc_lock()) { 83 *this = std::move(other); 84 } 85 86 // Allow derived classes to use move assignment from another instance. 87 PwpbUnaryResponseClientCall& operator=(PwpbUnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())88 PW_LOCKS_EXCLUDED(rpc_lock()) { 89 RpcLockGuard lock; 90 MovePwpbUnaryResponseClientCallFrom(other); 91 return *this; 92 } 93 94 // Implement moving by copying the serde pointer and on_completed function. MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall & other)95 void MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall& other) 96 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 97 MoveUnaryResponseClientCallFrom(other); 98 serde_ = other.serde_; 99 set_pwpb_on_completed_locked(std::move(other.pwpb_on_completed_)); 100 } 101 set_on_completed(Function<void (const Response & response,Status)> && on_completed)102 void set_on_completed( 103 Function<void(const Response& response, Status)>&& on_completed) 104 PW_LOCKS_EXCLUDED(rpc_lock()) { 105 RpcLockGuard lock; 106 set_pwpb_on_completed_locked(std::move(on_completed)); 107 } 108 109 // Sends a streamed request. 110 // Returns the following Status codes: 111 // 112 // OK - the request was successfully sent 113 // FAILED_PRECONDITION - the writer is closed 114 // INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf 115 // other errors - the ChannelOutput failed to send the packet; the error 116 // codes are determined by the ChannelOutput implementation 117 // 118 template <typename Request> SendStreamRequest(const Request & request)119 Status SendStreamRequest(const Request& request) 120 PW_LOCKS_EXCLUDED(rpc_lock()) { 121 RpcLockGuard lock; 122 return PwpbSendStream(*this, request, serde_); 123 } 124 125 private: set_pwpb_on_completed_locked(Function<void (const Response & response,Status)> && on_completed)126 void set_pwpb_on_completed_locked( 127 Function<void(const Response& response, Status)>&& on_completed) 128 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 129 pwpb_on_completed_ = std::move(on_completed); 130 131 UnaryResponseClientCall::set_on_completed_locked( 132 [this](ConstByteSpan payload, Status status) 133 PW_NO_LOCK_SAFETY_ANALYSIS { 134 DecodeToStructAndInvokeOnCompleted( 135 payload, serde_->response(), pwpb_on_completed_, status); 136 }); 137 } 138 139 const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 140 Function<void(const Response&, Status)> pwpb_on_completed_ 141 PW_GUARDED_BY(rpc_lock()); 142 }; 143 144 // internal::PwpbStreamResponseClientCall extends 145 // internal::StreamResponseClientCall by adding a method serializer/deserializer 146 // passed in to Start(), typed request messages to the Start() call, and an 147 // on_next callback templated on the response type. 148 template <typename Response> 149 class PwpbStreamResponseClientCall : public StreamResponseClientCall { 150 public: 151 // Start() can be called with zero or one request objects. 152 template <typename CallType, typename... Request> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde,Function<void (const Response &)> && on_next,Function<void (Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)153 static CallType Start(Endpoint& client, 154 uint32_t channel_id, 155 uint32_t service_id, 156 uint32_t method_id, 157 const PwpbMethodSerde& serde, 158 Function<void(const Response&)>&& on_next, 159 Function<void(Status)>&& on_completed, 160 Function<void(Status)>&& on_error, 161 const Request&... request) 162 PW_LOCKS_EXCLUDED(rpc_lock()) { 163 rpc_lock().lock(); 164 CallType call( 165 client.ClaimLocked(), channel_id, service_id, method_id, serde); 166 167 call.set_pwpb_on_next_locked(std::move(on_next)); 168 call.set_on_completed_locked(std::move(on_completed)); 169 call.set_on_error_locked(std::move(on_error)); 170 171 if constexpr (sizeof...(Request) == 0u) { 172 call.SendInitialClientRequest({}); 173 } else { 174 PwpbSendInitialRequest(call, serde.request(), request...); 175 } 176 client.CleanUpCalls(); 177 return call; 178 } 179 180 protected: 181 // Derived classes allow default construction so that users can declare a 182 // variable into which to move client reader/writers from RPC calls. 183 constexpr PwpbStreamResponseClientCall() = default; 184 PwpbStreamResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const PwpbMethodSerde & serde)185 PwpbStreamResponseClientCall(LockedEndpoint& client, 186 uint32_t channel_id, 187 uint32_t service_id, 188 uint32_t method_id, 189 MethodType type, 190 const PwpbMethodSerde& serde) 191 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 192 : StreamResponseClientCall( 193 client, channel_id, service_id, method_id, StructCallProps(type)), 194 serde_(&serde) {} 195 196 // Allow derived classes to be constructed moving another instance. 197 PwpbStreamResponseClientCall(PwpbStreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())198 PW_LOCKS_EXCLUDED(rpc_lock()) { 199 *this = std::move(other); 200 } 201 202 // Allow derived classes to use move assignment from another instance. 203 PwpbStreamResponseClientCall& operator=(PwpbStreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())204 PW_LOCKS_EXCLUDED(rpc_lock()) { 205 RpcLockGuard lock; 206 MovePwpbStreamResponseClientCallFrom(other); 207 return *this; 208 } 209 210 // Implement moving by copying the serde pointer and on_next function. MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall & other)211 void MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall& other) 212 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 213 MoveStreamResponseClientCallFrom(other); 214 serde_ = other.serde_; 215 set_pwpb_on_next_locked(std::move(other.pwpb_on_next_)); 216 } 217 set_on_next(Function<void (const Response & response)> && on_next)218 void set_on_next(Function<void(const Response& response)>&& on_next) 219 PW_LOCKS_EXCLUDED(rpc_lock()) { 220 RpcLockGuard lock; 221 set_pwpb_on_next_locked(std::move(on_next)); 222 } 223 224 // Sends a streamed request. 225 // Returns the following Status codes: 226 // 227 // OK - the request was successfully sent 228 // FAILED_PRECONDITION - the writer is closed 229 // INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf 230 // other errors - the ChannelOutput failed to send the packet; the error 231 // codes are determined by the ChannelOutput implementation 232 // 233 template <typename Request> SendStreamRequest(const Request & request)234 Status SendStreamRequest(const Request& request) 235 PW_LOCKS_EXCLUDED(rpc_lock()) { 236 RpcLockGuard lock; 237 return PwpbSendStream(*this, request, serde_); 238 } 239 240 private: set_pwpb_on_next_locked(Function<void (const Response & response)> && on_next)241 void set_pwpb_on_next_locked( 242 Function<void(const Response& response)>&& on_next) 243 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 244 pwpb_on_next_ = std::move(on_next); 245 246 Call::set_on_next_locked( 247 [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS { 248 DecodeToStructAndInvokeOnNext( 249 payload, serde_->response(), pwpb_on_next_); 250 }); 251 } 252 253 const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 254 Function<void(const Response&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock()); 255 }; 256 257 } // namespace internal 258 259 // The PwpbClientReaderWriter is used to send and receive typed messages in a 260 // pw_protobuf bidirectional streaming RPC. 261 // 262 // These classes use private inheritance to hide the internal::Call API while 263 // allow direct use of its public and protected functions. 264 template <typename Request, typename Response> 265 class PwpbClientReaderWriter 266 : private internal::PwpbStreamResponseClientCall<Response> { 267 public: 268 // Allow default construction so that users can declare a variable into 269 // which to move client reader/writers from RPC calls. 270 constexpr PwpbClientReaderWriter() = default; 271 272 PwpbClientReaderWriter(PwpbClientReaderWriter&&) = default; 273 PwpbClientReaderWriter& operator=(PwpbClientReaderWriter&&) = default; 274 275 using internal::Call::active; 276 using internal::Call::channel_id; 277 278 // Writes a request. Returns the following Status codes: 279 // 280 // OK - the request was successfully sent 281 // FAILED_PRECONDITION - the writer is closed 282 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 283 // other errors - the ChannelOutput failed to send the packet; the error 284 // codes are determined by the ChannelOutput implementation 285 // Write(const Request & request)286 Status Write(const Request& request) { 287 return internal::PwpbStreamResponseClientCall<Response>::SendStreamRequest( 288 request); 289 } 290 291 // Notifies the server that no further client stream messages will be sent. 292 using internal::ClientCall::CloseClientStream; 293 294 // Cancels this RPC. Closes the call locally and sends a CANCELLED error to 295 // the server. 296 using internal::Call::Cancel; 297 298 // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation 299 // packet. Future packets for this RPC are dropped, and the client sends a 300 // FAILED_PRECONDITION error in response because the call is not active. 301 using internal::ClientCall::Abandon; 302 303 // Functions for setting RPC event callbacks. 304 using internal::PwpbStreamResponseClientCall<Response>::set_on_next; 305 using internal::StreamResponseClientCall::set_on_completed; 306 using internal::StreamResponseClientCall::set_on_error; 307 308 protected: 309 friend class internal::PwpbStreamResponseClientCall<Response>; 310 PwpbClientReaderWriter(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)311 PwpbClientReaderWriter(internal::LockedEndpoint& client, 312 uint32_t channel_id_v, 313 uint32_t service_id, 314 uint32_t method_id, 315 const PwpbMethodSerde& serde) 316 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 317 : internal::PwpbStreamResponseClientCall<Response>( 318 client, 319 channel_id_v, 320 service_id, 321 method_id, 322 MethodType::kBidirectionalStreaming, 323 serde) {} 324 }; 325 326 // The PwpbClientReader is used to receive typed messages and send a typed 327 // response in a pw_protobuf client streaming RPC. 328 // 329 // These classes use private inheritance to hide the internal::Call API while 330 // allow direct use of its public and protected functions. 331 template <typename Response> 332 class PwpbClientReader 333 : private internal::PwpbStreamResponseClientCall<Response> { 334 public: 335 // Allow default construction so that users can declare a variable into 336 // which to move client reader/writers from RPC calls. 337 constexpr PwpbClientReader() = default; 338 339 PwpbClientReader(PwpbClientReader&&) = default; 340 PwpbClientReader& operator=(PwpbClientReader&&) = default; 341 342 using internal::StreamResponseClientCall::active; 343 using internal::StreamResponseClientCall::channel_id; 344 345 using internal::Call::Cancel; 346 using internal::ClientCall::Abandon; 347 348 // Functions for setting RPC event callbacks. 349 using internal::PwpbStreamResponseClientCall<Response>::set_on_next; 350 using internal::StreamResponseClientCall::set_on_completed; 351 using internal::StreamResponseClientCall::set_on_error; 352 353 private: 354 friend class internal::PwpbStreamResponseClientCall<Response>; 355 PwpbClientReader(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)356 PwpbClientReader(internal::LockedEndpoint& client, 357 uint32_t channel_id_v, 358 uint32_t service_id, 359 uint32_t method_id, 360 const PwpbMethodSerde& serde) 361 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 362 : internal::PwpbStreamResponseClientCall<Response>( 363 client, 364 channel_id_v, 365 service_id, 366 method_id, 367 MethodType::kServerStreaming, 368 serde) {} 369 }; 370 371 // The PwpbClientWriter is used to send typed responses in a pw_protobuf server 372 // streaming RPC. 373 // 374 // These classes use private inheritance to hide the internal::Call API while 375 // allow direct use of its public and protected functions. 376 template <typename Request, typename Response> 377 class PwpbClientWriter 378 : private internal::PwpbUnaryResponseClientCall<Response> { 379 public: 380 // Allow default construction so that users can declare a variable into 381 // which to move client reader/writers from RPC calls. 382 constexpr PwpbClientWriter() = default; 383 384 PwpbClientWriter(PwpbClientWriter&&) = default; 385 PwpbClientWriter& operator=(PwpbClientWriter&&) = default; 386 387 using internal::UnaryResponseClientCall::active; 388 using internal::UnaryResponseClientCall::channel_id; 389 390 // Writes a request. Returns the following Status codes: 391 // 392 // OK - the request was successfully sent 393 // FAILED_PRECONDITION - the writer is closed 394 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 395 // other errors - the ChannelOutput failed to send the packet; the error 396 // codes are determined by the ChannelOutput implementation 397 // Write(const Request & request)398 Status Write(const Request& request) { 399 return internal::PwpbUnaryResponseClientCall<Response>::SendStreamRequest( 400 request); 401 } 402 403 using internal::Call::Cancel; 404 using internal::Call::CloseClientStream; 405 using internal::ClientCall::Abandon; 406 407 // Functions for setting RPC event callbacks. 408 using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed; 409 using internal::UnaryResponseClientCall::set_on_error; 410 411 private: 412 friend class internal::PwpbUnaryResponseClientCall<Response>; 413 PwpbClientWriter(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)414 PwpbClientWriter(internal::LockedEndpoint& client, 415 uint32_t channel_id_v, 416 uint32_t service_id, 417 uint32_t method_id, 418 const PwpbMethodSerde& serde) 419 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 420 421 : internal::PwpbUnaryResponseClientCall<Response>( 422 client, 423 channel_id_v, 424 service_id, 425 method_id, 426 MethodType::kClientStreaming, 427 serde) {} 428 }; 429 430 // The PwpbUnaryReceiver is used to handle a typed response to a pw_protobuf 431 // unary RPC. 432 // 433 // These classes use private inheritance to hide the internal::Call API while 434 // allow direct use of its public and protected functions. 435 template <typename Response> 436 class PwpbUnaryReceiver 437 : private internal::PwpbUnaryResponseClientCall<Response> { 438 public: 439 // Allow default construction so that users can declare a variable into 440 // which to move client reader/writers from RPC calls. 441 constexpr PwpbUnaryReceiver() = default; 442 443 PwpbUnaryReceiver(PwpbUnaryReceiver&&) = default; 444 PwpbUnaryReceiver& operator=(PwpbUnaryReceiver&&) = default; 445 446 using internal::Call::active; 447 using internal::Call::channel_id; 448 449 // Functions for setting RPC event callbacks. 450 using internal::Call::set_on_error; 451 using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed; 452 453 using internal::Call::Cancel; 454 using internal::ClientCall::Abandon; 455 456 private: 457 friend class internal::PwpbUnaryResponseClientCall<Response>; 458 PwpbUnaryReceiver(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)459 PwpbUnaryReceiver(internal::LockedEndpoint& client, 460 uint32_t channel_id_v, 461 uint32_t service_id, 462 uint32_t method_id, 463 const PwpbMethodSerde& serde) 464 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 465 : internal::PwpbUnaryResponseClientCall<Response>(client, 466 channel_id_v, 467 service_id, 468 method_id, 469 MethodType::kUnary, 470 serde) {} 471 }; 472 473 } // namespace pw::rpc 474