// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14 #pragma once
15 
16 #include <cassert>
17 #include <cstddef>
18 #include <limits>
19 #include <utility>
20 
21 #include "pw_containers/intrusive_list.h"
22 #include "pw_function/function.h"
23 #include "pw_rpc/internal/call_context.h"
24 #include "pw_rpc/internal/channel.h"
25 #include "pw_rpc/internal/lock.h"
26 #include "pw_rpc/internal/method.h"
27 #include "pw_rpc/internal/packet.h"
28 #include "pw_rpc/method_type.h"
29 #include "pw_rpc/service.h"
30 #include "pw_span/span.h"
31 #include "pw_status/status.h"
32 #include "pw_sync/lock_annotations.h"
33 
34 namespace pw::rpc {
35 
36 class Writer;
37 
38 namespace internal {
39 
40 class Endpoint;
41 class LockedEndpoint;
42 class Packet;
43 
// Whether a call object is associated with a server or a client. The values
// are spelled out because CallProperties stores this enum in a single bit.
enum CallType : bool {
  kServerCall = false,
  kClientCall = true,
};
46 
// Whether callbacks that take a proto use the raw data directly (kRawProto) or
// decode it to a struct (kProtoStruct). The RPC lock is held when invoking
// callbacks that decode to a struct. The values are spelled out because
// CallProperties stores this enum in a single bit.
enum CallbackProtoType : bool {
  kRawProto = false,
  kProtoStruct = true,
};
51 
52 // Immutable properties of a call object. These do not change after an active
53 // call is initialized.
54 //
55 // Bits
56 //     0-1: MethodType
57 //       2: CallType
58 //       3: Bool for whether callbacks decode to proto structs
59 //
60 class CallProperties {
61  public:
CallProperties()62   constexpr CallProperties() : bits_(0u) {}
63 
CallProperties(MethodType method_type,CallType call_type,CallbackProtoType callback_proto_type)64   constexpr CallProperties(MethodType method_type,
65                            CallType call_type,
66                            CallbackProtoType callback_proto_type)
67       : bits_((static_cast<uint8_t>(method_type) << 0) |
68               (static_cast<uint8_t>(call_type) << 2) |
69               (static_cast<uint8_t>(callback_proto_type) << 3)) {}
70 
71   constexpr CallProperties(const CallProperties&) = default;
72 
73   constexpr CallProperties& operator=(const CallProperties&) = default;
74 
method_type()75   constexpr MethodType method_type() const {
76     return static_cast<MethodType>(bits_ & 0b0011u);
77   }
78 
call_type()79   constexpr CallType call_type() const {
80     return static_cast<CallType>((bits_ & 0b0100u) >> 2);
81   }
82 
callback_proto_type()83   constexpr CallbackProtoType callback_proto_type() const {
84     return static_cast<CallbackProtoType>((bits_ & 0b1000u) >> 3);
85   }
86 
87  private:
88   uint8_t bits_;
89 };
90 
// Unrequested RPCs always use this call ID. When a subsequent request or
// response is sent with a matching channel + service + method, it will match a
// call with this ID if one exists.
inline constexpr uint32_t kOpenCallId = std::numeric_limits<uint32_t>::max();
95 
96 // Internal RPC Call class. The Call is used to respond to any type of RPC.
97 // Public classes like ServerWriters inherit from it with private inheritance
98 // and provide a public API for their use case. The Call's public API is used by
99 // the Server and Client classes.
100 //
101 // Private inheritance is used in place of composition or more complex
102 // inheritance hierarchy so that these objects all inherit from a common
103 // IntrusiveList::Item object. Private inheritance also gives the derived classs
104 // full control over their interfaces.
105 class Call : public IntrusiveList<Call>::Item {
106  public:
107   Call(const Call&) = delete;
108 
109   // Move support is provided to derived classes through the MoveFrom function.
110   Call(Call&&) = delete;
111 
112   Call& operator=(const Call&) = delete;
113   Call& operator=(Call&&) = delete;
114 
115   ~Call() PW_LOCKS_EXCLUDED(rpc_lock());
116 
117   // True if the Call is active and ready to send responses.
active()118   [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
119     RpcLockGuard lock;
120     return active_locked();
121   }
122 
active_locked()123   [[nodiscard]] bool active_locked() const
124       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
125     return (state_ & kActive) != 0;
126   }
127 
awaiting_cleanup()128   [[nodiscard]] bool awaiting_cleanup() const
129       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
130     return awaiting_cleanup_ != OkStatus().code();
131   }
132 
id()133   uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }
134 
set_id(uint32_t id)135   void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; }
136 
137   // Public function for accessing the channel ID of this call. Set to 0 when
138   // the call is closed.
channel_id()139   uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
140     RpcLockGuard lock;
141     return channel_id_locked();
142   }
143 
channel_id_locked()144   uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
145     return channel_id_;
146   }
147 
service_id()148   uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
149     return service_id_;
150   }
151 
method_id()152   uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
153     return method_id_;
154   }
155 
156   // Return whether this is a server or client call.
type()157   CallType type() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
158     return properties_.call_type();
159   }
160 
161   // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
162   // status from sending the packet, or FAILED_PRECONDITION if the Call is not
163   // active.
CloseAndSendResponse(ConstByteSpan response,Status status)164   Status CloseAndSendResponse(ConstByteSpan response, Status status)
165       PW_LOCKS_EXCLUDED(rpc_lock()) {
166     RpcLockGuard lock;
167     return CloseAndSendResponseLocked(response, status);
168   }
169 
CloseAndSendResponseLocked(ConstByteSpan response,Status status)170   Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
171       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
172     return CloseAndSendFinalPacketLocked(
173         pwpb::PacketType::RESPONSE, response, status);
174   }
175 
CloseAndSendResponse(Status status)176   Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
177     return CloseAndSendResponse({}, status);
178   }
179 
CloseAndSendServerErrorLocked(Status error)180   Status CloseAndSendServerErrorLocked(Status error)
181       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
182     return CloseAndSendFinalPacketLocked(
183         pwpb::PacketType::SERVER_ERROR, {}, error);
184   }
185 
186   // Public function that ends the client stream for a client call.
CloseClientStream()187   Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) {
188     RpcLockGuard lock;
189     return CloseClientStreamLocked();
190   }
191 
192   // Internal function that closes the client stream.
CloseClientStreamLocked()193   Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
194     MarkClientStreamCompleted();
195     return SendPacket(pwpb::PacketType::CLIENT_STREAM_END, {}, {});
196   }
197 
198   // Sends a payload in either a server or client stream packet.
Write(ConstByteSpan payload)199   Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
200     RpcLockGuard lock;
201     return WriteLocked(payload);
202   }
203 
204   Status WriteLocked(ConstByteSpan payload)
205       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
206 
207   // Sends the initial request for a client call. If the request fails, the call
208   // is closed.
SendInitialClientRequest(ConstByteSpan payload)209   void SendInitialClientRequest(ConstByteSpan payload)
210       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
211     if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload);
212         !status.ok()) {
213       CloseAndMarkForCleanup(status);
214     }
215   }
216 
217   void CloseAndMarkForCleanup(Status error)
218       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
219 
220   // Whenever a payload arrives (in a server/client stream or in a response),
221   // call the on_next_ callback.
222   // Precondition: rpc_lock() must be held.
223   void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
224 
225   // Handles an error condition for the call. This closes the call and calls the
226   // on_error callback, if set.
HandleError(Status status)227   void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
228     UnregisterAndMarkClosed();
229     CallOnError(status);
230   }
231 
232   // Closes the RPC, but does NOT unregister the call or call on_error. The
233   // call must be moved to the endpoint's to_cleanup_ list and have its
234   // CleanUp() method called at a later time. Only for use by the Endpoint.
CloseAndMarkForCleanupFromEndpoint(Status error)235   void CloseAndMarkForCleanupFromEndpoint(Status error)
236       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
237     MarkClosed();
238     awaiting_cleanup_ = error.code();
239   }
240 
241   // Clears the awaiting_cleanup_ variable and calls the on_error callback. Only
242   // for use by the Endpoint, which will unlist the call.
CleanUpFromEndpoint()243   void CleanUpFromEndpoint() PW_UNLOCK_FUNCTION(rpc_lock()) {
244     const Status status(static_cast<Status::Code>(awaiting_cleanup_));
245     awaiting_cleanup_ = OkStatus().code();
246     CallOnError(status);
247   }
248 
has_client_stream()249   bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
250     return HasClientStream(properties_.method_type());
251   }
252 
has_server_stream()253   bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
254     return HasServerStream(properties_.method_type());
255   }
256 
client_stream_open()257   bool client_stream_open() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
258     return (state_ & kClientStreamActive) != 0;
259   }
260 
261   // Closes a call without doing anything else. Called from the Endpoint
262   // destructor.
CloseFromDeletedEndpoint()263   void CloseFromDeletedEndpoint() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
264     MarkClosed();
265     awaiting_cleanup_ = OkStatus().code();
266     endpoint_ = nullptr;
267   }
268 
269  protected:
270   // Creates an inactive Call.
Call()271   constexpr Call()
272       : endpoint_{},
273         channel_id_{},
274         id_{},
275         service_id_{},
276         method_id_{},
277         state_{},
278         awaiting_cleanup_{},
279         callbacks_executing_{},
280         properties_{} {}
281 
282   // Creates an active server-side Call.
283   Call(const LockedCallContext& context, CallProperties properties)
284       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
285 
286   // Creates an active client-side Call.
287   Call(LockedEndpoint& client,
288        uint32_t channel_id,
289        uint32_t service_id,
290        uint32_t method_id,
291        CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
292 
CallbackStarted()293   void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
294     callbacks_executing_ += 1;
295   }
296 
CallbackFinished()297   void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
298     callbacks_executing_ -= 1;
299   }
300 
301   // This call must be in a closed state when this is called.
302   void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
303 
endpoint()304   Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
305     return *endpoint_;
306   }
307 
308   // Public function that sets the on_next function in the raw API.
set_on_next(Function<void (ConstByteSpan)> && on_next)309   void set_on_next(Function<void(ConstByteSpan)>&& on_next)
310       PW_LOCKS_EXCLUDED(rpc_lock()) {
311     RpcLockGuard lock;
312     set_on_next_locked(std::move(on_next));
313   }
314 
315   // Internal function that sets on_next.
set_on_next_locked(Function<void (ConstByteSpan)> && on_next)316   void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
317       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
318     on_next_ = std::move(on_next);
319   }
320 
321   // Public function that sets the on_error callback.
set_on_error(Function<void (Status)> && on_error)322   void set_on_error(Function<void(Status)>&& on_error)
323       PW_LOCKS_EXCLUDED(rpc_lock()) {
324     RpcLockGuard lock;
325     set_on_error_locked(std::move(on_error));
326   }
327 
328   // Internal function that sets on_error.
set_on_error_locked(Function<void (Status)> && on_error)329   void set_on_error_locked(Function<void(Status)>&& on_error)
330       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
331     on_error_ = std::move(on_error);
332   }
333 
MarkClientStreamCompleted()334   void MarkClientStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
335     state_ &= ~kClientStreamActive;
336   }
337 
CloseAndSendResponseLocked(Status status)338   Status CloseAndSendResponseLocked(Status status)
339       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
340     return CloseAndSendFinalPacketLocked(
341         pwpb::PacketType::RESPONSE, {}, status);
342   }
343 
344   // Cancels an RPC. Public function for client calls only.
Cancel()345   Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
346     RpcLockGuard lock;
347     return CloseAndSendFinalPacketLocked(
348         pwpb::PacketType::CLIENT_ERROR, {}, Status::Cancelled());
349   }
350 
351   // Unregisters the RPC from the endpoint & marks as closed. The call may be
352   // active or inactive when this is called.
353   void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
354 
355   // Define conversions to the generic server/client RPC writer class. These
356   // functions are defined in pw_rpc/writer.h after the Writer class is defined.
357   constexpr operator Writer&();
358   constexpr operator const Writer&() const;
359 
360   // Indicates if the on_next and unary on_completed callbacks are internal
361   // wrappers that decode the raw proto before invoking the user's callback. If
362   // they are, the lock must be held when they are invoked.
hold_lock_while_invoking_callback_with_payload()363   bool hold_lock_while_invoking_callback_with_payload() const
364       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
365     return properties_.callback_proto_type() == kProtoStruct;
366   }
367 
368   // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
369   // pwpb or Nanopb version of the on_next callback.
370   //
371   // This must ONLY be called from derived classes the wrap the on_next
372   // callback. These classes MUST indicate that they call calls in their
373   // constructor.
374   template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnNext(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &)> & proto_on_next)375   void DecodeToStructAndInvokeOnNext(
376       ConstByteSpan payload,
377       const Decoder& decoder,
378       Function<void(const ProtoStruct&)>& proto_on_next)
379       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
380     if (proto_on_next == nullptr) {
381       return;
382     }
383 
384     ProtoStruct proto_struct{};
385 
386     if (!decoder.Decode(payload, proto_struct).ok()) {
387       CloseAndMarkForCleanup(Status::DataLoss());
388       return;
389     }
390 
391     const uint32_t original_id = id();
392     auto proto_on_next_local = std::move(proto_on_next);
393 
394     rpc_lock().unlock();
395     proto_on_next_local(proto_struct);
396     rpc_lock().lock();
397 
398     // Restore the original callback if the original call is still active and
399     // the callback has not been replaced.
400     // NOLINTNEXTLINE(bugprone-use-after-move)
401     if (active_locked() && id() == original_id && proto_on_next == nullptr) {
402       proto_on_next = std::move(proto_on_next_local);
403     }
404   }
405 
406   // The call is already unregistered and closed.
407   template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnCompleted(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &,Status)> & proto_on_completed,Status status)408   void DecodeToStructAndInvokeOnCompleted(
409       ConstByteSpan payload,
410       const Decoder& decoder,
411       Function<void(const ProtoStruct&, Status)>& proto_on_completed,
412       Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
413     // Always move proto_on_completed so it goes out of scope in this function.
414     auto proto_on_completed_local = std::move(proto_on_completed);
415 
416     // Move on_error in case an error occurs.
417     auto on_error_local = std::move(on_error_);
418 
419     // Release the lock before decoding, since decoder is a global.
420     rpc_lock().unlock();
421 
422     if (proto_on_completed_local == nullptr) {
423       return;
424     }
425 
426     ProtoStruct proto_struct{};
427     if (decoder.Decode(payload, proto_struct).ok()) {
428       proto_on_completed_local(proto_struct, status);
429     } else if (on_error_local != nullptr) {
430       on_error_local(Status::DataLoss());
431     }
432   }
433 
434   // An active call cannot be moved if its callbacks are running. This function
435   // must be called on the call being moved before updating any state.
436   static void WaitUntilReadyForMove(Call& destination, Call& source)
437       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
438 
439  private:
440   enum State : uint8_t {
441     kActive = 0b01,
442     kClientStreamActive = 0b10,
443   };
444 
445   // Common constructor for server & client calls.
446   Call(LockedEndpoint& endpoint,
447        uint32_t id,
448        uint32_t channel_id,
449        uint32_t service_id,
450        uint32_t method_id,
451        CallProperties properties);
452 
453   Packet MakePacket(pwpb::PacketType type,
454                     ConstByteSpan payload,
455                     Status status = OkStatus()) const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())456       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
457     return Packet(type,
458                   channel_id_locked(),
459                   service_id(),
460                   method_id(),
461                   id_,
462                   payload,
463                   status);
464   }
465 
466   // Marks a call object closed without doing anything else. The call is not
467   // removed from the calls list and no callbacks are called.
MarkClosed()468   void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
469     channel_id_ = Channel::kUnassignedChannelId;
470     id_ = 0;
471     state_ = 0;
472   }
473 
474   // Calls the on_error callback without closing the RPC. This is used when the
475   // call has already completed.
476   void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
477 
478   // If required, removes this call from the endpoint's to_cleanup_ list and
479   // calls CleanUp(). Returns true if cleanup was required, which means the lock
480   // was released.
481   bool CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
482 
483   // Sends a payload with the specified type. The payload may either be in a
484   // previously acquired buffer or in a standalone buffer.
485   //
486   // Returns FAILED_PRECONDITION if the call is not active().
487   Status SendPacket(pwpb::PacketType type,
488                     ConstByteSpan payload,
489                     Status status = OkStatus())
490       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
491 
492   Status CloseAndSendFinalPacketLocked(pwpb::PacketType type,
493                                        ConstByteSpan response,
494                                        Status status)
495       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
496 
CallbacksAreRunning()497   bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
498     return callbacks_executing_ != 0u;
499   }
500 
501   Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
502   uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
503   uint32_t id_ PW_GUARDED_BY(rpc_lock());
504   uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
505   uint32_t method_id_ PW_GUARDED_BY(rpc_lock());
506 
507   // State of call and client stream.
508   //
509   //   bit 0: call is active
510   //   bit 1: client stream is active
511   //
512   uint8_t state_ PW_GUARDED_BY(rpc_lock());
513 
514   // If non-OK, indicates that the call was closed and needs to have its
515   // on_error called with this Status code. Uses a uint8_t for compactness.
516   uint8_t awaiting_cleanup_ PW_GUARDED_BY(rpc_lock());
517 
518   // Tracks how many of this call's callbacks are running. Must be 0 for the
519   // call to be destroyed.
520   uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
521 
522   CallProperties properties_ PW_GUARDED_BY(rpc_lock());
523 
524   // Called when the RPC is terminated due to an error.
525   Function<void(Status error)> on_error_ PW_GUARDED_BY(rpc_lock());
526 
527   // Called when a request is received. Only used for RPCs with client streams.
528   // The raw payload buffer is passed to the callback.
529   Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
530 };
531 
532 }  // namespace internal
533 }  // namespace pw::rpc
534