• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/internal/call.h"
16 
17 #include "pw_assert/check.h"
18 #include "pw_log/log.h"
19 #include "pw_preprocessor/util.h"
20 #include "pw_rpc/client.h"
21 #include "pw_rpc/internal/encoding_buffer.h"
22 #include "pw_rpc/internal/endpoint.h"
23 #include "pw_rpc/internal/method.h"
24 #include "pw_rpc/server.h"
25 
// If the callback timeout is enabled, count the number of iterations of the
// waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
//
// Usage requirements:
//   - An integer variable named `iterations` must be in scope at the expansion
//     site; the macro increments it (timeout enabled) or merely references it
//     to avoid an unused-variable warning (timeout disabled).
//   - `timeout_source` must be a string literal, since it is concatenated into
//     the crash message.
//   - `call` is an expression yielding the Call whose IDs are reported.
//
// The timeout variant is wrapped in do { ... } while (0) so that the macro
// always expands to exactly one statement, even when used as the body of an
// unbraced if/else or loop.
#if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call)                        \
  do {                                                                         \
    iterations += 1;                                                           \
    PW_CHECK(                                                                  \
        iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS,                            \
        "A callback for RPC %u:%08x/%08x has not finished after "              \
        PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS)                            \
        " ticks. This may indicate that an RPC callback attempted to "         \
        timeout_source                                                         \
        " its own call object, which is not permitted. Fix this condition or " \
        "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this "     \
        "crash. See https://pigweed.dev/pw_rpc"                                \
        "#destructors-moves-wait-for-callbacks-to-complete for details.",      \
        static_cast<unsigned>((call).channel_id_),                             \
        static_cast<unsigned>((call).service_id_),                             \
        static_cast<unsigned>((call).method_id_));                             \
  } while (0)
#else
#define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
  static_cast<void>(iterations)
#endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
48 
49 namespace pw::rpc::internal {
50 
51 using pwpb::PacketType;
52 
53 // Creates an active server-side Call.
Call(const LockedCallContext & context,CallProperties properties)54 Call::Call(const LockedCallContext& context, CallProperties properties)
55     : Call(context.server().ClaimLocked(),
56            context.call_id(),
57            context.channel_id(),
58            UnwrapServiceId(context.service().service_id()),
59            context.method().id(),
60            properties) {}
61 
62 // Creates an active client-side call, assigning it a new ID.
Call(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)63 Call::Call(LockedEndpoint& client,
64            uint32_t channel_id,
65            uint32_t service_id,
66            uint32_t method_id,
67            CallProperties properties)
68     : Call(client,
69            client.NewCallId(),
70            channel_id,
71            service_id,
72            method_id,
73            properties) {}
74 
Call(LockedEndpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)75 Call::Call(LockedEndpoint& endpoint_ref,
76            uint32_t call_id,
77            uint32_t channel_id,
78            uint32_t service_id,
79            uint32_t method_id,
80            CallProperties properties)
81     : endpoint_(&endpoint_ref),
82       channel_id_(channel_id),
83       id_(call_id),
84       service_id_(service_id),
85       method_id_(method_id),
86       // Note: Bit kActive set to 1 and kClientRequestedCompletion is set to 0.
87       state_(kActive),
88       awaiting_cleanup_(OkStatus().code()),
89       callbacks_executing_(0),
90       properties_(properties) {
91   PW_CHECK_UINT_NE(channel_id,
92                    Channel::kUnassignedChannelId,
93                    "Calls cannot be created with channel ID 0 "
94                    "(Channel::kUnassignedChannelId)");
95   endpoint().RegisterCall(*this);
96 }
97 
DestroyServerCall()98 void Call::DestroyServerCall() {
99   RpcLockGuard lock;
100   // Any errors are logged in Channel::Send.
101   CloseAndSendResponseLocked(OkStatus()).IgnoreError();
102   WaitForCallbacksToComplete();
103   state_ |= kHasBeenDestroyed;
104 }
105 
DestroyClientCall()106 void Call::DestroyClientCall() {
107   RpcLockGuard lock;
108   CloseClientCall();
109   WaitForCallbacksToComplete();
110   state_ |= kHasBeenDestroyed;
111 }
112 
WaitForCallbacksToComplete()113 void Call::WaitForCallbacksToComplete() {
114   do {
115     int iterations = 0;
116     while (CallbacksAreRunning()) {
117       PW_RPC_CHECK_FOR_DEADLOCK("destroy", *this);
118       YieldRpcLock();
119     }
120 
121   } while (CleanUpIfRequired());
122 }
123 
MoveFrom(Call & other)124 void Call::MoveFrom(Call& other) {
125   PW_DCHECK(!active_locked());
126   PW_DCHECK(!awaiting_cleanup() && !other.awaiting_cleanup());
127 
128   if (!other.active_locked()) {
129     return;  // Nothing else to do; this call is already closed.
130   }
131 
132   // An active call with an executing callback cannot be moved. Derived call
133   // classes must wait for callbacks to finish before calling MoveFrom.
134   PW_DCHECK(!other.CallbacksAreRunning());
135 
136   // Copy all members from the other call.
137   endpoint_ = other.endpoint_;
138   channel_id_ = other.channel_id_;
139   id_ = other.id_;
140   service_id_ = other.service_id_;
141   method_id_ = other.method_id_;
142 
143   state_ = other.state_;
144 
145   // No need to move awaiting_cleanup_, since it is 0 in both calls here.
146 
147   properties_ = other.properties_;
148 
149   // callbacks_executing_ is not moved since it is associated with the object in
150   // memory, not the call.
151 
152   on_error_ = std::move(other.on_error_);
153   on_next_ = std::move(other.on_next_);
154 
155   // Mark the other call inactive, unregister it, and register this one.
156   other.MarkClosed();
157 
158   endpoint().UnregisterCall(other);
159   endpoint().RegisterUniqueCall(*this);
160 }
161 
WaitUntilReadyForMove(Call & destination,Call & source)162 void Call::WaitUntilReadyForMove(Call& destination, Call& source) {
163   do {
164     // Wait for the source's callbacks to finish if it is active.
165     int iterations = 0;
166     while (source.active_locked() && source.CallbacksAreRunning()) {
167       PW_RPC_CHECK_FOR_DEADLOCK("move", source);
168       YieldRpcLock();
169     }
170 
171     // At this point, no callbacks are running in the source call. If cleanup
172     // is required for the destination call, perform it and retry since
173     // cleanup releases and reacquires the RPC lock.
174   } while (source.CleanUpIfRequired() || destination.CleanUpIfRequired());
175 }
176 
CallOnError(Status error)177 void Call::CallOnError(Status error) {
178   auto on_error_local = std::move(on_error_);
179 
180   CallbackStarted();
181 
182   rpc_lock().unlock();
183   if (on_error_local) {
184     on_error_local(error);
185   }
186 
187   // This mutex lock could be avoided by making callbacks_executing_ atomic.
188   RpcLockGuard lock;
189   CallbackFinished();
190 }
191 
CleanUpIfRequired()192 bool Call::CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
193   if (!awaiting_cleanup()) {
194     return false;
195   }
196   endpoint_->CleanUpCall(*this);
197   rpc_lock().lock();
198   return true;
199 }
200 
SendPacket(PacketType type,ConstByteSpan payload,Status status)201 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
202   if (!active_locked()) {
203     encoding_buffer.ReleaseIfAllocated();
204     return Status::FailedPrecondition();
205   }
206 
207   Channel* channel = endpoint_->GetInternalChannel(channel_id_);
208   if (channel == nullptr) {
209     encoding_buffer.ReleaseIfAllocated();
210     return Status::Unavailable();
211   }
212   return channel->Send(MakePacket(type, payload, status));
213 }
214 
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)215 Status Call::CloseAndSendFinalPacketLocked(PacketType type,
216                                            ConstByteSpan response,
217                                            Status status) {
218   const Status send_status = SendPacket(type, response, status);
219   UnregisterAndMarkClosed();
220   return send_status;
221 }
222 
TryCloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)223 Status Call::TryCloseAndSendFinalPacketLocked(PacketType type,
224                                               ConstByteSpan response,
225                                               Status status) {
226   const Status send_status = SendPacket(type, response, status);
227   // Only close the call if the final packet gets sent out successfully.
228   if (send_status.ok()) {
229     UnregisterAndMarkClosed();
230   }
231   return send_status;
232 }
233 
WriteLocked(ConstByteSpan payload)234 Status Call::WriteLocked(ConstByteSpan payload) {
235   return SendPacket(properties_.call_type() == kServerCall
236                         ? PacketType::SERVER_STREAM
237                         : PacketType::CLIENT_STREAM,
238                     payload);
239 }
240 
241 // This definition is in the .cc file because the Endpoint class is not defined
242 // in the Call header, due to circular dependencies between the two.
CloseAndMarkForCleanup(Status error)243 void Call::CloseAndMarkForCleanup(Status error) {
244   endpoint_->CloseCallAndMarkForCleanup(*this, error);
245 }
246 
HandlePayload(ConstByteSpan payload)247 void Call::HandlePayload(ConstByteSpan payload) {
248   // pw_rpc only supports handling packets for a particular RPC one at a time.
249   // Check if any callbacks are running and drop the packet if they are.
250   //
251   // The on_next callback cannot support multiple packets at once since it is
252   // moved before it is invoked. on_error and on_completed are only called
253   // after the call is closed.
254   if (CallbacksAreRunning()) {
255     PW_LOG_WARN(
256         "Received stream packet for %u:%08x/%08x before the callback for a "
257         "previous packet completed! This packet will be dropped. This can be "
258         "avoided by handling packets for a particular RPC on only one thread.",
259         static_cast<unsigned>(channel_id_),
260         static_cast<unsigned>(service_id_),
261         static_cast<unsigned>(method_id_));
262     rpc_lock().unlock();
263     return;
264   }
265 
266   if (on_next_ == nullptr) {
267     rpc_lock().unlock();
268     return;
269   }
270 
271   const uint32_t original_id = id();
272   auto on_next_local = std::move(on_next_);
273   CallbackStarted();
274 
275   if (hold_lock_while_invoking_callback_with_payload()) {
276     on_next_local(payload);
277   } else {
278     rpc_lock().unlock();
279     on_next_local(payload);
280     rpc_lock().lock();
281   }
282 
283   CallbackFinished();
284 
285   // Restore the original callback if the original call is still active and
286   // the callback has not been replaced.
287   // NOLINTNEXTLINE(bugprone-use-after-move)
288   if (active_locked() && id() == original_id && on_next_ == nullptr) {
289     on_next_ = std::move(on_next_local);
290   }
291 
292   // Clean up calls in case decoding failed.
293   endpoint_->CleanUpCalls();
294 }
295 
CloseClientCall()296 void Call::CloseClientCall() {
297   // When a client call is closed, for bidirectional and client streaming RPCs,
298   // the server may be waiting for client stream messages, so we need to notify
299   // the server that the client has requested for completion and no further
300   // requests should be expected from the client. For unary and server streaming
301   // RPCs, since the client is not sending messages, server does not need to be
302   // notified.
303   if (has_client_stream() && !client_requested_completion()) {
304     RequestCompletionLocked().IgnoreError();
305   }
306   UnregisterAndMarkClosed();
307 }
308 
UnregisterAndMarkClosed()309 void Call::UnregisterAndMarkClosed() {
310   if (active_locked()) {
311     endpoint().UnregisterCall(*this);
312     MarkClosed();
313   }
314 }
315 
DebugLog() const316 void Call::DebugLog() const PW_NO_LOCK_SAFETY_ANALYSIS {
317   PW_LOG_INFO(
318       "Call %p\n"
319       "\tEndpoint: %p\n"
320       "\tCall ID:  %8u\n"
321       "\tChannel:  %8u\n"
322       "\tService:  %08x\n"
323       "\tMethod:   %08x\n"
324       "\tState:    %8x\n"
325       "\tCleanup:  %8s\n"
326       "\tBusy CBs: %8x\n"
327       "\tType:     %8d\n"
328       "\tClient:   %8d\n"
329       "\tWrapped:  %8d\n"
330       "\ton_error: %8d\n"
331       "\ton_next:  %8d\n",
332       static_cast<const void*>(this),
333       static_cast<const void*>(endpoint_),
334       static_cast<unsigned>(id_),
335       static_cast<unsigned>(channel_id_),
336       static_cast<unsigned>(service_id_),
337       static_cast<unsigned>(method_id_),
338       static_cast<int>(state_),
339       Status(static_cast<Status::Code>(awaiting_cleanup_)).str(),
340       static_cast<int>(callbacks_executing_),
341       static_cast<int>(properties_.method_type()),
342       static_cast<int>(properties_.call_type()),
343       static_cast<int>(hold_lock_while_invoking_callback_with_payload()),
344       static_cast<int>(on_error_ == nullptr),
345       static_cast<int>(on_next_ == nullptr));
346 }
347 
348 }  // namespace pw::rpc::internal
349