1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <cstdint> 17 18 #include "pw_bytes/span.h" 19 #include "pw_function/function.h" 20 #include "pw_rpc/internal/call.h" 21 #include "pw_rpc/internal/lock.h" 22 23 namespace pw::rpc::internal { 24 25 // A Call object, as used by an RPC client. 26 class ClientCall : public Call { 27 public: ~ClientCall()28 ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { 29 rpc_lock().lock(); 30 CloseClientCall(); 31 rpc_lock().unlock(); 32 } 33 34 protected: 35 constexpr ClientCall() = default; 36 ClientCall(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type)37 ClientCall(Endpoint& client, 38 uint32_t channel_id, 39 uint32_t service_id, 40 uint32_t method_id, 41 MethodType type) 42 : Call(client, channel_id, service_id, method_id, type) {} 43 44 // Sends CLIENT_STREAM_END if applicable, releases any held payload buffer, 45 // and marks the call as closed. 46 void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); 47 MoveClientCallFrom(ClientCall & other)48 void MoveClientCallFrom(ClientCall& other) 49 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 50 CloseClientCall(); 51 MoveFrom(other); 52 } 53 }; 54 55 // Unary response client calls receive both a payload and the status in their 56 // on_completed callback. The on_next callback is not used. 57 class UnaryResponseClientCall : public ClientCall { 58 public: 59 template <typename CallType> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,Function<void (ConstByteSpan,Status)> && on_completed,Function<void (Status)> && on_error,ConstByteSpan request)60 static CallType Start(Endpoint& client, 61 uint32_t channel_id, 62 uint32_t service_id, 63 uint32_t method_id, 64 Function<void(ConstByteSpan, Status)>&& on_completed, 65 Function<void(Status)>&& on_error, 66 ConstByteSpan request) PW_LOCKS_EXCLUDED(rpc_lock()) { 67 rpc_lock().lock(); 68 CallType call(client, channel_id, service_id, method_id); 69 call.set_on_completed_locked(std::move(on_completed)); 70 call.set_on_error_locked(std::move(on_error)); 71 72 call.SendInitialClientRequest(request); 73 return call; 74 } 75 HandleCompleted(ConstByteSpan response,Status status)76 void HandleCompleted(ConstByteSpan response, Status status) 77 PW_UNLOCK_FUNCTION(rpc_lock()) { 78 const bool invoke_callback = on_completed_ != nullptr; 79 UnregisterAndMarkClosed(); 80 81 rpc_lock().unlock(); 82 if (invoke_callback) { 83 on_completed_(response, status); 84 } 85 } 86 87 protected: 88 constexpr UnaryResponseClientCall() = default; 89 UnaryResponseClientCall(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type)90 UnaryResponseClientCall(Endpoint& client, 91 uint32_t channel_id, 92 uint32_t service_id, 93 uint32_t method_id, 94 MethodType type) 95 : ClientCall(client, channel_id, service_id, method_id, type) {} 96 UnaryResponseClientCall(UnaryResponseClientCall && other)97 UnaryResponseClientCall(UnaryResponseClientCall&& other) { 98 *this = std::move(other); 99 } 100 101 UnaryResponseClientCall& operator=(UnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())102 PW_LOCKS_EXCLUDED(rpc_lock()) { 103 LockGuard lock(rpc_lock()); 104 MoveUnaryResponseClientCallFrom(other); 105 return *this; 106 } 107 MoveUnaryResponseClientCallFrom(UnaryResponseClientCall & other)108 void MoveUnaryResponseClientCallFrom(UnaryResponseClientCall& other) 109 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 110 MoveClientCallFrom(other); 111 on_completed_ = std::move(other.on_completed_); 112 } 113 set_on_completed(Function<void (ConstByteSpan,Status)> && on_completed)114 void set_on_completed(Function<void(ConstByteSpan, Status)>&& on_completed) 115 PW_LOCKS_EXCLUDED(rpc_lock()) { 116 // TODO(pwbug/597): Ensure on_completed_ is properly guarded. 117 LockGuard lock(rpc_lock()); 118 set_on_completed_locked(std::move(on_completed)); 119 } 120 set_on_completed_locked(Function<void (ConstByteSpan,Status)> && on_completed)121 void set_on_completed_locked( 122 Function<void(ConstByteSpan, Status)>&& on_completed) 123 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 124 on_completed_ = std::move(on_completed); 125 } 126 127 private: 128 using internal::ClientCall::set_on_next; // Not used in unary response calls. 129 130 Function<void(ConstByteSpan, Status)> on_completed_; 131 }; 132 133 // Stream response client calls only receive the status in their on_completed 134 // callback. Payloads are sent through the on_next callback. 135 class StreamResponseClientCall : public ClientCall { 136 public: 137 template <typename CallType> Start(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,Function<void (ConstByteSpan)> && on_next,Function<void (Status)> && on_completed,Function<void (Status)> && on_error,ConstByteSpan request)138 static CallType Start(Endpoint& client, 139 uint32_t channel_id, 140 uint32_t service_id, 141 uint32_t method_id, 142 Function<void(ConstByteSpan)>&& on_next, 143 Function<void(Status)>&& on_completed, 144 Function<void(Status)>&& on_error, 145 ConstByteSpan request) { 146 rpc_lock().lock(); 147 CallType call(client, channel_id, service_id, method_id); 148 149 call.set_on_next_locked(std::move(on_next)); 150 call.set_on_completed_locked(std::move(on_completed)); 151 call.set_on_error_locked(std::move(on_error)); 152 153 call.SendInitialClientRequest(request); 154 return call; 155 } 156 HandleCompleted(Status status)157 void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) { 158 const bool invoke_callback = on_completed_ != nullptr; 159 160 UnregisterAndMarkClosed(); 161 rpc_lock().unlock(); 162 163 // TODO(pwbug/597): Ensure on_completed_ is properly guarded. 164 if (invoke_callback) { 165 on_completed_(status); 166 } 167 } 168 169 protected: 170 constexpr StreamResponseClientCall() = default; 171 StreamResponseClientCall(Endpoint & client,uint32_t channel_id,uint32_t service_id,uint32_t method_id,MethodType type)172 StreamResponseClientCall(Endpoint& client, 173 uint32_t channel_id, 174 uint32_t service_id, 175 uint32_t method_id, 176 MethodType type) 177 : ClientCall(client, channel_id, service_id, method_id, type) {} 178 StreamResponseClientCall(StreamResponseClientCall && other)179 StreamResponseClientCall(StreamResponseClientCall&& other) { 180 *this = std::move(other); 181 } 182 183 StreamResponseClientCall& operator=(StreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock ())184 PW_LOCKS_EXCLUDED(rpc_lock()) { 185 LockGuard lock(rpc_lock()); 186 MoveStreamResponseClientCallFrom(other); 187 return *this; 188 } 189 MoveStreamResponseClientCallFrom(StreamResponseClientCall & other)190 void MoveStreamResponseClientCallFrom(StreamResponseClientCall& other) 191 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 192 MoveClientCallFrom(other); 193 on_completed_ = std::move(other.on_completed_); 194 } 195 set_on_completed(Function<void (Status)> && on_completed)196 void set_on_completed(Function<void(Status)>&& on_completed) 197 PW_LOCKS_EXCLUDED(rpc_lock()) { 198 // TODO(pwbug/597): Ensure on_completed_ is properly guarded. 199 LockGuard lock(rpc_lock()); 200 set_on_completed_locked(std::move(on_completed)); 201 } 202 set_on_completed_locked(Function<void (Status)> && on_completed)203 void set_on_completed_locked(Function<void(Status)>&& on_completed) 204 PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { 205 on_completed_ = std::move(on_completed); 206 } 207 208 private: 209 Function<void(Status)> on_completed_; 210 }; 211 212 } // namespace pw::rpc::internal 213