1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 // This file defines the ServerReaderWriter, ServerReader, and ServerWriter 16 // classes for the Nanopb RPC interface. These classes are used for 17 // bidirectional, client, and server streaming RPCs. 18 #pragma once 19 20 #include "pw_bytes/span.h" 21 #include "pw_rpc/channel.h" 22 #include "pw_rpc/internal/lock.h" 23 #include "pw_rpc/internal/method_info.h" 24 #include "pw_rpc/internal/method_lookup.h" 25 #include "pw_rpc/internal/server_call.h" 26 #include "pw_rpc/nanopb/internal/common.h" 27 #include "pw_rpc/server.h" 28 29 namespace pw::rpc { 30 namespace internal { 31 32 // Forward declarations for internal classes needed in friend statements. 33 class NanopbMethod; 34 35 namespace test { 36 37 template <typename, typename, uint32_t> 38 class InvocationContext; 39 40 } // namespace test 41 42 class NanopbServerCall : public ServerCall { 43 public: NanopbServerCall()44 constexpr NanopbServerCall() : serde_(nullptr) {} 45 46 NanopbServerCall(const LockedCallContext& context, MethodType type) 47 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 48 SendUnaryResponse(const void * payload,Status status)49 Status SendUnaryResponse(const void* payload, Status status) 50 PW_LOCKS_EXCLUDED(rpc_lock()) { 51 return SendFinalResponse(*this, payload, status); 52 } 53 serde()54 const NanopbMethodSerde& serde() const 55 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 56 return *serde_; 57 } 58 59 protected: NanopbServerCall(NanopbServerCall && other)60 NanopbServerCall(NanopbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { 61 *this = std::move(other); 62 } 63 64 NanopbServerCall& operator=(NanopbServerCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())65 PW_LOCKS_EXCLUDED(rpc_lock()) { 66 RpcLockGuard lock; 67 MoveNanopbServerCallFrom(other); 68 return *this; 69 } 70 MoveNanopbServerCallFrom(NanopbServerCall & other)71 void MoveNanopbServerCallFrom(NanopbServerCall& other) 72 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 73 MoveServerCallFrom(other); 74 serde_ = other.serde_; 75 } 76 SendServerStream(const void * payload)77 Status SendServerStream(const void* payload) PW_LOCKS_EXCLUDED(rpc_lock()) { 78 RpcLockGuard lock; 79 return NanopbSendStream(*this, payload, serde_); 80 } 81 82 private: 83 const NanopbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock()); 84 }; 85 86 // The BaseNanopbServerReader serves as the base for the ServerReader and 87 // ServerReaderWriter classes. It adds a callback templated on the request 88 // struct type. It is templated on the Request type only. 89 template <typename Request> 90 class BaseNanopbServerReader : public NanopbServerCall { 91 public: BaseNanopbServerReader(const LockedCallContext & context,MethodType type)92 BaseNanopbServerReader(const LockedCallContext& context, MethodType type) 93 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) 94 : NanopbServerCall(context, type) {} 95 96 protected: 97 constexpr BaseNanopbServerReader() = default; 98 99 BaseNanopbServerReader(BaseNanopbServerReader&& other) PW_LOCKS_EXCLUDED(rpc_lock ())100 PW_LOCKS_EXCLUDED(rpc_lock()) { 101 *this = std::move(other); 102 } 103 104 BaseNanopbServerReader& operator=(BaseNanopbServerReader&& other) PW_LOCKS_EXCLUDED(rpc_lock ())105 PW_LOCKS_EXCLUDED(rpc_lock()) { 106 RpcLockGuard lock; 107 MoveNanopbServerCallFrom(other); 108 set_nanopb_on_next_locked(std::move(other.nanopb_on_next_)); 109 return *this; 110 } 111 set_on_next(Function<void (const Request & request)> && on_next)112 void set_on_next(Function<void(const Request& request)>&& on_next) 113 PW_LOCKS_EXCLUDED(rpc_lock()) { 114 RpcLockGuard lock; 115 set_nanopb_on_next_locked(std::move(on_next)); 116 } 117 118 private: set_nanopb_on_next_locked(Function<void (const Request & request)> && on_next)119 void set_nanopb_on_next_locked( 120 Function<void(const Request& request)>&& on_next) 121 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 122 nanopb_on_next_ = std::move(on_next); 123 124 Call::set_on_next_locked( 125 [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS { 126 DecodeToStructAndInvokeOnNext( 127 payload, serde().request(), nanopb_on_next_); 128 }); 129 } 130 131 Function<void(const Request&)> nanopb_on_next_ PW_GUARDED_BY(rpc_lock()); 132 }; 133 134 } // namespace internal 135 136 // The NanopbServerReaderWriter is used to send and receive messages in a Nanopb 137 // bidirectional streaming RPC. 138 // 139 // These classes use private inheritance to hide the internal::Call API while 140 // allow direct use of its public and protected functions. 141 template <typename Request, typename Response> 142 class NanopbServerReaderWriter 143 : private internal::BaseNanopbServerReader<Request> { 144 public: 145 // Creates a NanopbServerReaderWriter that is ready to send responses for a 146 // particular RPC. This can be used for testing or to send responses to an RPC 147 // that has not been started by a client. 148 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)149 [[nodiscard]] static NanopbServerReaderWriter Open(Server& server, 150 uint32_t channel_id, 151 ServiceImpl& service) 152 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 153 using Info = internal::MethodInfo<kMethod>; 154 static_assert(std::is_same_v<Request, typename Info::Request>, 155 "The request type of a NanopbServerReaderWriter must match " 156 "the method."); 157 static_assert(std::is_same_v<Response, typename Info::Response>, 158 "The response type of a NanopbServerReaderWriter must match " 159 "the method."); 160 return server.OpenCall<NanopbServerReaderWriter<Request, Response>, 161 kMethod, 162 MethodType::kBidirectionalStreaming>( 163 channel_id, 164 service, 165 internal::MethodLookup::GetNanopbMethod<ServiceImpl, 166 Info::kMethodId>()); 167 } 168 169 constexpr NanopbServerReaderWriter() = default; 170 171 NanopbServerReaderWriter(NanopbServerReaderWriter&&) = default; 172 NanopbServerReaderWriter& operator=(NanopbServerReaderWriter&&) = default; 173 174 using internal::Call::active; 175 using internal::Call::channel_id; 176 177 // Writes a response struct. Returns the following Status codes: 178 // 179 // OK - the response was successfully sent 180 // FAILED_PRECONDITION - the writer is closed 181 // INTERNAL - pw_rpc was unable to encode the Nanopb protobuf 182 // other errors - the ChannelOutput failed to send the packet; the error 183 // codes are determined by the ChannelOutput implementation 184 // Write(const Response & response)185 Status Write(const Response& response) { 186 return internal::NanopbServerCall::SendServerStream(&response); 187 } 188 189 Status Finish(Status status = OkStatus()) { 190 return internal::Call::CloseAndSendResponse(status); 191 } 192 193 // Functions for setting RPC event callbacks. 194 using internal::Call::set_on_error; 195 using internal::ServerCall::set_on_client_stream_end; 196 using internal::BaseNanopbServerReader<Request>::set_on_next; 197 198 private: 199 friend class internal::NanopbMethod; 200 friend class Server; 201 202 template <typename, typename, uint32_t> 203 friend class internal::test::InvocationContext; 204 205 NanopbServerReaderWriter(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())206 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 207 : internal::BaseNanopbServerReader<Request>( 208 context, MethodType::kBidirectionalStreaming) {} 209 }; 210 211 // The NanopbServerReader is used to receive messages and send a response in a 212 // Nanopb client streaming RPC. 213 template <typename Request, typename Response> 214 class NanopbServerReader : private internal::BaseNanopbServerReader<Request> { 215 public: 216 // Creates a NanopbServerReader that is ready to send a response to a 217 // particular RPC. This can be used for testing or to finish an RPC that has 218 // not been started by the client. 219 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)220 [[nodiscard]] static NanopbServerReader Open(Server& server, 221 uint32_t channel_id, 222 ServiceImpl& service) 223 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 224 using Info = internal::MethodInfo<kMethod>; 225 static_assert( 226 std::is_same_v<Request, typename Info::Request>, 227 "The request type of a NanopbServerReader must match the method."); 228 static_assert( 229 std::is_same_v<Response, typename Info::Response>, 230 "The response type of a NanopbServerReader must match the method."); 231 return server.OpenCall<NanopbServerReader<Request, Response>, 232 kMethod, 233 MethodType::kClientStreaming>( 234 channel_id, 235 service, 236 internal::MethodLookup::GetNanopbMethod<ServiceImpl, 237 Info::kMethodId>()); 238 } 239 240 // Allow default construction so that users can declare a variable into which 241 // to move NanopbServerReaders from RPC calls. 242 constexpr NanopbServerReader() = default; 243 244 NanopbServerReader(NanopbServerReader&&) = default; 245 NanopbServerReader& operator=(NanopbServerReader&&) = default; 246 247 using internal::Call::active; 248 using internal::Call::channel_id; 249 250 // Functions for setting RPC event callbacks. 251 using internal::Call::set_on_error; 252 using internal::ServerCall::set_on_client_stream_end; 253 using internal::BaseNanopbServerReader<Request>::set_on_next; 254 255 Status Finish(const Response& response, Status status = OkStatus()) { 256 return internal::NanopbServerCall::SendUnaryResponse(&response, status); 257 } 258 259 private: 260 friend class internal::NanopbMethod; 261 friend class Server; 262 263 template <typename, typename, uint32_t> 264 friend class internal::test::InvocationContext; 265 266 NanopbServerReader(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())267 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 268 : internal::BaseNanopbServerReader<Request>( 269 context, MethodType::kClientStreaming) {} 270 }; 271 272 // The NanopbServerWriter is used to send responses in a Nanopb server streaming 273 // RPC. 274 template <typename Response> 275 class NanopbServerWriter : private internal::NanopbServerCall { 276 public: 277 // Creates a NanopbServerWriter that is ready to send responses for a 278 // particular RPC. This can be used for testing or to send responses to an RPC 279 // that has not been started by a client. 280 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)281 [[nodiscard]] static NanopbServerWriter Open(Server& server, 282 uint32_t channel_id, 283 ServiceImpl& service) 284 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 285 using Info = internal::MethodInfo<kMethod>; 286 static_assert( 287 std::is_same_v<Response, typename Info::Response>, 288 "The response type of a NanopbServerWriter must match the method."); 289 return server.OpenCall<NanopbServerWriter<Response>, 290 kMethod, 291 MethodType::kServerStreaming>( 292 channel_id, 293 service, 294 internal::MethodLookup::GetNanopbMethod<ServiceImpl, 295 Info::kMethodId>()); 296 } 297 298 // Allow default construction so that users can declare a variable into which 299 // to move ServerWriters from RPC calls. 300 constexpr NanopbServerWriter() = default; 301 302 NanopbServerWriter(NanopbServerWriter&&) = default; 303 NanopbServerWriter& operator=(NanopbServerWriter&&) = default; 304 305 using internal::Call::active; 306 using internal::Call::channel_id; 307 308 // Writes a response struct. Returns the following Status codes: 309 // 310 // OK - the response was successfully sent 311 // FAILED_PRECONDITION - the writer is closed 312 // INTERNAL - pw_rpc was unable to encode the Nanopb protobuf 313 // other errors - the ChannelOutput failed to send the packet; the error 314 // codes are determined by the ChannelOutput implementation 315 // Write(const Response & response)316 Status Write(const Response& response) { 317 return internal::NanopbServerCall::SendServerStream(&response); 318 } 319 320 Status Finish(Status status = OkStatus()) { 321 return internal::Call::CloseAndSendResponse(status); 322 } 323 324 using internal::Call::set_on_error; 325 using internal::ServerCall::set_on_client_stream_end; 326 327 private: 328 friend class internal::NanopbMethod; 329 friend class Server; 330 331 template <typename, typename, uint32_t> 332 friend class internal::test::InvocationContext; 333 334 NanopbServerWriter(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())335 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 336 : internal::NanopbServerCall(context, MethodType::kServerStreaming) {} 337 }; 338 339 template <typename Response> 340 class NanopbUnaryResponder : private internal::NanopbServerCall { 341 public: 342 // Creates a NanopbUnaryResponder that is ready to send a response for a 343 // particular RPC. This can be used for testing or to send responses to an RPC 344 // that has not been started by a client. 345 template <auto kMethod, typename ServiceImpl> Open(Server & server,uint32_t channel_id,ServiceImpl & service)346 [[nodiscard]] static NanopbUnaryResponder Open(Server& server, 347 uint32_t channel_id, 348 ServiceImpl& service) 349 PW_LOCKS_EXCLUDED(internal::rpc_lock()) { 350 using Info = internal::MethodInfo<kMethod>; 351 static_assert( 352 std::is_same_v<Response, typename Info::Response>, 353 "The response type of a NanopbUnaryResponder must match the method."); 354 return server 355 .OpenCall<NanopbUnaryResponder<Response>, kMethod, MethodType::kUnary>( 356 channel_id, 357 service, 358 internal::MethodLookup::GetNanopbMethod<ServiceImpl, 359 Info::kMethodId>()); 360 } 361 362 // Allow default construction so that users can declare a variable into which 363 // to move ServerWriters from RPC calls. 364 constexpr NanopbUnaryResponder() = default; 365 366 NanopbUnaryResponder(NanopbUnaryResponder&&) = default; 367 NanopbUnaryResponder& operator=(NanopbUnaryResponder&&) = default; 368 369 using internal::Call::active; 370 using internal::Call::channel_id; 371 372 // Sends the response. Returns the following Status codes: 373 // 374 // OK - the response was successfully sent 375 // FAILED_PRECONDITION - the writer is closed 376 // INTERNAL - pw_rpc was unable to encode the Nanopb protobuf 377 // other errors - the ChannelOutput failed to send the packet; the error 378 // codes are determined by the ChannelOutput implementation 379 // 380 Status Finish(const Response& response, Status status = OkStatus()) { 381 return internal::NanopbServerCall::SendUnaryResponse(&response, status); 382 } 383 384 using internal::Call::set_on_error; 385 using internal::ServerCall::set_on_client_stream_end; 386 387 private: 388 friend class internal::NanopbMethod; 389 friend class Server; 390 391 template <typename, typename, uint32_t> 392 friend class internal::test::InvocationContext; 393 394 NanopbUnaryResponder(const internal::LockedCallContext& context) PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock ())395 PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock()) 396 : internal::NanopbServerCall(context, MethodType::kUnary) {} 397 }; 398 399 } // namespace pw::rpc 400