• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // clang-format off
16 #include "pw_rpc/internal/log_config.h"  // PW_LOG_* macros must be first.
17 
18 #include "pw_rpc/internal/endpoint.h"
19 // clang-format on
20 
21 #include "pw_log/log.h"
22 #include "pw_rpc/internal/lock.h"
23 
24 #if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_BUSY_LOOP
25 
26 static_assert(
27     PW_RPC_USE_GLOBAL_MUTEX == 0,
28     "The RPC global mutex is enabled, but no pw_rpc yield mode is selected! "
29     "Because the global mutex is in use, pw_rpc may be used from multiple "
30     "threads. This could result in thread starvation. To fix this, set "
31     "PW_RPC_YIELD to PW_RPC_YIELD_MODE_SLEEP and add a dependency on "
32     "pw_thread:sleep.");
33 
34 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
35 
36 #include <chrono>
37 
38 #if !__has_include("pw_thread/sleep.h")
39 
40 static_assert(false,
41               "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_SLEEP "
42               "(pw::this_thread::sleep_for()), but no backend is set for "
43               "pw_thread:sleep. Set a pw_thread:sleep backend or use a "
44               "different PW_RPC_YIELD_MODE setting.");
45 
46 #endif  // !__has_include("pw_thread/sleep.h")
47 
48 #include "pw_thread/sleep.h"
49 
50 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
51 
52 #if !__has_include("pw_thread/yield.h")
53 
54 static_assert(false,
55               "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_YIELD "
56               "(pw::this_thread::yield()), but no backend is set for "
57               "pw_thread:yield. Set a pw_thread:yield backend or use a "
58               "different PW_RPC_YIELD_MODE setting.");
59 
60 #endif  // !__has_include("pw_thread/yield.h")
61 
62 #include "pw_thread/yield.h"
63 
64 #else
65 
66 static_assert(
67     false,
68     "PW_RPC_YIELD_MODE macro must be set to PW_RPC_YIELD_MODE_BUSY_LOOP, "
69     "PW_RPC_YIELD_MODE_SLEEP (pw::this_thread::sleep_for()), or "
70     "PW_RPC_YIELD_MODE_YIELD (pw::this_thread::yield())");
71 
72 #endif  // PW_RPC_YIELD_MODE
73 
74 namespace pw::rpc::internal {
75 
YieldRpcLock()76 void YieldRpcLock() {
77   rpc_lock().unlock();
78 #if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
79   static constexpr chrono::SystemClock::duration kSleepDuration =
80       PW_RPC_YIELD_SLEEP_DURATION;
81   this_thread::sleep_for(kSleepDuration);
82 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
83   this_thread::yield();
84 #endif  // PW_RPC_YIELD_MODE
85   rpc_lock().lock();
86 }
87 
ProcessPacket(span<const std::byte> data,Packet::Destination destination)88 Result<Packet> Endpoint::ProcessPacket(span<const std::byte> data,
89                                        Packet::Destination destination) {
90   Result<Packet> result = Packet::FromBuffer(data);
91 
92   if (!result.ok()) {
93     PW_LOG_WARN("Failed to decode pw_rpc packet");
94     return Status::DataLoss();
95   }
96 
97   Packet& packet = *result;
98 
99   if (packet.channel_id() == Channel::kUnassignedChannelId ||
100       packet.service_id() == 0 || packet.method_id() == 0) {
101     PW_LOG_WARN("Received malformed pw_rpc packet");
102     return Status::DataLoss();
103   }
104 
105   if (packet.destination() != destination) {
106     return Status::InvalidArgument();
107   }
108 
109   return result;
110 }
111 
RegisterCall(Call & new_call)112 void Endpoint::RegisterCall(Call& new_call) {
113   // Mark any exisitng duplicate calls as cancelled.
114   auto [before_call, call] = FindIteratorsForCall(new_call);
115   if (call != calls_.end()) {
116     CloseCallAndMarkForCleanup(before_call, call, Status::Cancelled());
117   }
118 
119   // Register the new call.
120   calls_.push_front(new_call);
121 }
122 
123 std::tuple<IntrusiveList<Call>::iterator, IntrusiveList<Call>::iterator>
FindIteratorsForCall(uint32_t channel_id,uint32_t service_id,uint32_t method_id,uint32_t call_id)124 Endpoint::FindIteratorsForCall(uint32_t channel_id,
125                                uint32_t service_id,
126                                uint32_t method_id,
127                                uint32_t call_id) {
128   auto previous = calls_.before_begin();
129   auto call = calls_.begin();
130 
131   while (call != calls_.end()) {
132     if (channel_id == call->channel_id_locked() &&
133         service_id == call->service_id() && method_id == call->method_id()) {
134       if (call_id == call->id() || call_id == kOpenCallId) {
135         break;
136       }
137       if (call->id() == kOpenCallId) {
138         // Calls with ID of `kOpenCallId` were unrequested, and
139         // are updated to have the call ID of the first matching request.
140         call->set_id(call_id);
141         break;
142       }
143     }
144     previous = call;
145     ++call;
146   }
147 
148   return {previous, call};
149 }
150 
CloseChannel(uint32_t channel_id)151 Status Endpoint::CloseChannel(uint32_t channel_id) {
152   rpc_lock().lock();
153 
154   Channel* channel = channels_.Get(channel_id);
155   if (channel == nullptr) {
156     rpc_lock().unlock();
157     return Status::NotFound();
158   }
159   channel->Close();
160 
161   // Close pending calls on the channel that's going away.
162   AbortCalls(AbortIdType::kChannel, channel_id);
163 
164   CleanUpCalls();
165 
166   return OkStatus();
167 }
168 
AbortCalls(AbortIdType type,uint32_t id)169 void Endpoint::AbortCalls(AbortIdType type, uint32_t id) {
170   auto previous = calls_.before_begin();
171   auto current = calls_.begin();
172 
173   while (current != calls_.end()) {
174     if (id == (type == AbortIdType::kChannel ? current->channel_id_locked()
175                                              : current->service_id())) {
176       current =
177           CloseCallAndMarkForCleanup(previous, current, Status::Aborted());
178     } else {
179       previous = current;
180       ++current;
181     }
182   }
183 }
184 
CleanUpCalls()185 void Endpoint::CleanUpCalls() {
186   if (to_cleanup_.empty()) {
187     rpc_lock().unlock();
188     return;
189   }
190 
191   // Drain the to_cleanup_ list. This while loop is structured to avoid
192   // unnecessarily acquiring the lock after popping the last call.
193   while (true) {
194     Call& call = to_cleanup_.front();
195     to_cleanup_.pop_front();
196 
197     const bool done = to_cleanup_.empty();
198 
199     call.CleanUpFromEndpoint();
200 
201     if (done) {
202       return;
203     }
204 
205     rpc_lock().lock();
206   }
207 }
208 
RemoveAllCalls()209 void Endpoint::RemoveAllCalls() {
210   RpcLockGuard lock;
211 
212   // Close all calls without invoking on_error callbacks, since the calls should
213   // have been closed before the Endpoint was deleted.
214   while (!calls_.empty()) {
215     calls_.front().CloseFromDeletedEndpoint();
216     calls_.pop_front();
217   }
218   while (!to_cleanup_.empty()) {
219     to_cleanup_.front().CloseFromDeletedEndpoint();
220     to_cleanup_.pop_front();
221   }
222 }
223 
224 }  // namespace pw::rpc::internal
225