1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_rpc/internal/call.h"
16
17 #include "pw_assert/check.h"
18 #include "pw_log/log.h"
19 #include "pw_preprocessor/util.h"
20 #include "pw_rpc/client.h"
21 #include "pw_rpc/internal/encoding_buffer.h"
22 #include "pw_rpc/internal/endpoint.h"
23 #include "pw_rpc/internal/method.h"
24 #include "pw_rpc/server.h"
25
26 // If the callback timeout is enabled, count the number of iterations of the
27 // waiting loop and crash if it exceeds PW_RPC_CALLBACK_TIMEOUT_TICKS.
28 #if PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
29 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
30 iterations += 1; \
31 PW_CHECK( \
32 iterations < PW_RPC_CALLBACK_TIMEOUT_TICKS, \
33 "A callback for RPC %u:%08x/%08x has not finished after " \
34 PW_STRINGIFY(PW_RPC_CALLBACK_TIMEOUT_TICKS) \
35 " ticks. This may indicate that an RPC callback attempted to " \
36 timeout_source \
37 " its own call object, which is not permitted. Fix this condition or " \
38 "change the value of PW_RPC_CALLBACK_TIMEOUT_TICKS to avoid this " \
39 "crash. See https://pigweed.dev/pw_rpc" \
40 "#destructors-moves-wait-for-callbacks-to-complete for details.", \
41 static_cast<unsigned>((call).channel_id_), \
42 static_cast<unsigned>((call).service_id_), \
43 static_cast<unsigned>((call).method_id_))
44 #else
45 #define PW_RPC_CHECK_FOR_DEADLOCK(timeout_source, call) \
46 static_cast<void>(iterations)
47 #endif // PW_RPC_CALLBACK_TIMEOUT_TICKS > 0
48
49 namespace pw::rpc::internal {
50
51 using pwpb::PacketType;
52
53 // Creates an active server-side Call.
Call(const LockedCallContext & context,CallProperties properties)54 Call::Call(const LockedCallContext& context, CallProperties properties)
55 : Call(context.server().ClaimLocked(),
56 context.call_id(),
57 context.channel_id(),
58 UnwrapServiceId(context.service().service_id()),
59 context.method().id(),
60 properties) {}
61
62 // Creates an active client-side call, assigning it a new ID.
Call(LockedEndpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)63 Call::Call(LockedEndpoint& client,
64 uint32_t channel_id,
65 uint32_t service_id,
66 uint32_t method_id,
67 CallProperties properties)
68 : Call(client,
69 client.NewCallId(),
70 channel_id,
71 service_id,
72 method_id,
73 properties) {}
74
Call(LockedEndpoint & endpoint_ref,uint32_t call_id,uint32_t channel_id,uint32_t service_id,uint32_t method_id,CallProperties properties)75 Call::Call(LockedEndpoint& endpoint_ref,
76 uint32_t call_id,
77 uint32_t channel_id,
78 uint32_t service_id,
79 uint32_t method_id,
80 CallProperties properties)
81 : endpoint_(&endpoint_ref),
82 channel_id_(channel_id),
83 id_(call_id),
84 service_id_(service_id),
85 method_id_(method_id),
86 // Note: Bit kActive set to 1 and kClientRequestedCompletion is set to 0.
87 state_(kActive),
88 awaiting_cleanup_(OkStatus().code()),
89 callbacks_executing_(0),
90 properties_(properties) {
91 PW_CHECK_UINT_NE(channel_id,
92 Channel::kUnassignedChannelId,
93 "Calls cannot be created with channel ID 0 "
94 "(Channel::kUnassignedChannelId)");
95 endpoint().RegisterCall(*this);
96 }
97
DestroyServerCall()98 void Call::DestroyServerCall() {
99 RpcLockGuard lock;
100 // Any errors are logged in Channel::Send.
101 CloseAndSendResponseLocked(OkStatus()).IgnoreError();
102 WaitForCallbacksToComplete();
103 state_ |= kHasBeenDestroyed;
104 }
105
DestroyClientCall()106 void Call::DestroyClientCall() {
107 RpcLockGuard lock;
108 CloseClientCall();
109 WaitForCallbacksToComplete();
110 state_ |= kHasBeenDestroyed;
111 }
112
WaitForCallbacksToComplete()113 void Call::WaitForCallbacksToComplete() {
114 do {
115 int iterations = 0;
116 while (CallbacksAreRunning()) {
117 PW_RPC_CHECK_FOR_DEADLOCK("destroy", *this);
118 YieldRpcLock();
119 }
120
121 } while (CleanUpIfRequired());
122 }
123
MoveFrom(Call & other)124 void Call::MoveFrom(Call& other) {
125 PW_DCHECK(!active_locked());
126 PW_DCHECK(!awaiting_cleanup() && !other.awaiting_cleanup());
127
128 if (!other.active_locked()) {
129 return; // Nothing else to do; this call is already closed.
130 }
131
132 // An active call with an executing callback cannot be moved. Derived call
133 // classes must wait for callbacks to finish before calling MoveFrom.
134 PW_DCHECK(!other.CallbacksAreRunning());
135
136 // Copy all members from the other call.
137 endpoint_ = other.endpoint_;
138 channel_id_ = other.channel_id_;
139 id_ = other.id_;
140 service_id_ = other.service_id_;
141 method_id_ = other.method_id_;
142
143 state_ = other.state_;
144
145 // No need to move awaiting_cleanup_, since it is 0 in both calls here.
146
147 properties_ = other.properties_;
148
149 // callbacks_executing_ is not moved since it is associated with the object in
150 // memory, not the call.
151
152 on_error_ = std::move(other.on_error_);
153 on_next_ = std::move(other.on_next_);
154
155 // Mark the other call inactive, unregister it, and register this one.
156 other.MarkClosed();
157
158 endpoint().UnregisterCall(other);
159 endpoint().RegisterUniqueCall(*this);
160 }
161
WaitUntilReadyForMove(Call & destination,Call & source)162 void Call::WaitUntilReadyForMove(Call& destination, Call& source) {
163 do {
164 // Wait for the source's callbacks to finish if it is active.
165 int iterations = 0;
166 while (source.active_locked() && source.CallbacksAreRunning()) {
167 PW_RPC_CHECK_FOR_DEADLOCK("move", source);
168 YieldRpcLock();
169 }
170
171 // At this point, no callbacks are running in the source call. If cleanup
172 // is required for the destination call, perform it and retry since
173 // cleanup releases and reacquires the RPC lock.
174 } while (source.CleanUpIfRequired() || destination.CleanUpIfRequired());
175 }
176
CallOnError(Status error)177 void Call::CallOnError(Status error) {
178 auto on_error_local = std::move(on_error_);
179
180 CallbackStarted();
181
182 rpc_lock().unlock();
183 if (on_error_local) {
184 on_error_local(error);
185 }
186
187 // This mutex lock could be avoided by making callbacks_executing_ atomic.
188 RpcLockGuard lock;
189 CallbackFinished();
190 }
191
CleanUpIfRequired()192 bool Call::CleanUpIfRequired() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
193 if (!awaiting_cleanup()) {
194 return false;
195 }
196 endpoint_->CleanUpCall(*this);
197 rpc_lock().lock();
198 return true;
199 }
200
SendPacket(PacketType type,ConstByteSpan payload,Status status)201 Status Call::SendPacket(PacketType type, ConstByteSpan payload, Status status) {
202 if (!active_locked()) {
203 encoding_buffer.ReleaseIfAllocated();
204 return Status::FailedPrecondition();
205 }
206
207 Channel* channel = endpoint_->GetInternalChannel(channel_id_);
208 if (channel == nullptr) {
209 encoding_buffer.ReleaseIfAllocated();
210 return Status::Unavailable();
211 }
212 return channel->Send(MakePacket(type, payload, status));
213 }
214
CloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)215 Status Call::CloseAndSendFinalPacketLocked(PacketType type,
216 ConstByteSpan response,
217 Status status) {
218 const Status send_status = SendPacket(type, response, status);
219 UnregisterAndMarkClosed();
220 return send_status;
221 }
222
TryCloseAndSendFinalPacketLocked(PacketType type,ConstByteSpan response,Status status)223 Status Call::TryCloseAndSendFinalPacketLocked(PacketType type,
224 ConstByteSpan response,
225 Status status) {
226 const Status send_status = SendPacket(type, response, status);
227 // Only close the call if the final packet gets sent out successfully.
228 if (send_status.ok()) {
229 UnregisterAndMarkClosed();
230 }
231 return send_status;
232 }
233
WriteLocked(ConstByteSpan payload)234 Status Call::WriteLocked(ConstByteSpan payload) {
235 return SendPacket(properties_.call_type() == kServerCall
236 ? PacketType::SERVER_STREAM
237 : PacketType::CLIENT_STREAM,
238 payload);
239 }
240
241 // This definition is in the .cc file because the Endpoint class is not defined
242 // in the Call header, due to circular dependencies between the two.
CloseAndMarkForCleanup(Status error)243 void Call::CloseAndMarkForCleanup(Status error) {
244 endpoint_->CloseCallAndMarkForCleanup(*this, error);
245 }
246
HandlePayload(ConstByteSpan payload)247 void Call::HandlePayload(ConstByteSpan payload) {
248 // pw_rpc only supports handling packets for a particular RPC one at a time.
249 // Check if any callbacks are running and drop the packet if they are.
250 //
251 // The on_next callback cannot support multiple packets at once since it is
252 // moved before it is invoked. on_error and on_completed are only called
253 // after the call is closed.
254 if (CallbacksAreRunning()) {
255 PW_LOG_WARN(
256 "Received stream packet for %u:%08x/%08x before the callback for a "
257 "previous packet completed! This packet will be dropped. This can be "
258 "avoided by handling packets for a particular RPC on only one thread.",
259 static_cast<unsigned>(channel_id_),
260 static_cast<unsigned>(service_id_),
261 static_cast<unsigned>(method_id_));
262 rpc_lock().unlock();
263 return;
264 }
265
266 if (on_next_ == nullptr) {
267 rpc_lock().unlock();
268 return;
269 }
270
271 const uint32_t original_id = id();
272 auto on_next_local = std::move(on_next_);
273 CallbackStarted();
274
275 if (hold_lock_while_invoking_callback_with_payload()) {
276 on_next_local(payload);
277 } else {
278 rpc_lock().unlock();
279 on_next_local(payload);
280 rpc_lock().lock();
281 }
282
283 CallbackFinished();
284
285 // Restore the original callback if the original call is still active and
286 // the callback has not been replaced.
287 // NOLINTNEXTLINE(bugprone-use-after-move)
288 if (active_locked() && id() == original_id && on_next_ == nullptr) {
289 on_next_ = std::move(on_next_local);
290 }
291
292 // Clean up calls in case decoding failed.
293 endpoint_->CleanUpCalls();
294 }
295
CloseClientCall()296 void Call::CloseClientCall() {
297 // When a client call is closed, for bidirectional and client streaming RPCs,
298 // the server may be waiting for client stream messages, so we need to notify
299 // the server that the client has requested for completion and no further
300 // requests should be expected from the client. For unary and server streaming
301 // RPCs, since the client is not sending messages, server does not need to be
302 // notified.
303 if (has_client_stream() && !client_requested_completion()) {
304 RequestCompletionLocked().IgnoreError();
305 }
306 UnregisterAndMarkClosed();
307 }
308
UnregisterAndMarkClosed()309 void Call::UnregisterAndMarkClosed() {
310 if (active_locked()) {
311 endpoint().UnregisterCall(*this);
312 MarkClosed();
313 }
314 }
315
DebugLog() const316 void Call::DebugLog() const PW_NO_LOCK_SAFETY_ANALYSIS {
317 PW_LOG_INFO(
318 "Call %p\n"
319 "\tEndpoint: %p\n"
320 "\tCall ID: %8u\n"
321 "\tChannel: %8u\n"
322 "\tService: %08x\n"
323 "\tMethod: %08x\n"
324 "\tState: %8x\n"
325 "\tCleanup: %8s\n"
326 "\tBusy CBs: %8x\n"
327 "\tType: %8d\n"
328 "\tClient: %8d\n"
329 "\tWrapped: %8d\n"
330 "\ton_error: %8d\n"
331 "\ton_next: %8d\n",
332 static_cast<const void*>(this),
333 static_cast<const void*>(endpoint_),
334 static_cast<unsigned>(id_),
335 static_cast<unsigned>(channel_id_),
336 static_cast<unsigned>(service_id_),
337 static_cast<unsigned>(method_id_),
338 static_cast<int>(state_),
339 Status(static_cast<Status::Code>(awaiting_cleanup_)).str(),
340 static_cast<int>(callbacks_executing_),
341 static_cast<int>(properties_.method_type()),
342 static_cast<int>(properties_.call_type()),
343 static_cast<int>(hold_lock_while_invoking_callback_with_payload()),
344 static_cast<int>(on_error_ == nullptr),
345 static_cast<int>(on_next_ == nullptr));
346 }
347
348 } // namespace pw::rpc::internal
349