// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

// clang-format off
#include "pw_rpc/internal/log_config.h"  // PW_LOG_* macros must be first.

#include "pw_rpc/internal/endpoint.h"
// clang-format on

#include "pw_log/log.h"
#include "pw_rpc/internal/lock.h"

#if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_BUSY_LOOP

static_assert(
    PW_RPC_USE_GLOBAL_MUTEX == 0,
    "The RPC global mutex is enabled, but no pw_rpc yield mode is selected! "
    "Because the global mutex is in use, pw_rpc may be used from multiple "
    "threads. This could result in thread starvation. To fix this, set "
    "PW_RPC_YIELD_MODE to PW_RPC_YIELD_MODE_SLEEP and add a dependency on "
    "pw_thread:sleep.");

#elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP

#include <chrono>

#if !__has_include("pw_thread/sleep.h")

static_assert(false,
              "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_SLEEP "
              "(pw::this_thread::sleep_for()), but no backend is set for "
              "pw_thread:sleep. Set a pw_thread:sleep backend or use a "
              "different PW_RPC_YIELD_MODE setting.");

#endif  // !__has_include("pw_thread/sleep.h")

#include "pw_thread/sleep.h"

#elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD

#if !__has_include("pw_thread/yield.h")

static_assert(false,
              "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_YIELD "
              "(pw::this_thread::yield()), but no backend is set for "
              "pw_thread:yield. Set a pw_thread:yield backend or use a "
              "different PW_RPC_YIELD_MODE setting.");

#endif  // !__has_include("pw_thread/yield.h")

#include "pw_thread/yield.h"

#else

static_assert(
    false,
    "PW_RPC_YIELD_MODE macro must be set to PW_RPC_YIELD_MODE_BUSY_LOOP, "
    "PW_RPC_YIELD_MODE_SLEEP (pw::this_thread::sleep_for()), or "
    "PW_RPC_YIELD_MODE_YIELD (pw::this_thread::yield())");

#endif  // PW_RPC_YIELD_MODE

namespace pw::rpc::internal {

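// Briefly releases rpc_lock() so that other threads waiting on it can make
// progress, then reacquires it. How the thread waits while the lock is
// released is controlled by PW_RPC_YIELD_MODE: sleep for
// PW_RPC_YIELD_SLEEP_DURATION, yield the thread, or busy loop.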
void YieldRpcLock() {
  rpc_lock().unlock();
#if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
  static constexpr chrono::SystemClock::duration kSleepDuration =
      PW_RPC_YIELD_SLEEP_DURATION;
  this_thread::sleep_for(kSleepDuration);
#elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
  this_thread::yield();
#endif  // PW_RPC_YIELD_MODE
  rpc_lock().lock();
}

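// Decodes a received buffer into a Packet and validates it. Returns DATA_LOSS
// for packets that fail to decode or that lack a channel, service, or method
// ID, and INVALID_ARGUMENT for packets whose destination does not match this
// endpoint (e.g. a server-bound packet received by a client).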
Result<Packet> Endpoint::ProcessPacket(span<const std::byte> data,
                                       Packet::Destination destination) {
  Result<Packet> result = Packet::FromBuffer(data);

  if (!result.ok()) {
    PW_LOG_WARN("Failed to decode pw_rpc packet");
    return Status::DataLoss();
  }

  Packet& packet = *result;

  if (packet.channel_id() == Channel::kUnassignedChannelId ||
      packet.service_id() == 0 || packet.method_id() == 0) {
    PW_LOG_WARN("Received malformed pw_rpc packet");
    return Status::DataLoss();
  }

  if (packet.destination() != destination) {
    return Status::InvalidArgument();
  }

  return result;
}

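// Adds a call to the list of active calls. If a call matching the same
// channel, service, method, and call ID is already registered, that existing
// call is closed with CANCELLED and marked for cleanup before the new call is
// added.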
void Endpoint::RegisterCall(Call& new_call) {
  // Mark any existing duplicate calls as cancelled.
  auto [before_call, call] = FindIteratorsForCall(new_call);
  if (call != calls_.end()) {
    CloseCallAndMarkForCleanup(before_call, call, Status::Cancelled());
  }

  // Register the new call.
  calls_.push_front(new_call);
}

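// Searches calls_ for a call matching the given channel, service, method, and
// call IDs. Returns the iterator to the matching call along with the iterator
// preceding it, since erasing from the singly linked IntrusiveList requires
// the predecessor. If no call matches, the second iterator is calls_.end().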
std::tuple<IntrusiveList<Call>::iterator, IntrusiveList<Call>::iterator>
Endpoint::FindIteratorsForCall(uint32_t channel_id,
                               uint32_t service_id,
                               uint32_t method_id,
                               uint32_t call_id) {
  auto previous = calls_.before_begin();
  auto call = calls_.begin();

  while (call != calls_.end()) {
    if (channel_id == call->channel_id_locked() &&
        service_id == call->service_id() && method_id == call->method_id()) {
      if (call_id == call->id() || call_id == kOpenCallId) {
        break;
      }
      if (call->id() == kOpenCallId) {
        // Calls with an ID of `kOpenCallId` were unrequested, and are updated
        // to have the call ID of the first matching request.
        call->set_id(call_id);
        break;
      }
    }
    previous = call;
    ++call;
  }

  return {previous, call};
}

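// Closes the channel with the given ID and aborts all of its pending calls.
// Returns NOT_FOUND if no channel with that ID exists. Acquires rpc_lock();
// CleanUpCalls() releases it before this function returns.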
Status Endpoint::CloseChannel(uint32_t channel_id) {
  rpc_lock().lock();

  Channel* channel = channels_.Get(channel_id);
  if (channel == nullptr) {
    rpc_lock().unlock();
    return Status::NotFound();
  }
  channel->Close();

  // Close pending calls on the channel that's going away.
  AbortCalls(AbortIdType::kChannel, channel_id);

  CleanUpCalls();

  return OkStatus();
}

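// Closes every call associated with the given channel or service with an
// ABORTED status and marks it for cleanup. `type` selects whether `id` is
// matched against each call's channel ID or service ID.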
void Endpoint::AbortCalls(AbortIdType type, uint32_t id) {
  auto previous = calls_.before_begin();
  auto current = calls_.begin();

  while (current != calls_.end()) {
    if (id == (type == AbortIdType::kChannel ? current->channel_id_locked()
                                             : current->service_id())) {
      current =
          CloseCallAndMarkForCleanup(previous, current, Status::Aborted());
    } else {
      previous = current;
      ++current;
    }
  }
}

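// Drains to_cleanup_, running each call's cleanup step. Must be called with
// rpc_lock() held; the lock is released while each call's cleanup runs (the
// loop reacquires it before the next iteration), and it is not held when this
// function returns.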
void Endpoint::CleanUpCalls() {
  if (to_cleanup_.empty()) {
    rpc_lock().unlock();
    return;
  }

  // Drain the to_cleanup_ list. This while loop is structured to avoid
  // unnecessarily acquiring the lock after popping the last call.
  while (true) {
    Call& call = to_cleanup_.front();
    to_cleanup_.pop_front();

    const bool done = to_cleanup_.empty();

    call.CleanUpFromEndpoint();

    if (done) {
      return;
    }

    rpc_lock().lock();
  }
}

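// Detaches every remaining call, both active and pending cleanup, from this
// endpoint. Intended for endpoint destruction, so on_error callbacks are not
// invoked.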
void Endpoint::RemoveAllCalls() {
  RpcLockGuard lock;

  // Close all calls without invoking on_error callbacks, since the calls
  // should have been closed before the Endpoint was deleted.
  while (!calls_.empty()) {
    calls_.front().CloseFromDeletedEndpoint();
    calls_.pop_front();
  }
  while (!to_cleanup_.empty()) {
    to_cleanup_.front().CloseFromDeletedEndpoint();
    to_cleanup_.pop_front();
  }
}

}  // namespace pw::rpc::internal