• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <span>
#include <utility>

#include "pw_containers/intrusive_list.h"
#include "pw_function/function.h"
#include "pw_rpc/internal/call_context.h"
#include "pw_rpc/internal/channel.h"
#include "pw_rpc/internal/lock.h"
#include "pw_rpc/internal/method.h"
#include "pw_rpc/internal/packet.h"
#include "pw_rpc/method_type.h"
#include "pw_rpc/service.h"
#include "pw_status/status.h"
#include "pw_sync/lock_annotations.h"
32 
33 namespace pw::rpc {
34 
35 class Writer;
36 
37 namespace internal {
38 
39 class Endpoint;
40 class Packet;
41 
42 // Internal RPC Call class. The Call is used to respond to any type of RPC.
43 // Public classes like ServerWriters inherit from it with private inheritance
44 // and provide a public API for their use case. The Call's public API is used by
45 // the Server and Client classes.
46 //
47 // Private inheritance is used in place of composition or more complex
48 // inheritance hierarchy so that these objects all inherit from a common
49 // IntrusiveList::Item object. Private inheritance also gives the derived classs
50 // full control over their interfaces.
51 class Call : public IntrusiveList<Call>::Item {
52  public:
53   Call(const Call&) = delete;
54 
55   // Move support is provided to derived classes through the MoveFrom function.
56   Call(Call&&) = delete;
57 
58   Call& operator=(const Call&) = delete;
59   Call& operator=(Call&&) = delete;
60 
61   // True if the Call is active and ready to send responses.
active()62   [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
63     LockGuard lock(rpc_lock());
64     return active_locked();
65   }
66 
active_locked()67   [[nodiscard]] bool active_locked() const
68       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
69     return rpc_state_ == kActive;
70   }
71 
id()72   uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }
73 
channel_id()74   uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
75     LockGuard lock(rpc_lock());
76     return channel_id_locked();
77   }
channel_id_locked()78   uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
79     return channel_id_;
80   }
service_id()81   uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
82     return service_id_;
83   }
method_id()84   uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
85     return method_id_;
86   }
87 
88   // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
89   // status from sending the packet, or FAILED_PRECONDITION if the Call is not
90   // active.
CloseAndSendResponse(ConstByteSpan response,Status status)91   Status CloseAndSendResponse(ConstByteSpan response, Status status)
92       PW_LOCKS_EXCLUDED(rpc_lock()) {
93     LockGuard lock(rpc_lock());
94     return CloseAndSendResponseLocked(response, status);
95   }
96 
CloseAndSendResponseLocked(ConstByteSpan response,Status status)97   Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
98       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
99     return CloseAndSendFinalPacketLocked(
100         PacketType::RESPONSE, response, status);
101   }
102 
CloseAndSendResponse(Status status)103   Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
104     return CloseAndSendResponse({}, status);
105   }
106 
CloseAndSendServerErrorLocked(Status error)107   Status CloseAndSendServerErrorLocked(Status error)
108       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
109     return CloseAndSendFinalPacketLocked(PacketType::SERVER_ERROR, {}, error);
110   }
111 
112   // Public call that ends the client stream for a client call.
CloseClientStream()113   Status CloseClientStream() PW_LOCKS_EXCLUDED(rpc_lock()) {
114     LockGuard lock(rpc_lock());
115     return CloseClientStreamLocked();
116   }
117 
118   // Internal call that closes the client stream.
CloseClientStreamLocked()119   Status CloseClientStreamLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
120     client_stream_state_ = kClientStreamInactive;
121     return SendPacket(PacketType::CLIENT_STREAM_END, {}, {});
122   }
123 
124   // Sends a payload in either a server or client stream packet.
Write(ConstByteSpan payload)125   Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
126     LockGuard lock(rpc_lock());
127     return WriteLocked(payload);
128   }
129 
130   Status WriteLocked(ConstByteSpan payload)
131       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
132 
133   // Sends the initial request for a client call. If the request fails, the call
134   // is closed.
SendInitialClientRequest(ConstByteSpan payload)135   void SendInitialClientRequest(ConstByteSpan payload)
136       PW_UNLOCK_FUNCTION(rpc_lock()) {
137     // TODO(pwbug/597): Ensure the call object is locked before releasing the
138     //     RPC mutex.
139     if (const Status status = SendPacket(PacketType::REQUEST, payload);
140         !status.ok()) {
141       HandleError(status);
142     } else {
143       rpc_lock().unlock();
144     }
145   }
146 
147   // Whenever a payload arrives (in a server/client stream or in a response),
148   // call the on_next_ callback.
149   // Precondition: rpc_lock() must be held.
HandlePayload(ConstByteSpan message)150   void HandlePayload(ConstByteSpan message) const
151       PW_UNLOCK_FUNCTION(rpc_lock()) {
152     const bool invoke = on_next_ != nullptr;
153     // TODO(pwbug/597): Ensure on_next_ is properly guarded.
154     rpc_lock().unlock();
155 
156     if (invoke) {
157       on_next_(message);
158     }
159   }
160 
161   // Handles an error condition for the call. This closes the call and calls the
162   // on_error callback, if set.
HandleError(Status status)163   void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
164     UnregisterAndMarkClosed();
165     CallOnError(status);
166   }
167 
168   // Aborts the RPC because its channel was closed. Does NOT unregister the
169   // call! The calls are removed when iterating over the list in the endpoint.
HandleChannelClose()170   void HandleChannelClose() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
171     // Locking here is problematic because CallOnError releases rpc_lock().
172     //
173     // pwbug/597 must be addressed before the locking here can be cleaned up.
174     MarkClosed();
175 
176     CallOnError(Status::Aborted());
177 
178     // Re-lock rpc_lock().
179     rpc_lock().lock();
180   }
181 
has_client_stream()182   bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
183     return HasClientStream(type_);
184   }
185 
has_server_stream()186   bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
187     return HasServerStream(type_);
188   }
189 
client_stream_open()190   bool client_stream_open() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
191     return client_stream_state_ == kClientStreamActive;
192   }
193 
194   // Keep this public so the Nanopb implementation can set it from a helper
195   // function.
set_on_next(Function<void (ConstByteSpan)> && on_next)196   void set_on_next(Function<void(ConstByteSpan)>&& on_next)
197       PW_LOCKS_EXCLUDED(rpc_lock()) {
198     LockGuard lock(rpc_lock());
199     set_on_next_locked(std::move(on_next));
200   }
201 
202  protected:
203   // Creates an inactive Call.
Call()204   constexpr Call()
205       : endpoint_{},
206         channel_id_{},
207         id_{},
208         service_id_{},
209         method_id_{},
210         rpc_state_{},
211         type_{},
212         call_type_{},
213         client_stream_state_ {}
214   {}
215 
216   // Creates an active server-side Call.
Call(const CallContext & context,MethodType type)217   Call(const CallContext& context, MethodType type)
218       : Call(context.server(),
219              context.call_id(),
220              context.channel_id(),
221              context.service().id(),
222              context.method().id(),
223              type,
224              kServerCall) {}
225 
226   // Creates an active client-side Call.
227   Call(Endpoint& client,
228        uint32_t channel_id,
229        uint32_t service_id,
230        uint32_t method_id,
231        MethodType type);
232 
233   // This call must be in a closed state when this is called.
234   void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
235 
endpoint()236   Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
237     return *endpoint_;
238   }
239 
set_on_next_locked(Function<void (ConstByteSpan)> && on_next)240   void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
241       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
242     on_next_ = std::move(on_next);
243   }
244 
set_on_error(Function<void (Status)> && on_error)245   void set_on_error(Function<void(Status)>&& on_error)
246       PW_LOCKS_EXCLUDED(rpc_lock()) {
247     LockGuard lock(rpc_lock());
248     set_on_error_locked(std::move(on_error));
249   }
250 
set_on_error_locked(Function<void (Status)> && on_error)251   void set_on_error_locked(Function<void(Status)>&& on_error)
252       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
253     on_error_ = std::move(on_error);
254   }
255 
256   // Calls the on_error callback without closing the RPC. This is used when the
257   // call has already completed.
CallOnError(Status error)258   void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock()) {
259     const bool invoke = on_error_ != nullptr;
260 
261     // TODO(pwbug/597): Ensure on_error_ is properly guarded.
262     rpc_lock().unlock();
263     if (invoke) {
264       on_error_(error);
265     }
266   }
267 
MarkClientStreamCompleted()268   void MarkClientStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
269     client_stream_state_ = kClientStreamInactive;
270   }
271 
CloseAndSendResponseLocked(Status status)272   Status CloseAndSendResponseLocked(Status status)
273       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
274     return CloseAndSendFinalPacketLocked(PacketType::RESPONSE, {}, status);
275   }
276 
277   // Cancels an RPC. For client calls only.
Cancel()278   Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
279     LockGuard lock(rpc_lock());
280     return CloseAndSendFinalPacketLocked(
281         PacketType::CLIENT_ERROR, {}, Status::Cancelled());
282   }
283 
284   // Unregisters the RPC from the endpoint & marks as closed. The call may be
285   // active or inactive when this is called.
286   void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
287 
288   // Define conversions to the generic server/client RPC writer class. These
289   // functions are defined in pw_rpc/writer.h after the Writer class is defined.
290   constexpr operator Writer&();
291   constexpr operator const Writer&() const;
292 
293  private:
294   enum CallType : bool { kServerCall, kClientCall };
295 
296   // Common constructor for server & client calls.
297   Call(Endpoint& endpoint,
298        uint32_t id,
299        uint32_t channel_id,
300        uint32_t service_id,
301        uint32_t method_id,
302        MethodType type,
303        CallType call_type);
304 
305   Packet MakePacket(PacketType type,
306                     ConstByteSpan payload,
307                     Status status = OkStatus()) const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())308       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
309     return Packet(type,
310                   channel_id_locked(),
311                   service_id(),
312                   method_id(),
313                   id_,
314                   payload,
315                   status);
316   }
317 
MarkClosed()318   void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
319     channel_id_ = Channel::kUnassignedChannelId;
320     rpc_state_ = kInactive;
321     client_stream_state_ = kClientStreamInactive;
322   }
323 
324   // Sends a payload with the specified type. The payload may either be in a
325   // previously acquired buffer or in a standalone buffer.
326   //
327   // Returns FAILED_PRECONDITION if the call is not active().
328   Status SendPacket(PacketType type,
329                     ConstByteSpan payload,
330                     Status status = OkStatus())
331       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
332 
333   Status CloseAndSendFinalPacketLocked(PacketType type,
334                                        ConstByteSpan response,
335                                        Status status)
336       PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
337 
338   internal::Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
339   uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
340   uint32_t id_ PW_GUARDED_BY(rpc_lock());
341   uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
342   uint32_t method_id_ PW_GUARDED_BY(rpc_lock());
343 
344   enum : bool { kInactive, kActive } rpc_state_ PW_GUARDED_BY(rpc_lock());
345   MethodType type_ PW_GUARDED_BY(rpc_lock());
346   CallType call_type_ PW_GUARDED_BY(rpc_lock());
347   enum : bool {
348     kClientStreamInactive,
349     kClientStreamActive,
350   } client_stream_state_ PW_GUARDED_BY(rpc_lock());
351 
352   // Called when the RPC is terminated due to an error.
353   Function<void(Status error)> on_error_;
354 
355   // Called when a request is received. Only used for RPCs with client streams.
356   // The raw payload buffer is passed to the callback.
357   Function<void(ConstByteSpan payload)> on_next_;
358 };
359 
360 }  // namespace internal
361 }  // namespace pw::rpc
362