• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // This file defines the ClientReaderWriter, ClientReader, ClientWriter,
16 // and UnaryReceiver classes for the pw_protobuf RPC interface. These classes
17 // are used for bidirectional, client, and server streaming, and unary RPCs.
18 #pragma once
19 
20 #include "pw_bytes/span.h"
21 #include "pw_function/function.h"
22 #include "pw_rpc/channel.h"
23 #include "pw_rpc/internal/client_call.h"
24 #include "pw_rpc/pwpb/internal/common.h"
25 
26 namespace pw::rpc {
27 namespace internal {
28 
29 // internal::PwpbUnaryResponseClientCall extends
30 // internal::UnaryResponseClientCall by adding a method serializer/deserializer
31 // passed in to Start(), typed request messages to the Start() call, and an
32 // on_completed callback templated on the response type.
33 template <typename Response>
34 class PwpbUnaryResponseClientCall : public UnaryResponseClientCall {
35  public:
36   // Start() can be called with zero or one request objects.
37   template <typename CallType, typename... Request>
Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde,Function<void (const Response &,Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)38   static CallType Start(Endpoint& client,
39                         uint32_t channel_id,
40                         uint32_t service_id,
41                         uint32_t method_id,
42                         const PwpbMethodSerde& serde,
43                         Function<void(const Response&, Status)>&& on_completed,
44                         Function<void(Status)>&& on_error,
45                         const Request&... request)
46       PW_LOCKS_EXCLUDED(rpc_lock()) {
47     rpc_lock().lock();
48     CallType call(
49         client.ClaimLocked(), channel_id, service_id, method_id, serde);
50 
51     call.set_pwpb_on_completed_locked(std::move(on_completed));
52     call.set_on_error_locked(std::move(on_error));
53 
54     if constexpr (sizeof...(Request) == 0u) {
55       call.SendInitialClientRequest({});
56     } else {
57       PwpbSendInitialRequest(call, serde.request(), request...);
58     }
59 
60     client.CleanUpCalls();
61     return call;
62   }
63 
64  protected:
65   // Derived classes allow default construction so that users can declare a
66   // variable into which to move client reader/writers from RPC calls.
67   constexpr PwpbUnaryResponseClientCall() = default;
68 
PwpbUnaryResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const PwpbMethodSerde & serde)69   PwpbUnaryResponseClientCall(LockedEndpoint& client,
70                               uint32_t channel_id,
71                               uint32_t service_id,
72                               uint32_t method_id,
73                               MethodType type,
74                               const PwpbMethodSerde& serde)
75       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
76       : UnaryResponseClientCall(
77             client, channel_id, service_id, method_id, StructCallProps(type)),
78         serde_(&serde) {}
79 
80   // Allow derived classes to be constructed moving another instance.
81   PwpbUnaryResponseClientCall(PwpbUnaryResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())82       PW_LOCKS_EXCLUDED(rpc_lock()) {
83     *this = std::move(other);
84   }
85 
86   // Allow derived classes to use move assignment from another instance.
87   PwpbUnaryResponseClientCall& operator=(PwpbUnaryResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())88       PW_LOCKS_EXCLUDED(rpc_lock()) {
89     RpcLockGuard lock;
90     MovePwpbUnaryResponseClientCallFrom(other);
91     return *this;
92   }
93 
94   // Implement moving by copying the serde pointer and on_completed function.
MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall & other)95   void MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall& other)
96       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
97     MoveUnaryResponseClientCallFrom(other);
98     serde_ = other.serde_;
99     set_pwpb_on_completed_locked(std::move(other.pwpb_on_completed_));
100   }
101 
set_on_completed(Function<void (const Response & response,Status)> && on_completed)102   void set_on_completed(
103       Function<void(const Response& response, Status)>&& on_completed)
104       PW_LOCKS_EXCLUDED(rpc_lock()) {
105     RpcLockGuard lock;
106     set_pwpb_on_completed_locked(std::move(on_completed));
107   }
108 
109   // Sends a streamed request.
110   // Returns the following Status codes:
111   //
112   //   OK - the request was successfully sent
113   //   FAILED_PRECONDITION - the writer is closed
114   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf
115   //   other errors - the ChannelOutput failed to send the packet; the error
116   //       codes are determined by the ChannelOutput implementation
117   //
118   template <typename Request>
SendStreamRequest(const Request & request)119   Status SendStreamRequest(const Request& request)
120       PW_LOCKS_EXCLUDED(rpc_lock()) {
121     RpcLockGuard lock;
122     return PwpbSendStream(*this, request, serde_);
123   }
124 
125  private:
set_pwpb_on_completed_locked(Function<void (const Response & response,Status)> && on_completed)126   void set_pwpb_on_completed_locked(
127       Function<void(const Response& response, Status)>&& on_completed)
128       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
129     pwpb_on_completed_ = std::move(on_completed);
130 
131     UnaryResponseClientCall::set_on_completed_locked(
132         [this](ConstByteSpan payload, Status status)
133             PW_NO_LOCK_SAFETY_ANALYSIS {
134               DecodeToStructAndInvokeOnCompleted(
135                   payload, serde_->response(), pwpb_on_completed_, status);
136             });
137   }
138 
139   const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
140   Function<void(const Response&, Status)> pwpb_on_completed_
141       PW_GUARDED_BY(rpc_lock());
142 };
143 
144 // internal::PwpbStreamResponseClientCall extends
145 // internal::StreamResponseClientCall by adding a method serializer/deserializer
146 // passed in to Start(), typed request messages to the Start() call, and an
147 // on_next callback templated on the response type.
148 template <typename Response>
149 class PwpbStreamResponseClientCall : public StreamResponseClientCall {
150  public:
151   // Start() can be called with zero or one request objects.
152   template <typename CallType, typename... Request>
Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde,Function<void (const Response &)> && on_next,Function<void (Status)> && on_completed,Function<void (Status)> && on_error,const Request &...request)153   static CallType Start(Endpoint& client,
154                         uint32_t channel_id,
155                         uint32_t service_id,
156                         uint32_t method_id,
157                         const PwpbMethodSerde& serde,
158                         Function<void(const Response&)>&& on_next,
159                         Function<void(Status)>&& on_completed,
160                         Function<void(Status)>&& on_error,
161                         const Request&... request)
162       PW_LOCKS_EXCLUDED(rpc_lock()) {
163     rpc_lock().lock();
164     CallType call(
165         client.ClaimLocked(), channel_id, service_id, method_id, serde);
166 
167     call.set_pwpb_on_next_locked(std::move(on_next));
168     call.set_on_completed_locked(std::move(on_completed));
169     call.set_on_error_locked(std::move(on_error));
170 
171     if constexpr (sizeof...(Request) == 0u) {
172       call.SendInitialClientRequest({});
173     } else {
174       PwpbSendInitialRequest(call, serde.request(), request...);
175     }
176     client.CleanUpCalls();
177     return call;
178   }
179 
180  protected:
181   // Derived classes allow default construction so that users can declare a
182   // variable into which to move client reader/writers from RPC calls.
183   constexpr PwpbStreamResponseClientCall() = default;
184 
PwpbStreamResponseClientCall(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type,const PwpbMethodSerde & serde)185   PwpbStreamResponseClientCall(LockedEndpoint& client,
186                                uint32_t channel_id,
187                                uint32_t service_id,
188                                uint32_t method_id,
189                                MethodType type,
190                                const PwpbMethodSerde& serde)
191       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
192       : StreamResponseClientCall(
193             client, channel_id, service_id, method_id, StructCallProps(type)),
194         serde_(&serde) {}
195 
196   // Allow derived classes to be constructed moving another instance.
197   PwpbStreamResponseClientCall(PwpbStreamResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())198       PW_LOCKS_EXCLUDED(rpc_lock()) {
199     *this = std::move(other);
200   }
201 
202   // Allow derived classes to use move assignment from another instance.
203   PwpbStreamResponseClientCall& operator=(PwpbStreamResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock ())204       PW_LOCKS_EXCLUDED(rpc_lock()) {
205     RpcLockGuard lock;
206     MovePwpbStreamResponseClientCallFrom(other);
207     return *this;
208   }
209 
210   // Implement moving by copying the serde pointer and on_next function.
MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall & other)211   void MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall& other)
212       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
213     MoveStreamResponseClientCallFrom(other);
214     serde_ = other.serde_;
215     set_pwpb_on_next_locked(std::move(other.pwpb_on_next_));
216   }
217 
set_on_next(Function<void (const Response & response)> && on_next)218   void set_on_next(Function<void(const Response& response)>&& on_next)
219       PW_LOCKS_EXCLUDED(rpc_lock()) {
220     RpcLockGuard lock;
221     set_pwpb_on_next_locked(std::move(on_next));
222   }
223 
224   // Sends a streamed request.
225   // Returns the following Status codes:
226   //
227   //   OK - the request was successfully sent
228   //   FAILED_PRECONDITION - the writer is closed
229   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf protobuf
230   //   other errors - the ChannelOutput failed to send the packet; the error
231   //       codes are determined by the ChannelOutput implementation
232   //
233   template <typename Request>
SendStreamRequest(const Request & request)234   Status SendStreamRequest(const Request& request)
235       PW_LOCKS_EXCLUDED(rpc_lock()) {
236     RpcLockGuard lock;
237     return PwpbSendStream(*this, request, serde_);
238   }
239 
240  private:
set_pwpb_on_next_locked(Function<void (const Response & response)> && on_next)241   void set_pwpb_on_next_locked(
242       Function<void(const Response& response)>&& on_next)
243       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
244     pwpb_on_next_ = std::move(on_next);
245 
246     Call::set_on_next_locked(
247         [this](ConstByteSpan payload) PW_NO_LOCK_SAFETY_ANALYSIS {
248           DecodeToStructAndInvokeOnNext(
249               payload, serde_->response(), pwpb_on_next_);
250         });
251   }
252 
253   const PwpbMethodSerde* serde_ PW_GUARDED_BY(rpc_lock());
254   Function<void(const Response&)> pwpb_on_next_ PW_GUARDED_BY(rpc_lock());
255 };
256 
257 }  // namespace internal
258 
// The PwpbClientReaderWriter is used to send and receive typed messages in a
// pw_protobuf bidirectional streaming RPC.
//
// These classes use private inheritance to hide the internal::Call API while
// allowing direct use of its public and protected functions.
264 template <typename Request, typename Response>
265 class PwpbClientReaderWriter
266     : private internal::PwpbStreamResponseClientCall<Response> {
267  public:
268   // Allow default construction so that users can declare a variable into
269   // which to move client reader/writers from RPC calls.
270   constexpr PwpbClientReaderWriter() = default;
271 
272   PwpbClientReaderWriter(PwpbClientReaderWriter&&) = default;
273   PwpbClientReaderWriter& operator=(PwpbClientReaderWriter&&) = default;
274 
275   using internal::Call::active;
276   using internal::Call::channel_id;
277 
278   // Writes a request. Returns the following Status codes:
279   //
280   //   OK - the request was successfully sent
281   //   FAILED_PRECONDITION - the writer is closed
282   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
283   //   other errors - the ChannelOutput failed to send the packet; the error
284   //       codes are determined by the ChannelOutput implementation
285   //
Write(const Request & request)286   Status Write(const Request& request) {
287     return internal::PwpbStreamResponseClientCall<Response>::SendStreamRequest(
288         request);
289   }
290 
291   // Notifies the server that no further client stream messages will be sent.
292   using internal::ClientCall::CloseClientStream;
293 
294   // Cancels this RPC. Closes the call locally and sends a CANCELLED error to
295   // the server.
296   using internal::Call::Cancel;
297 
298   // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
299   // packet. Future packets for this RPC are dropped, and the client sends a
300   // FAILED_PRECONDITION error in response because the call is not active.
301   using internal::ClientCall::Abandon;
302 
303   // Functions for setting RPC event callbacks.
304   using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
305   using internal::StreamResponseClientCall::set_on_completed;
306   using internal::StreamResponseClientCall::set_on_error;
307 
308  protected:
309   friend class internal::PwpbStreamResponseClientCall<Response>;
310 
PwpbClientReaderWriter(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)311   PwpbClientReaderWriter(internal::LockedEndpoint& client,
312                          uint32_t channel_id_v,
313                          uint32_t service_id,
314                          uint32_t method_id,
315                          const PwpbMethodSerde& serde)
316       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
317       : internal::PwpbStreamResponseClientCall<Response>(
318             client,
319             channel_id_v,
320             service_id,
321             method_id,
322             MethodType::kBidirectionalStreaming,
323             serde) {}
324 };
325 
// The PwpbClientReader is used to receive typed responses in a pw_protobuf
// server streaming RPC.
//
// These classes use private inheritance to hide the internal::Call API while
// allowing direct use of its public and protected functions.
331 template <typename Response>
332 class PwpbClientReader
333     : private internal::PwpbStreamResponseClientCall<Response> {
334  public:
335   // Allow default construction so that users can declare a variable into
336   // which to move client reader/writers from RPC calls.
337   constexpr PwpbClientReader() = default;
338 
339   PwpbClientReader(PwpbClientReader&&) = default;
340   PwpbClientReader& operator=(PwpbClientReader&&) = default;
341 
342   using internal::StreamResponseClientCall::active;
343   using internal::StreamResponseClientCall::channel_id;
344 
345   using internal::Call::Cancel;
346   using internal::ClientCall::Abandon;
347 
348   // Functions for setting RPC event callbacks.
349   using internal::PwpbStreamResponseClientCall<Response>::set_on_next;
350   using internal::StreamResponseClientCall::set_on_completed;
351   using internal::StreamResponseClientCall::set_on_error;
352 
353  private:
354   friend class internal::PwpbStreamResponseClientCall<Response>;
355 
PwpbClientReader(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)356   PwpbClientReader(internal::LockedEndpoint& client,
357                    uint32_t channel_id_v,
358                    uint32_t service_id,
359                    uint32_t method_id,
360                    const PwpbMethodSerde& serde)
361       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
362       : internal::PwpbStreamResponseClientCall<Response>(
363             client,
364             channel_id_v,
365             service_id,
366             method_id,
367             MethodType::kServerStreaming,
368             serde) {}
369 };
370 
// The PwpbClientWriter is used to send typed requests and receive a typed
// response in a pw_protobuf client streaming RPC.
//
// These classes use private inheritance to hide the internal::Call API while
// allowing direct use of its public and protected functions.
376 template <typename Request, typename Response>
377 class PwpbClientWriter
378     : private internal::PwpbUnaryResponseClientCall<Response> {
379  public:
380   // Allow default construction so that users can declare a variable into
381   // which to move client reader/writers from RPC calls.
382   constexpr PwpbClientWriter() = default;
383 
384   PwpbClientWriter(PwpbClientWriter&&) = default;
385   PwpbClientWriter& operator=(PwpbClientWriter&&) = default;
386 
387   using internal::UnaryResponseClientCall::active;
388   using internal::UnaryResponseClientCall::channel_id;
389 
390   // Writes a request. Returns the following Status codes:
391   //
392   //   OK - the request was successfully sent
393   //   FAILED_PRECONDITION - the writer is closed
394   //   INTERNAL - pw_rpc was unable to encode the pw_protobuf message
395   //   other errors - the ChannelOutput failed to send the packet; the error
396   //       codes are determined by the ChannelOutput implementation
397   //
Write(const Request & request)398   Status Write(const Request& request) {
399     return internal::PwpbUnaryResponseClientCall<Response>::SendStreamRequest(
400         request);
401   }
402 
403   using internal::Call::Cancel;
404   using internal::Call::CloseClientStream;
405   using internal::ClientCall::Abandon;
406 
407   // Functions for setting RPC event callbacks.
408   using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
409   using internal::UnaryResponseClientCall::set_on_error;
410 
411  private:
412   friend class internal::PwpbUnaryResponseClientCall<Response>;
413 
PwpbClientWriter(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)414   PwpbClientWriter(internal::LockedEndpoint& client,
415                    uint32_t channel_id_v,
416                    uint32_t service_id,
417                    uint32_t method_id,
418                    const PwpbMethodSerde& serde)
419       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
420 
421       : internal::PwpbUnaryResponseClientCall<Response>(
422             client,
423             channel_id_v,
424             service_id,
425             method_id,
426             MethodType::kClientStreaming,
427             serde) {}
428 };
429 
// The PwpbUnaryReceiver is used to handle a typed response to a pw_protobuf
// unary RPC.
//
// These classes use private inheritance to hide the internal::Call API while
// allowing direct use of its public and protected functions.
435 template <typename Response>
436 class PwpbUnaryReceiver
437     : private internal::PwpbUnaryResponseClientCall<Response> {
438  public:
439   // Allow default construction so that users can declare a variable into
440   // which to move client reader/writers from RPC calls.
441   constexpr PwpbUnaryReceiver() = default;
442 
443   PwpbUnaryReceiver(PwpbUnaryReceiver&&) = default;
444   PwpbUnaryReceiver& operator=(PwpbUnaryReceiver&&) = default;
445 
446   using internal::Call::active;
447   using internal::Call::channel_id;
448 
449   // Functions for setting RPC event callbacks.
450   using internal::Call::set_on_error;
451   using internal::PwpbUnaryResponseClientCall<Response>::set_on_completed;
452 
453   using internal::Call::Cancel;
454   using internal::ClientCall::Abandon;
455 
456  private:
457   friend class internal::PwpbUnaryResponseClientCall<Response>;
458 
PwpbUnaryReceiver(internal::LockedEndpoint & client,uint32_t channel_id_v,uint32_t service_id,uint32_t method_id,const PwpbMethodSerde & serde)459   PwpbUnaryReceiver(internal::LockedEndpoint& client,
460                     uint32_t channel_id_v,
461                     uint32_t service_id,
462                     uint32_t method_id,
463                     const PwpbMethodSerde& serde)
464       PW_EXCLUSIVE_LOCKS_REQUIRED(internal::rpc_lock())
465       : internal::PwpbUnaryResponseClientCall<Response>(client,
466                                                         channel_id_v,
467                                                         service_id,
468                                                         method_id,
469                                                         MethodType::kUnary,
470                                                         serde) {}
471 };
472 
473 }  // namespace pw::rpc
474