1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 // This file defines the ServerReaderWriter, ServerReader, ServerWriter, and 16 // UnaryResponder classes for the pw_protobuf RPC interface. These classes are 17 // used for bidirectional, client, and server streaming, and unary RPCs. 18 #pragma once 19 20 #include "pw_bytes/span.h" 21 #include "pw_function/function.h" 22 #include "pw_rpc/channel.h" 23 #include "pw_rpc/internal/lock.h" 24 #include "pw_rpc/internal/method_info.h" 25 #include "pw_rpc/internal/method_lookup.h" 26 #include "pw_rpc/internal/server_call.h" 27 #include "pw_rpc/method_type.h" 28 #include "pw_rpc/pwpb/internal/common.h" 29 #include "pw_rpc/server.h" 30 31 namespace pw::rpc { 32 namespace internal { 33 34 // Forward declarations for internal classes needed in friend statements. 35 namespace test { 36 template <typename, typename, uint32_t> 37 class InvocationContext; 38 } // namespace test 39 40 class PwpbMethod; 41 42 // internal::PwpbServerCall extends internal::ServerCall by adding a method 43 // serializer/deserializer that is initialized based on the method context. 44 class PwpbServerCall : public ServerCall { 45 public: 46 // Allow construction using a call context and method type which creates 47 // a working server call. 48 PwpbServerCall(const LockedCallContext& context, MethodType type) 49 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 50 51 // Sends a unary response. 52 // Returns the following Status codes: 53 // 54 // OK - the response was successfully sent 55 // FAILED_PRECONDITION - the writer is closed 56 // INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf 57 // other errors - the ChannelOutput failed to send the packet; the error 58 // codes are determined by the ChannelOutput implementation 59 // 60 template <typename Response> 61 Status SendUnaryResponse(const Response& response, Status status = OkStatus()) PW_LOCKS_EXCLUDED(rpc_lock ())62 PW_LOCKS_EXCLUDED(rpc_lock()) { 63 RpcLockGuard lock; 64 if (!active_locked()) { 65 return Status::FailedPrecondition(); 66 } 67 68 Result<ByteSpan> buffer = 69 EncodeToPayloadBuffer(response, serde_->response()); 70 if (!buffer.ok()) { 71 return CloseAndSendServerErrorLocked(Status::Internal()); 72 } 73 74 return CloseAndSendResponseLocked(*buffer, status); 75 } 76 77 protected: 78 // Derived classes allow default construction so that users can declare a 79 // variable into which to move server reader/writers from RPC calls. PwpbServerCall()80 constexpr PwpbServerCall() : serde_(nullptr) {} 81 82 // Give access to the serializer/deserializer object for converting requests 83 // and responses between the wire format and pw_protobuf structs. serde()84 const PwpbMethodSerde& serde() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 85 return *serde_; 86 } 87 88 // Allow derived classes to be constructed moving another instance. PW_LOCKS_EXCLUDED(rpc_lock ())89 PwpbServerCall(PwpbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { 90 *this = std::move(other); 91 } 92 93 // Allow derived classes to use move assignment from another instance. 94 PwpbServerCall& operator=(PwpbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())95 PW_LOCKS_EXCLUDED(rpc_lock()) { 96 RpcLockGuard lock; 97 MovePwpbServerCallFrom(other); 98 return *this; 99 } 100 101 // Implement moving by copying the serde pointer. MovePwpbServerCallFrom(PwpbServerCall & other)102 void MovePwpbServerCallFrom(PwpbServerCall& other) 103 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 104 MoveServerCallFrom(other); 105 serde_ = other.serde_; 106 } 107 108 // Sends a streamed response. 109 // Returns the following Status codes: 110 // 111 // OK - the response was successfully sent 112 // FAILED_PRECONDITION - the writer is closed 113 // INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf 114 // other errors - the ChannelOutput failed to send the packet; the error 115 // codes are determined by the ChannelOutput implementation 116 // 117 template <typename Response> SendStreamResponse(const Response & response)118 Status SendStreamResponse(const Response& response) 119 PW_LOCKS_EXCLUDED(rpc_lock()) { 120 RpcLockGuard lock; 121 return PwpbSendStream(*this, response, serde_); 122 } 123 124 private: 125 const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 126 }; 127 128 // internal::BasePwpbServerReader extends internal::PwpbServerCall further by 129 // adding an on_next callback templated on the request type. 130 template <typename Request> 131 class BasePwpbServerReader : public PwpbServerCall { 132 public: BasePwpbServerReader(const LockedCallContext & context,MethodType type)133 BasePwpbServerReader(const LockedCallContext& context, MethodType type) 134 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 135 : PwpbServerCall(context, type) {} 136 137 protected: 138 // Allow default construction so that users can declare a variable into 139 // which to move server reader/writers from RPC calls. 140 constexpr BasePwpbServerReader() = default; 141 142 // Allow derived classes to be constructed moving another instance. 143 BasePwpbServerReader(BasePwpbServerReader&& other) PW_LOCKS_EXCLUDED(rpc_lock ())144 PW_LOCKS_EXCLUDED(rpc_lock()) { 145 *this = std::move(other); 146 } 147 148 // Allow derived classes to use move assignment from another instance. 149 BasePwpbServerReader& operator=(BasePwpbServerReader&& other) PW_LOCKS_EXCLUDED(rpc_lock ())150 PW_LOCKS_EXCLUDED(rpc_lock()) { 151 RpcLockGuard lock; 152 MoveBasePwpbServerReaderFrom(other); 153 return *this; 154 } 155 156 // Implement moving by copying the on_next function. MoveBasePwpbServerReaderFrom(BasePwpbServerReader & other)157 void MoveBasePwpbServerReaderFrom(BasePwpbServerReader& other) 158 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 159 MovePwpbServerCallFrom(other); 160 set_pwpb_on_next_locked(std::move(other.pwpb_on_next_)); 161 } 162 set_on_next(Function<void (const Request & request)> && on_next)163 void set_on_next(Function<void(const Request& request)>&& on_next) 164 PW_LOCKS_EXCLUDED(rpc_lock()) { 165 RpcLockGuard lock; 166 set_pwpb_on_next_locked(std::move(on_next)); 167 } 168 169 private: set_pwpb_on_next_locked(Function<void (const Request & request)> && on_next)170 void set_pwpb_on_next_locked(Function<void(const Request& request)>&& on_next) 171 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 172 pwpb_on_next_ = std::move(on_next); 173 174 Call::set_on_next_locked( 175 [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS { 176 DecodeToStructAndInvokeOnNext( 177 payload, serde().request(), pwpb_on_next_); 178 }); 179 } 180 181 Function<void(const Request&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock()); 182 }; 183 184 } // namespace internal 185 186 // The PwpbServerReaderWriter is used to send and receive typed messages in a 187 // pw_protobuf bidirectional streaming RPC. 188 // 189 // These classes use private inheritance to hide the internal::Call API while 190 // allow direct use of its public and protected functions. 191 template <typename Request, typename Response> 192 class PwpbServerReaderWriter : private internal::BasePwpbServerReader<Request> { 193 public: 194 // Creates a PwpbServerReaderWriter that is ready to send responses for a 195 // particular RPC. This can be used for testing or to send responses to an RPC 196 // that has not been started by a client. 197 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)198 [[nodiscard]] static PwpbServerReaderWriter Open(Server& server, 199 uint32_t channel_id, 200 ServiceImpl& service) 201 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 202 using MethodInfo = internal::MethodInfo<kMethod>; 203 static_assert(std::is_same_v<Request, typename MethodInfo::Request>, 204 "The request type of a PwpbServerReaderWriter must match " 205 "the method."); 206 static_assert(std::is_same_v<Response, typename MethodInfo::Response>, 207 "The response type of a PwpbServerReaderWriter must match " 208 "the method."); 209 return server.OpenCall<PwpbServerReaderWriter, 210 kMethod, 211 MethodType::kBidirectionalStreaming>( 212 channel_id, 213 service, 214 internal::MethodLookup::GetPwpbMethod<ServiceImpl, 215 MethodInfo::kMethodId>()); 216 } 217 218 // Allow default construction so that users can declare a variable into 219 // which to move server reader/writers from RPC calls. 220 constexpr PwpbServerReaderWriter() = default; 221 222 PwpbServerReaderWriter(PwpbServerReaderWriter&&) = default; 223 PwpbServerReaderWriter& operator=(PwpbServerReaderWriter&&) = default; 224 225 using internal::Call::active; 226 using internal::Call::channel_id; 227 228 // Functions for setting RPC event callbacks. 229 using internal::Call::set_on_error; 230 using internal::BasePwpbServerReader<Request>::set_on_next; 231 using internal::ServerCall::set_on_client_stream_end; 232 233 // Writes a response. Returns the following Status codes: 234 // 235 // OK - the response was successfully sent 236 // FAILED_PRECONDITION - the writer is closed 237 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 238 // other errors - the ChannelOutput failed to send the packet; the error 239 // codes are determined by the ChannelOutput implementation 240 // Write(const Response & response)241 Status Write(const Response& response) { 242 return internal::PwpbServerCall::SendStreamResponse(response); 243 } 244 245 Status Finish(Status status = OkStatus()) { 246 return internal::Call::CloseAndSendResponse(status); 247 } 248 249 private: 250 friend class internal::PwpbMethod; 251 friend class Server; 252 253 template <typename, typename, uint32_t> 254 friend class internal::test::InvocationContext; 255 256 PwpbServerReaderWriter(const internal::LockedCallContext& context, 257 MethodType type = MethodType::kBidirectionalStreaming) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())258 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 259 : internal::BasePwpbServerReader<Request>(context, type) {} 260 }; 261 262 // The PwpbServerReader is used to receive typed messages and send a typed 263 // response in a pw_protobuf client streaming RPC. 264 // 265 // These classes use private inheritance to hide the internal::Call API while 266 // allow direct use of its public and protected functions. 267 template <typename Request, typename Response> 268 class PwpbServerReader : private internal::BasePwpbServerReader<Request> { 269 public: 270 // Creates a PwpbServerReader that is ready to send a response to a particular 271 // RPC. This can be used for testing or to finish an RPC that has not been 272 // started by the client. 273 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)274 [[nodiscard]] static PwpbServerReader Open(Server& server, 275 uint32_t channel_id, 276 ServiceImpl& service) 277 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 278 using MethodInfo = internal::MethodInfo<kMethod>; 279 static_assert(std::is_same_v<Request, typename MethodInfo::Request>, 280 "The request type of a PwpbServerReader must match " 281 "the method."); 282 static_assert(std::is_same_v<Response, typename MethodInfo::Response>, 283 "The response type of a PwpbServerReader must match " 284 "the method."); 285 return server 286 .OpenCall<PwpbServerReader, kMethod, MethodType::kClientStreaming>( 287 channel_id, 288 service, 289 internal::MethodLookup::GetPwpbMethod<ServiceImpl, 290 MethodInfo::kMethodId>()); 291 } 292 293 // Allow default construction so that users can declare a variable into 294 // which to move server reader/writers from RPC calls. 295 constexpr PwpbServerReader() = default; 296 297 PwpbServerReader(PwpbServerReader&&) = default; 298 PwpbServerReader& operator=(PwpbServerReader&&) = default; 299 300 using internal::Call::active; 301 using internal::Call::channel_id; 302 303 // Functions for setting RPC event callbacks. 304 using internal::Call::set_on_error; 305 using internal::BasePwpbServerReader<Request>::set_on_next; 306 using internal::ServerCall::set_on_client_stream_end; 307 308 // Sends the response. Returns the following Status codes: 309 // 310 // OK - the response was successfully sent 311 // FAILED_PRECONDITION - the writer is closed 312 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 313 // other errors - the ChannelOutput failed to send the packet; the error 314 // codes are determined by the ChannelOutput implementation 315 // 316 Status Finish(const Response& response, Status status = OkStatus()) { 317 return internal::PwpbServerCall::SendUnaryResponse(response, status); 318 } 319 320 private: 321 friend class internal::PwpbMethod; 322 friend class Server; 323 324 template <typename, typename, uint32_t> 325 friend class internal::test::InvocationContext; 326 327 PwpbServerReader(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())328 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 329 : internal::BasePwpbServerReader<Request>(context, 330 MethodType::kClientStreaming) {} 331 }; 332 333 // The PwpbServerWriter is used to send typed responses in a pw_protobuf server 334 // streaming RPC. 335 // 336 // These classes use private inheritance to hide the internal::Call API while 337 // allow direct use of its public and protected functions. 338 template <typename Response> 339 class PwpbServerWriter : private internal::PwpbServerCall { 340 public: 341 // Creates a PwpbServerWriter that is ready to send responses for a particular 342 // RPC. This can be used for testing or to send responses to an RPC that has 343 // not been started by a client. 344 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)345 [[nodiscard]] static PwpbServerWriter Open(Server& server, 346 uint32_t channel_id, 347 ServiceImpl& service) 348 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 349 using MethodInfo = internal::MethodInfo<kMethod>; 350 static_assert(std::is_same_v<Response, typename MethodInfo::Response>, 351 "The response type of a PwpbServerWriter must match " 352 "the method."); 353 return server 354 .OpenCall<PwpbServerWriter, kMethod, MethodType::kServerStreaming>( 355 channel_id, 356 service, 357 internal::MethodLookup::GetPwpbMethod<ServiceImpl, 358 MethodInfo::kMethodId>()); 359 } 360 361 // Allow default construction so that users can declare a variable into 362 // which to move server reader/writers from RPC calls. 363 constexpr PwpbServerWriter() = default; 364 365 PwpbServerWriter(PwpbServerWriter&&) = default; 366 PwpbServerWriter& operator=(PwpbServerWriter&&) = default; 367 368 using internal::Call::active; 369 using internal::Call::channel_id; 370 371 // Functions for setting RPC event callbacks. 372 using internal::Call::set_on_error; 373 using internal::ServerCall::set_on_client_stream_end; 374 375 // Writes a response. Returns the following Status codes: 376 // 377 // OK - the response was successfully sent 378 // FAILED_PRECONDITION - the writer is closed 379 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 380 // other errors - the ChannelOutput failed to send the packet; the error 381 // codes are determined by the ChannelOutput implementation 382 // Write(const Response & response)383 Status Write(const Response& response) { 384 return internal::PwpbServerCall::SendStreamResponse(response); 385 } 386 387 Status Finish(Status status = OkStatus()) { 388 return internal::Call::CloseAndSendResponse(status); 389 } 390 391 private: 392 friend class internal::PwpbMethod; 393 friend class Server; 394 395 template <typename, typename, uint32_t> 396 friend class internal::test::InvocationContext; 397 398 PwpbServerWriter(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())399 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 400 : internal::PwpbServerCall(context, MethodType::kServerStreaming) {} 401 }; 402 403 // The PwpbUnaryResponder is used to send a typed response in a pw_protobuf 404 // unary RPC. 405 // 406 // These classes use private inheritance to hide the internal::Call API while 407 // allow direct use of its public and protected functions. 408 template <typename Response> 409 class PwpbUnaryResponder : private internal::PwpbServerCall { 410 public: 411 // Creates a PwpbUnaryResponder that is ready to send responses for a 412 // particular RPC. This can be used for testing or to send responses to an 413 // RPC that has not been started by a client. 414 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)415 [[nodiscard]] static PwpbUnaryResponder Open(Server& server, 416 uint32_t channel_id, 417 ServiceImpl& service) 418 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 419 using MethodInfo = internal::MethodInfo<kMethod>; 420 static_assert(std::is_same_v<Response, typename MethodInfo::Response>, 421 "The response type of a PwpbUnaryResponder must match " 422 "the method."); 423 return server 424 .OpenCall<PwpbUnaryResponder<Response>, kMethod, MethodType::kUnary>( 425 channel_id, 426 service, 427 internal::MethodLookup::GetPwpbMethod<ServiceImpl, 428 MethodInfo::kMethodId>()); 429 } 430 431 // Allow default construction so that users can declare a variable into 432 // which to move server reader/writers from RPC calls. 433 constexpr PwpbUnaryResponder() = default; 434 435 PwpbUnaryResponder(PwpbUnaryResponder&&) = default; 436 PwpbUnaryResponder& operator=(PwpbUnaryResponder&&) = default; 437 438 using internal::ServerCall::active; 439 using internal::ServerCall::channel_id; 440 441 // Functions for setting RPC event callbacks. 442 using internal::Call::set_on_error; 443 using internal::ServerCall::set_on_client_stream_end; 444 445 // Sends the response. Returns the following Status codes: 446 // 447 // OK - the response was successfully sent 448 // FAILED_PRECONDITION - the writer is closed 449 // INTERNAL - pw_rpc was unable to encode the pw_protobuf message 450 // other errors - the ChannelOutput failed to send the packet; the error 451 // codes are determined by the ChannelOutput implementation 452 // 453 Status Finish(const Response& response, Status status = OkStatus()) { 454 return internal::PwpbServerCall::SendUnaryResponse(response, status); 455 } 456 457 private: 458 friend class internal::PwpbMethod; 459 friend class Server; 460 461 template <typename, typename, uint32_t> 462 friend class internal::test::InvocationContext; 463 464 PwpbUnaryResponder(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())465 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 466 : internal::PwpbServerCall(context, MethodType::kUnary) {} 467 }; 468 469 } // namespace pw::rpc 470