1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15
16 #include <cassert>
17 #include <cstddef>
18 #include <limits>
19 #include <utility>
20
21 #include "pw_containers/intrusive_list.h"
22 #include "pw_function/function.h"
23 #include "pw_rpc/internal/call_context.h"
24 #include "pw_rpc/internal/channel.h"
25 #include "pw_rpc/internal/lock.h"
26 #include "pw_rpc/internal/method.h"
27 #include "pw_rpc/internal/packet.h"
28 #include "pw_rpc/method_type.h"
29 #include "pw_rpc/service.h"
30 #include "pw_rpc/writer.h"
31 #include "pw_span/span.h"
32 #include "pw_status/status.h"
33 #include "pw_sync/lock_annotations.h"
34
35 namespace pw::rpc {
36 namespace internal {
37
38 class Endpoint;
39 class LockedEndpoint;
40 class Packet;
41
// Distinguishes call objects owned by a server from those owned by a client.
// Stored in a single bit of CallProperties, hence the bool underlying type.
enum CallType : bool {
  kServerCall = false,
  kClientCall = true,
};
44
// Selects how callbacks that take a proto receive it: as the raw encoded bytes
// (kRawProto) or decoded into a struct (kProtoStruct). The RPC lock is held
// when invoking callbacks that decode to a struct.
enum CallbackProtoType : bool {
  kRawProto = false,
  kProtoStruct = true,
};
49
50 // Immutable properties of a call object. These do not change after an active
51 // call is initialized.
52 //
53 // Bits
54 // 0-1: MethodType
55 // 2: CallType
56 // 3: Bool for whether callbacks decode to proto structs
57 //
58 class CallProperties {
59 public:
CallProperties()60 constexpr CallProperties() : bits_(0u) {}
61
CallProperties(MethodType method_type,CallType call_type,CallbackProtoType callback_proto_type)62 constexpr CallProperties(MethodType method_type,
63 CallType call_type,
64 CallbackProtoType callback_proto_type)
65 : bits_(static_cast<uint8_t>(
66 (static_cast<uint8_t>(method_type) << 0) |
67 (static_cast<uint8_t>(call_type) << 2) |
68 (static_cast<uint8_t>(callback_proto_type) << 3))) {}
69
70 constexpr CallProperties(const CallProperties&) = default;
71
72 constexpr CallProperties& operator=(const CallProperties&) = default;
73
method_type()74 constexpr MethodType method_type() const {
75 return static_cast<MethodType>(bits_ & 0b0011u);
76 }
77
call_type()78 constexpr CallType call_type() const {
79 return static_cast<CallType>((bits_ & 0b0100u) >> 2);
80 }
81
callback_proto_type()82 constexpr CallbackProtoType callback_proto_type() const {
83 return static_cast<CallbackProtoType>((bits_ & 0b1000u) >> 3);
84 }
85
86 private:
87 uint8_t bits_;
88 };
89
// Call ID used by all unrequested RPCs. When a subsequent request or response
// arrives with a matching channel + service + method, it will match a call
// with this ID if one exists.
inline constexpr uint32_t kOpenCallId{std::numeric_limits<uint32_t>::max()};
94
// Call ID seen on unrequested responses from legacy clients and servers. They
// did not make use of call IDs at all, so the ID is "empty" (zero).
inline constexpr uint32_t kLegacyOpenCallId{0u};
98
99 // Internal RPC Call class. The Call is used to respond to any type of RPC.
100 // Public classes like ServerWriters inherit from it with private inheritance
101 // and provide a public API for their use case. The Call's public API is used by
102 // the Server and Client classes.
103 //
104 // Private inheritance is used in place of composition or more complex
105 // inheritance hierarchy so that these objects all inherit from a common
106 // IntrusiveList::Item object. Private inheritance also gives the derived class
107 // full control over their interfaces.
108 //
109 // IMPLEMENTATION NOTE:
110 //
111 // Subclasses of `Call` must include a destructor which calls
112 // `DestroyServerCall` or `DestroyClientCall` (as appropriate) if the subclass
113 // contains any fields which might be referenced by the call's callbacks. This
114 // ensures that the callbacks do not reference fields which may have already
115 // been destroyed.
116 //
117 // At the top level, `ServerCall` and `ClientCall` invoke `DestroyServerCall`
118 // `DestroyClientCall` respectively to perform cleanup in the case where no
119 // subclass carries additional state.
120 class Call : public IntrusiveList<Call>::Item, private rpc::Writer {
121 public:
122 Call(const Call&) = delete;
123
124 // Move support is provided to derived classes through the MoveFrom function.
125 Call(Call&&) = delete;
126
127 Call& operator=(const Call&) = delete;
128 Call& operator=(Call&&) = delete;
129
~Call()130 ~Call() {
131 // Ensure that calls have already been closed and unregistered.
132 // See class IMPLEMENTATION NOTE for further details.
133 PW_DASSERT((state_ & kHasBeenDestroyed) != 0);
134 PW_DASSERT(!active_locked() && !CallbacksAreRunning());
135 }
136
137 // True if the Call is active and ready to send responses.
active()138 [[nodiscard]] bool active() const PW_LOCKS_EXCLUDED(rpc_lock()) {
139 RpcLockGuard lock;
140 return active_locked();
141 }
142
active_locked()143 [[nodiscard]] bool active_locked() const
144 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
145 return (state_ & kActive) != 0;
146 }
147
awaiting_cleanup()148 [[nodiscard]] bool awaiting_cleanup() const
149 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
150 return awaiting_cleanup_ != OkStatus().code();
151 }
152
id()153 uint32_t id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { return id_; }
154
set_id(uint32_t id)155 void set_id(uint32_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { id_ = id; }
156
157 // Public function for accessing the channel ID of this call. Set to 0 when
158 // the call is closed.
channel_id()159 uint32_t channel_id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
160 RpcLockGuard lock;
161 return channel_id_locked();
162 }
163
channel_id_locked()164 uint32_t channel_id_locked() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
165 return channel_id_;
166 }
167
service_id()168 uint32_t service_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
169 return service_id_;
170 }
171
method_id()172 uint32_t method_id() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
173 return method_id_;
174 }
175
176 // Return whether this is a server or client call.
type()177 CallType type() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
178 return properties_.call_type();
179 }
180
181 // Closes the Call and sends a RESPONSE packet, if it is active. Returns the
182 // status from sending the packet, or FAILED_PRECONDITION if the Call is not
183 // active.
CloseAndSendResponse(ConstByteSpan response,Status status)184 Status CloseAndSendResponse(ConstByteSpan response, Status status)
185 PW_LOCKS_EXCLUDED(rpc_lock()) {
186 RpcLockGuard lock;
187 return CloseAndSendResponseLocked(response, status);
188 }
189
CloseAndSendResponseLocked(ConstByteSpan response,Status status)190 Status CloseAndSendResponseLocked(ConstByteSpan response, Status status)
191 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
192 return CloseAndSendFinalPacketLocked(
193 pwpb::PacketType::RESPONSE, response, status);
194 }
195
CloseAndSendResponse(Status status)196 Status CloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
197 return CloseAndSendResponse({}, status);
198 }
199
CloseAndSendServerErrorLocked(Status error)200 Status CloseAndSendServerErrorLocked(Status error)
201 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
202 return CloseAndSendFinalPacketLocked(
203 pwpb::PacketType::SERVER_ERROR, {}, error);
204 }
205
206 // Closes the Call and sends a RESPONSE packet, if the RESPONSE packet failed
207 // to send , keep the call alive and return error. This API allows user to
208 // resend RESPONSE packet when transmission failed.
TryCloseAndSendResponse(ConstByteSpan response,Status status)209 Status TryCloseAndSendResponse(ConstByteSpan response, Status status)
210 PW_LOCKS_EXCLUDED(rpc_lock()) {
211 RpcLockGuard lock;
212 return TryCloseAndSendResponseLocked(response, status);
213 }
214
TryCloseAndSendResponseLocked(ConstByteSpan response,Status status)215 Status TryCloseAndSendResponseLocked(ConstByteSpan response, Status status)
216 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
217 return TryCloseAndSendFinalPacketLocked(
218 pwpb::PacketType::RESPONSE, response, status);
219 }
220
TryCloseAndSendResponse(Status status)221 Status TryCloseAndSendResponse(Status status) PW_LOCKS_EXCLUDED(rpc_lock()) {
222 return TryCloseAndSendResponse({}, status);
223 }
224
TryCloseAndSendServerErrorLocked(Status error)225 Status TryCloseAndSendServerErrorLocked(Status error)
226 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
227 return TryCloseAndSendFinalPacketLocked(
228 pwpb::PacketType::SERVER_ERROR, {}, error);
229 }
230
231 // Public function that indicates that the client requests completion of the
232 // RPC, but is still active and listening to responses. For client streaming
233 // and bi-directional streaming RPCs, this also closes the client stream. If
234 // PW_RPC_COMPLETION_REQUEST_CALLBACK is enabled and
235 // on_client_requested_completion callback is set using the
236 // set_on_completion_requested_if_enabled, then the callback will be invoked
237 // on the server side. The server may then take an appropriate action to
238 // cleanup and stop server streaming.
RequestCompletion()239 Status RequestCompletion() PW_LOCKS_EXCLUDED(rpc_lock()) {
240 RpcLockGuard lock;
241 return RequestCompletionLocked();
242 }
243
244 // Internal function that closes the client stream (if applicable) and sends
245 // CLIENT_REQUEST_COMPLETION packet to request call completion.
RequestCompletionLocked()246 Status RequestCompletionLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
247 MarkStreamCompleted();
248 return SendPacket(pwpb::PacketType::CLIENT_REQUEST_COMPLETION, {}, {});
249 }
250
251 // Sends a payload in either a server or client stream packet.
Write(ConstByteSpan payload)252 Status Write(ConstByteSpan payload) PW_LOCKS_EXCLUDED(rpc_lock()) {
253 RpcLockGuard lock;
254 return WriteLocked(payload);
255 }
256
257 Status WriteLocked(ConstByteSpan payload)
258 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
259
260 // Sends the initial request for a client call. If the request fails, the call
261 // is closed.
SendInitialClientRequest(ConstByteSpan payload)262 void SendInitialClientRequest(ConstByteSpan payload)
263 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
264 if (const Status status = SendPacket(pwpb::PacketType::REQUEST, payload);
265 !status.ok()) {
266 CloseAndMarkForCleanup(status);
267 }
268 }
269
270 void CloseAndMarkForCleanup(Status error)
271 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
272
273 // Whenever a payload arrives (in a server/client stream or in a response),
274 // call the on_next_ callback.
275 // Precondition: rpc_lock() must be held.
276 void HandlePayload(ConstByteSpan payload) PW_UNLOCK_FUNCTION(rpc_lock());
277
278 // Handles an error condition for the call. This closes the call and calls the
279 // on_error callback, if set.
HandleError(Status status)280 void HandleError(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
281 UnregisterAndMarkClosed();
282 CallOnError(status);
283 }
284
285 // Closes the RPC, but does NOT unregister the call or call on_error. The
286 // call must be moved to the endpoint's to_cleanup_ list and have its
287 // CleanUp() method called at a later time. Only for use by the Endpoint.
CloseAndMarkForCleanupFromEndpoint(Status error)288 void CloseAndMarkForCleanupFromEndpoint(Status error)
289 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
290 MarkClosed();
291 awaiting_cleanup_ = error.code();
292 }
293
294 // Clears the awaiting_cleanup_ variable and calls the on_error callback. Only
295 // for use by the Endpoint, which will unlist the call.
CleanUpFromEndpoint()296 void CleanUpFromEndpoint() PW_UNLOCK_FUNCTION(rpc_lock()) {
297 const Status status(static_cast<Status::Code>(awaiting_cleanup_));
298 awaiting_cleanup_ = OkStatus().code();
299 CallOnError(status);
300 }
301
has_client_stream()302 bool has_client_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
303 return HasClientStream(properties_.method_type());
304 }
305
has_server_stream()306 bool has_server_stream() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
307 return HasServerStream(properties_.method_type());
308 }
309
310 // Returns true if the client has already requested completion.
client_requested_completion()311 bool client_requested_completion() const
312 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
313 return (state_ & kClientRequestedCompletion) != 0;
314 }
315
316 // Closes a call without doing anything else. Called from the Endpoint
317 // destructor.
CloseFromDeletedEndpoint()318 void CloseFromDeletedEndpoint() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
319 MarkClosed();
320 awaiting_cleanup_ = OkStatus().code();
321 endpoint_ = nullptr;
322 }
323
324 // Logs detailed info about this call at INFO level. NOT for production use!
325 void DebugLog() const;
326
327 protected:
328 // Creates an inactive Call.
Call()329 constexpr Call()
330 : endpoint_{},
331 channel_id_{},
332 id_{},
333 service_id_{},
334 method_id_{},
335 state_{},
336 awaiting_cleanup_{},
337 callbacks_executing_{},
338 properties_{} {}
339
340 // Creates an active server-side Call.
341 Call(const LockedCallContext& context, CallProperties properties)
342 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
343
344 // Creates an active client-side Call.
345 Call(LockedEndpoint& client,
346 uint32_t channel_id,
347 uint32_t service_id,
348 uint32_t method_id,
349 CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
350
351 // Closes the call and waits for their callbacks to complete so destructors
352 // can run safely.
353 void DestroyServerCall() PW_LOCKS_EXCLUDED(rpc_lock());
354 void DestroyClientCall() PW_LOCKS_EXCLUDED(rpc_lock());
355
CallbackStarted()356 void CallbackStarted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
357 callbacks_executing_ += 1;
358 }
359
CallbackFinished()360 void CallbackFinished() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
361 callbacks_executing_ -= 1;
362 }
363
364 // This call must be in a closed state when this is called.
365 void MoveFrom(Call& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
366
endpoint()367 Endpoint& endpoint() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
368 return *endpoint_;
369 }
370
371 // Public function that sets the on_next function in the raw API.
set_on_next(Function<void (ConstByteSpan)> && on_next)372 void set_on_next(Function<void(ConstByteSpan)>&& on_next)
373 PW_LOCKS_EXCLUDED(rpc_lock()) {
374 RpcLockGuard lock;
375 set_on_next_locked(std::move(on_next));
376 }
377
378 // Internal function that sets on_next.
set_on_next_locked(Function<void (ConstByteSpan)> && on_next)379 void set_on_next_locked(Function<void(ConstByteSpan)>&& on_next)
380 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
381 on_next_ = std::move(on_next);
382 }
383
384 // Public function that sets the on_error callback.
set_on_error(Function<void (Status)> && on_error)385 void set_on_error(Function<void(Status)>&& on_error)
386 PW_LOCKS_EXCLUDED(rpc_lock()) {
387 RpcLockGuard lock;
388 set_on_error_locked(std::move(on_error));
389 }
390
391 // Internal function that sets on_error.
set_on_error_locked(Function<void (Status)> && on_error)392 void set_on_error_locked(Function<void(Status)>&& on_error)
393 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
394 on_error_ = std::move(on_error);
395 }
396
MarkStreamCompleted()397 void MarkStreamCompleted() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
398 state_ |= kClientRequestedCompletion;
399 }
400
401 // Closes a client call. Sends a CLIENT_REQUEST_COMPLETION for client /
402 // bidirectional streaming RPCs if not sent yet.
403 void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
404
405 // Closes a server call.
CloseAndSendResponseLocked(Status status)406 Status CloseAndSendResponseLocked(Status status)
407 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
408 return CloseAndSendFinalPacketLocked(
409 pwpb::PacketType::RESPONSE, {}, status);
410 }
411
412 // Cancels an RPC. Public function for client calls only.
Cancel()413 Status Cancel() PW_LOCKS_EXCLUDED(rpc_lock()) {
414 RpcLockGuard lock;
415 return CloseAndSendFinalPacketLocked(
416 pwpb::PacketType::CLIENT_ERROR, {}, Status::Cancelled());
417 }
418
419 // Unregisters the RPC from the endpoint & marks as closed. The call may be
420 // active or inactive when this is called.
421 void UnregisterAndMarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
422
423 // Define conversions to the generic server/client RPC writer class.
as_writer()424 constexpr Writer& as_writer() { return *this; }
as_writer()425 constexpr const Writer& as_writer() const { return *this; }
426
427 // Indicates if the on_next and unary on_completed callbacks are internal
428 // wrappers that decode the raw proto before invoking the user's callback. If
429 // they are, the lock must be held when they are invoked.
hold_lock_while_invoking_callback_with_payload()430 bool hold_lock_while_invoking_callback_with_payload() const
431 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
432 return properties_.callback_proto_type() == kProtoStruct;
433 }
434
435 // Decodes a raw protobuf into a proto struct (pwpb or Nanopb) and invokes the
436 // pwpb or Nanopb version of the on_next callback.
437 //
438 // This must ONLY be called from derived classes the wrap the on_next
439 // callback. These classes MUST indicate that they call calls in their
440 // constructor.
441 template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnNext(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &)> & proto_on_next)442 void DecodeToStructAndInvokeOnNext(
443 ConstByteSpan payload,
444 const Decoder& decoder,
445 Function<void(const ProtoStruct&)>& proto_on_next)
446 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
447 if (proto_on_next == nullptr) {
448 return;
449 }
450
451 ProtoStruct proto_struct{};
452
453 if (!decoder.Decode(payload, proto_struct).ok()) {
454 CloseAndMarkForCleanup(Status::DataLoss());
455 return;
456 }
457
458 const uint32_t original_id = id();
459 auto proto_on_next_local = std::move(proto_on_next);
460
461 rpc_lock().unlock();
462 proto_on_next_local(proto_struct);
463 rpc_lock().lock();
464
465 // Restore the original callback if the original call is still active and
466 // the callback has not been replaced.
467 // NOLINTNEXTLINE(bugprone-use-after-move)
468 if (active_locked() && id() == original_id && proto_on_next == nullptr) {
469 proto_on_next = std::move(proto_on_next_local);
470 }
471 }
472
473 // The call is already unregistered and closed.
474 template <typename Decoder, typename ProtoStruct>
DecodeToStructAndInvokeOnCompleted(ConstByteSpan payload,const Decoder & decoder,Function<void (const ProtoStruct &,Status)> & proto_on_completed,Status status)475 void DecodeToStructAndInvokeOnCompleted(
476 ConstByteSpan payload,
477 const Decoder& decoder,
478 Function<void(const ProtoStruct&, Status)>& proto_on_completed,
479 Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
480 // Always move proto_on_completed so it goes out of scope in this function.
481 auto proto_on_completed_local = std::move(proto_on_completed);
482
483 // Move on_error in case an error occurs.
484 auto on_error_local = std::move(on_error_);
485
486 // Release the lock before decoding, since decoder is a global.
487 rpc_lock().unlock();
488
489 if (proto_on_completed_local == nullptr) {
490 return;
491 }
492
493 ProtoStruct proto_struct{};
494 if (decoder.Decode(payload, proto_struct).ok()) {
495 proto_on_completed_local(proto_struct, status);
496 } else if (on_error_local != nullptr) {
497 on_error_local(Status::DataLoss());
498 }
499 }
500
501 // An active call cannot be moved if its callbacks are running. This function
502 // must be called on the call being moved before updating any state.
503 static void WaitUntilReadyForMove(Call& destination, Call& source)
504 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
505
506 private:
507 friend class rpc::Writer;
508
509 enum State : uint8_t {
510 kActive = 0b001,
511 kClientRequestedCompletion = 0b010,
512 kHasBeenDestroyed = 0b100,
513 };
514
515 // Common constructor for server & client calls.
516 Call(LockedEndpoint& endpoint,
517 uint32_t id,
518 uint32_t channel_id,
519 uint32_t service_id,
520 uint32_t method_id,
521 CallProperties properties);
522
523 Packet MakePacket(pwpb::PacketType type,
524 ConstByteSpan payload,
525 Status status = OkStatus()) const
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock ())526 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
527 return Packet(type,
528 channel_id_locked(),
529 service_id(),
530 method_id(),
531 id_,
532 payload,
533 status);
534 }
535
536 // Marks a call object closed without doing anything else. The call is not
537 // removed from the calls list and no callbacks are called.
MarkClosed()538 void MarkClosed() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
539 channel_id_ = Channel::kUnassignedChannelId;
540 id_ = 0;
541 state_ = kClientRequestedCompletion;
542 }
543
544 // Calls the on_error callback without closing the RPC. This is used when the
545 // call has already completed.
546 void CallOnError(Status error) PW_UNLOCK_FUNCTION(rpc_lock());
547
548 // If required, removes this call from the endpoint's to_cleanup_ list and
549 // calls CleanUp(). Returns true if cleanup was required, which means the lock
550 // was released.
551 bool CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
552
553 // Sends a payload with the specified type. The payload may either be in a
554 // previously acquired buffer or in a standalone buffer.
555 //
556 // Returns FAILED_PRECONDITION if the call is not active().
557 Status SendPacket(pwpb::PacketType type,
558 ConstByteSpan payload,
559 Status status = OkStatus())
560 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
561
562 Status CloseAndSendFinalPacketLocked(pwpb::PacketType type,
563 ConstByteSpan response,
564 Status status)
565 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
566
567 Status TryCloseAndSendFinalPacketLocked(pwpb::PacketType type,
568 ConstByteSpan response,
569 Status status)
570 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
571
CallbacksAreRunning()572 bool CallbacksAreRunning() const PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
573 return callbacks_executing_ != 0u;
574 }
575
576 // Waits for callbacks to complete so that a call object can be destroyed.
577 void WaitForCallbacksToComplete() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
578
579 Endpoint* endpoint_ PW_GUARDED_BY(rpc_lock());
580 uint32_t channel_id_ PW_GUARDED_BY(rpc_lock());
581 uint32_t id_ PW_GUARDED_BY(rpc_lock());
582 uint32_t service_id_ PW_GUARDED_BY(rpc_lock());
583 uint32_t method_id_ PW_GUARDED_BY(rpc_lock());
584
585 // State of call and client stream.
586 //
587 // bit 0: call is active
588 // bit 1: client stream is active
589 // bit 2: call has been destroyed
590 uint8_t state_ PW_GUARDED_BY(rpc_lock());
591
592 // If non-OK, indicates that the call was closed and needs to have its
593 // on_error called with this Status code. Uses a uint8_t for compactness.
594 uint8_t awaiting_cleanup_ PW_GUARDED_BY(rpc_lock());
595
596 // Tracks how many of this call's callbacks are running. Must be 0 for the
597 // call to be destroyed.
598 uint8_t callbacks_executing_ PW_GUARDED_BY(rpc_lock());
599
600 CallProperties properties_ PW_GUARDED_BY(rpc_lock());
601
602 // Called when the RPC is terminated due to an error.
603 Function<void(Status error)> on_error_ PW_GUARDED_BY(rpc_lock());
604
605 // Called when a request is received. Only used for RPCs with client streams.
606 // The raw payload buffer is passed to the callback.
607 Function<void(ConstByteSpan payload)> on_next_ PW_GUARDED_BY(rpc_lock());
608 };
609
610 } // namespace internal
611
active()612 inline bool Writer::active() const {
613 return static_cast<const internal::Call*>(this)->active();
614 }
615
channel_id()616 inline uint32_t Writer::channel_id() const {
617 return static_cast<const internal::Call*>(this)->channel_id();
618 }
619
Write(ConstByteSpan payload)620 inline Status Writer::Write(ConstByteSpan payload) {
621 return static_cast<internal::Call*>(this)->Write(payload);
622 }
623
624 } // namespace pw::rpc
625