• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_rpc/internal/call.h"
16 
17 #include "pw_assert/check.h"
18 #include "pw_log/log.h"
19 #include "pw_preprocessor/util.h"
20 #include "pw_rpc/client.h"
21 #include "pw_rpc/internal/encoding_buffer.h"
22 #include "pw_rpc/internal/endpoint.h"
23 #include "pw_rpc/internal/method.h"
24 #include "pw_rpc/server.h"
25 
26 // If the callback timeout is enabled, count the number of iterations of the
27 // waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
28 #if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
29 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
30   iterations += 1;                                      \
31   PW_CHECK(                                                                  \
32       iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS,                            \
33       "A callback for RPC %u:%08x/%08x has not finished after "              \
34       PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS)                            \
35       " ticks. This may indicate that an RPC callback attempted to "         \
36       timeout_source                                                         \
37       " its own call object, which is not permitted. Fix this condition or " \
38       "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this "     \
39       "crash. See https://pigweed.dev/pw_rpc"                                \
40       "#destructors-moves-wait-for-callbacks-to-complete for details.",      \
41       static_cast<unsigned>((call).channel_id_),                             \
42       static_cast<unsigned>((call).service_id_),                             \
43       static_cast<unsigned>((call).method_id_))
44 #else
45 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
46   static_cast<void>(iterations)
47 #endif  // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
48 
49 namespace pw::rpc::internal {
50 
51 using pwpb::PacketType;
52 
53 // Creates an active server-side Call.
Call(const LockedCallContext & context,CallProperties properties)54 Call::Call(const LockedCallContext& context, CallProperties properties)
55     : Call(context.server().ClaimLocked(),
56            context.call_id(),
57            context.channel_id(),
58            UnwrapServiceId(context.service().service_id()),
59            context.method().id(),
60            properties) {}
61 
62 // Creates an active client-side call, assigning it a new ID.
Call(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)63 Call::Call(LockedEndpoint& client,
64            uint32_t channel_id,
65            uint32_t service_id,
66            uint32_t method_id,
67            CallProperties properties)
68     : Call(client,
69            client.NewCallId(),
70            channel_id,
71            service_id,
72            method_id,
73            properties) {}
74 
Call(LockedEndpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)75 Call::Call(LockedEndpoint& endpoint_ref,
76            uint32_t call_id,
77            uint32_t channel_id,
78            uint32_t service_id,
79            uint32_t method_id,
80            CallProperties properties)
81     : endpoint_(&endpoint_ref),
82       channel_id_(channel_id),
83       id_(call_id),
84       service_id_(service_id),
85       method_id_(method_id),
86       state_(kActive | (HasClientStream(properties.method_type())
87                             ? static_cast<uint8_t>(kClientStreamActive)
88                             : 0u)),
89       awaiting_cleanup_(OkStatus().code()),
90       callbacks_executing_(0),
91       properties_(properties) {
92   PW_CHECK_UINT_NE(channel_id,
93                    Channel::kUnassignedChannelId,
94                    "Calls cannot be created with channel ID 0 "
95                    "(Channel::kUnassignedChannelId)");
96   endpoint().RegisterCall(*this);
97 }
98 
~Call()99 Call::~Call() {
100   // Note: this explicit deregistration is necessary to ensure that
101   // modifications to the endpoint call list occur while holding rpc_lock.
102   // Removing this explicit registration would result in unsynchronized
103   // modification of the endpoint call list via the destructor of the
104   // superclass `IntrusiveList<Call>::Item`.
105   RpcLockGuard lock;
106 
107   // This `active_locked()` guard is necessary to ensure that `endpoint()` is
108   // still valid.
109   if (active_locked()) {
110     endpoint().UnregisterCall(*this);
111   }
112 
113   do {
114     int iterations = 0;
115     while (CallbacksAreRunning()) {
116       PW_RPC_CHECK_FOR_DEADLOCK("destroy", *this);
117       YieldRpcLock();
118     }
119 
120   } while (CleanUpIfRequired());
121 
122   // Help prevent dangling references in callbacks by waiting for callbacks to
123   // complete before deleting this call.
124 }
125 
MoveFrom(Call & other)126 void Call::MoveFrom(Call& other) {
127   PW_DCHECK(!active_locked());
128   PW_DCHECK(!awaiting_cleanup() && !other.awaiting_cleanup());
129 
130   if (!other.active_locked()) {
131     return;  // Nothing else to do; this call is already closed.
132   }
133 
134   // An active call with an executing callback cannot be moved. Derived call
135   // classes must wait for callbacks to finish before calling MoveFrom.
136   PW_DCHECK(!other.CallbacksAreRunning());
137 
138   // Copy all members from the other call.
139   endpoint_ = other.endpoint_;
140   channel_id_ = other.channel_id_;
141   id_ = other.id_;
142   service_id_ = other.service_id_;
143   method_id_ = other.method_id_;
144 
145   state_ = other.state_;
146 
147   // No need to move awaiting_cleanup_, since it is 0 in both calls here.
148 
149   properties_ = other.properties_;
150 
151   // callbacks_executing_ is not moved since it is associated with the object in
152   // memory, not the call.
153 
154   on_error_ = std::move(other.on_error_);
155   on_next_ = std::move(other.on_next_);
156 
157   // Mark the other call inactive, unregister it, and register this one.
158   other.MarkClosed();
159 
160   endpoint().UnregisterCall(other);
161   endpoint().RegisterUniqueCall(*this);
162 }
163 
WaitUntilReadyForMove(Call & destination,Call & source)164 void Call::WaitUntilReadyForMove(Call& destination, Call& source) {
165   do {
166     // Wait for the source's callbacks to finish if it is active.
167     int iterations = 0;
168     while (source.active_locked() && source.CallbacksAreRunning()) {
169       PW_RPC_CHECK_FOR_DEADLOCK("move", source);
170       YieldRpcLock();
171     }
172 
173     // At this point, no callbacks are running in the source call. If cleanup
174     // is required for the destination call, perform it and retry since
175     // cleanup releases and reacquires the RPC lock.
176   } while (source.CleanUpIfRequired() || destination.CleanUpIfRequired());
177 }
178 
CallOnError(Status error)179 void Call::CallOnError(Status error) {
180   auto on_error_local = std::move(on_error_);
181 
182   CallbackStarted();
183 
184   rpc_lock().unlock();
185   if (on_error_local) {
186     on_error_local(error);
187   }
188 
189   // This mutex lock could be avoided by making callbacks_executing_ atomic.
190   RpcLockGuard lock;
191   CallbackFinished();
192 }
193 
CleanUpIfRequired()194 bool Call::CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
195   if (!awaiting_cleanup()) {
196     return false;
197   }
198   endpoint_->CleanUpCall(*this);
199   rpc_lock().lock();
200   return true;
201 }
202 
SendPacket(PacketType type,ConstByteSpan payload,Status status)203 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
204   if (!active_locked()) {
205     encoding_buffer.ReleaseIfAllocated();
206     return Status::FailedPrecondition();
207   }
208 
209   Channel* channel = endpoint_->GetInternalChannel(channel_id_);
210   if (channel == nullptr) {
211     encoding_buffer.ReleaseIfAllocated();
212     return Status::Unavailable();
213   }
214   return channel->Send(MakePacket(type, payload, status));
215 }
216 
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)217 Status Call::CloseAndSendFinalPacketLocked(PacketType type,
218                                            ConstByteSpan response,
219                                            Status status) {
220   const Status send_status = SendPacket(type, response, status);
221   UnregisterAndMarkClosed();
222   return send_status;
223 }
224 
WriteLocked(ConstByteSpan payload)225 Status Call::WriteLocked(ConstByteSpan payload) {
226   return SendPacket(properties_.call_type() == kServerCall
227                         ? PacketType::SERVER_STREAM
228                         : PacketType::CLIENT_STREAM,
229                     payload);
230 }
231 
232 // This definition is in the .cc file because the Endpoint class is not defined
233 // in the Call header, due to circular dependencies between the two.
CloseAndMarkForCleanup(Status error)234 void Call::CloseAndMarkForCleanup(Status error) {
235   endpoint_->CloseCallAndMarkForCleanup(*this, error);
236 }
237 
HandlePayload(ConstByteSpan payload)238 void Call::HandlePayload(ConstByteSpan payload) {
239   // pw_rpc only supports handling packets for a particular RPC one at a time.
240   // Check if any callbacks are running and drop the packet if they are.
241   //
242   // The on_next callback cannot support multiple packets at once since it is
243   // moved before it is invoked. on_error and on_completed are only called
244   // after the call is closed.
245   if (CallbacksAreRunning()) {
246     PW_LOG_WARN(
247         "Received stream packet for %u:%08x/%08x before the callback for a "
248         "previous packet completed! This packet will be dropped. This can be "
249         "avoided by handling packets for a particular RPC on only one thread.",
250         static_cast<unsigned>(channel_id_),
251         static_cast<unsigned>(service_id_),
252         static_cast<unsigned>(method_id_));
253     rpc_lock().unlock();
254     return;
255   }
256 
257   if (on_next_ == nullptr) {
258     rpc_lock().unlock();
259     return;
260   }
261 
262   const uint32_t original_id = id();
263   auto on_next_local = std::move(on_next_);
264   CallbackStarted();
265 
266   if (hold_lock_while_invoking_callback_with_payload()) {
267     on_next_local(payload);
268   } else {
269     rpc_lock().unlock();
270     on_next_local(payload);
271     rpc_lock().lock();
272   }
273 
274   CallbackFinished();
275 
276   // Restore the original callback if the original call is still active and
277   // the callback has not been replaced.
278   // NOLINTNEXTLINE(bugprone-use-after-move)
279   if (active_locked() && id() == original_id && on_next_ == nullptr) {
280     on_next_ = std::move(on_next_local);
281   }
282 
283   // Clean up calls in case decoding failed.
284   endpoint_->CleanUpCalls();
285 }
286 
UnregisterAndMarkClosed()287 void Call::UnregisterAndMarkClosed() {
288   if (active_locked()) {
289     endpoint().UnregisterCall(*this);
290     MarkClosed();
291   }
292 }
293 
294 }  // namespace pw::rpc::internal
295