• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <signal.h>
17 
18 #include <atomic>
19 #include <mutex>
20 
21 #include "pw_assert/assert.h"
22 #include "pw_bytes/span.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_rpc_transport/rpc_transport.h"
25 #include "pw_status/status.h"
26 #include "pw_status/try.h"
27 #include "pw_stream/socket_stream.h"
28 #include "pw_sync/condition_variable.h"
29 #include "pw_sync/lock_annotations.h"
30 #include "pw_sync/mutex.h"
31 #include "pw_sync/thread_notification.h"
32 #include "pw_thread/sleep.h"
33 #include "pw_thread/thread_core.h"
34 
35 namespace pw::rpc {
36 
37 namespace internal {
38 
39 void LogSocketListenError(Status);
40 void LogSocketAcceptError(Status);
41 void LogSocketConnectError(Status);
42 void LogSocketReadError(Status);
43 void LogSocketIngressHandlerError(Status);
44 
45 }  // namespace internal
46 
47 template <size_t kReadBufferSize>
48 class SocketRpcTransport : public RpcFrameSender, public thread::ThreadCore {
49  public:
50   struct AsServer {};
51   struct AsClient {};
52 
53   static constexpr AsServer kAsServer{};
54   static constexpr AsClient kAsClient{};
55 
SocketRpcTransport(AsServer,uint16_t port)56   SocketRpcTransport(AsServer, uint16_t port)
57       : role_(ClientServerRole::kServer), port_(port) {}
58 
SocketRpcTransport(AsServer,uint16_t port,RpcIngressHandler & ingress)59   SocketRpcTransport(AsServer, uint16_t port, RpcIngressHandler& ingress)
60       : role_(ClientServerRole::kServer), port_(port), ingress_(&ingress) {}
61 
SocketRpcTransport(AsClient,std::string_view host,uint16_t port)62   SocketRpcTransport(AsClient, std::string_view host, uint16_t port)
63       : role_(ClientServerRole::kClient), host_(host), port_(port) {}
64 
SocketRpcTransport(AsClient,std::string_view host,uint16_t port,RpcIngressHandler & ingress)65   SocketRpcTransport(AsClient,
66                      std::string_view host,
67                      uint16_t port,
68                      RpcIngressHandler& ingress)
69       : role_(ClientServerRole::kClient),
70         host_(host),
71         port_(port),
72         ingress_(&ingress) {}
73 
MaximumTransmissionUnit()74   size_t MaximumTransmissionUnit() const override { return kReadBufferSize; }
port()75   size_t port() const { return port_; }
set_ingress(RpcIngressHandler & ingress)76   void set_ingress(RpcIngressHandler& ingress) { ingress_ = &ingress; }
77 
Send(RpcFrame frame)78   Status Send(RpcFrame frame) override {
79     std::lock_guard lock(write_mutex_);
80     PW_TRY(socket_stream_.Write(frame.header));
81     PW_TRY(socket_stream_.Write(frame.payload));
82     return OkStatus();
83   }
84 
85   // Returns once the transport is connected to its peer.
WaitUntilConnected()86   void WaitUntilConnected() {
87     std::unique_lock lock(connected_mutex_);
88     connected_cv_.wait(lock, [this]() { return connected_; });
89   }
90 
91   // Returns once the transport is ready to be used (e.g. the server is
92   // listening on the port or the client is ready to connect).
WaitUntilReady()93   void WaitUntilReady() {
94     std::unique_lock lock(ready_mutex_);
95     ready_cv_.wait(lock, [this]() { return ready_; });
96   }
97 
Start()98   void Start() {
99     while (!stopped_) {
100       const auto connect_status = EstablishConnection();
101       if (!connect_status.ok()) {
102         this_thread::sleep_for(kConnectionRetryPeriod);
103         continue;
104       }
105       NotifyConnected();
106 
107       while (!stopped_) {
108         const auto read_status = ReadData();
109         // Break if ReadData was cancelled after the transport was stopped.
110         if (stopped_) {
111           break;
112         }
113         if (!read_status.ok()) {
114           internal::LogSocketReadError(read_status);
115         }
116         if (read_status.IsOutOfRange()) {
117           // Need to reconnect (we don't close the stream here because it's
118           // already done in SocketStream::DoRead).
119           {
120             std::lock_guard lock(connected_mutex_);
121             connected_ = false;
122           }
123           break;
124         }
125       }
126     }
127   }
128 
Stop()129   void Stop() {
130     stopped_ = true;
131     socket_stream_.Close();
132     server_socket_.Close();
133   }
134 
135  private:
136   enum class ClientServerRole { kClient, kServer };
137   static constexpr chrono::SystemClock::duration kConnectionRetryPeriod =
138       std::chrono::milliseconds(100);
139 
Run()140   void Run() override { Start(); }
141 
142   // Establishes or accepts a new socket connection. Returns when socket_stream_
143   // contains a valid socket connection, or when the transport is stopped.
EstablishConnection()144   Status EstablishConnection() {
145     if (role_ == ClientServerRole::kServer) {
146       return Serve();
147     }
148     return Connect();
149   }
150 
Serve()151   Status Serve() {
152     PW_DASSERT(role_ == ClientServerRole::kServer);
153 
154     if (!listening_) {
155       const auto listen_status = server_socket_.Listen(port_);
156       if (!listen_status.ok()) {
157         internal::LogSocketListenError(listen_status);
158         return listen_status;
159       }
160     }
161 
162     listening_ = true;
163     port_ = server_socket_.port();
164     NotifyReady();
165 
166     Result<stream::SocketStream> stream = server_socket_.Accept();
167     // If Accept was cancelled due to stopping the transport, return without
168     // error.
169     if (stopped_) {
170       return OkStatus();
171     }
172     if (!stream.ok()) {
173       internal::LogSocketAcceptError(stream.status());
174       return stream.status();
175     }
176     // Ensure that the writer is done writing before updating the stream.
177     std::lock_guard lock(write_mutex_);
178     socket_stream_ = std::move(*stream);
179     return OkStatus();
180   }
181 
Connect()182   Status Connect() {
183     PW_DASSERT(role_ == ClientServerRole::kClient);
184     NotifyReady();
185 
186     std::lock_guard lock(write_mutex_);
187     auto connect_status = socket_stream_.Connect(host_.c_str(), port_);
188     if (!connect_status.ok()) {
189       internal::LogSocketConnectError(connect_status);
190     }
191     return connect_status;
192   }
193 
ReadData()194   Status ReadData() {
195     PW_DASSERT(ingress_ != nullptr);
196     PW_TRY_ASSIGN(auto buffer, socket_stream_.Read(read_buffer_));
197     const auto ingress_status = ingress_->ProcessIncomingData(buffer);
198     if (!ingress_status.ok()) {
199       internal::LogSocketIngressHandlerError(ingress_status);
200     }
201     // ReadData only returns socket stream read errors; ingress errors are only
202     // logged.
203     return OkStatus();
204   }
205 
NotifyConnected()206   void NotifyConnected() {
207     {
208       std::lock_guard lock(connected_mutex_);
209       connected_ = true;
210     }
211     connected_cv_.notify_all();
212   }
213 
NotifyReady()214   void NotifyReady() {
215     {
216       std::lock_guard lock(ready_mutex_);
217       ready_ = true;
218     }
219     ready_cv_.notify_all();
220   }
221 
222   ClientServerRole role_;
223   const std::string host_;
224   std::atomic<uint16_t> port_;
225   RpcIngressHandler* ingress_ = nullptr;
226 
227   // write_mutex_ must be held by the thread performing socket writes.
228   sync::Mutex write_mutex_;
229   stream::SocketStream socket_stream_;
230   stream::ServerSocket server_socket_;
231 
232   sync::Mutex ready_mutex_;
233   sync::ConditionVariable ready_cv_;
234   bool ready_ = false;
235 
236   sync::Mutex connected_mutex_;
237   sync::ConditionVariable connected_cv_;
238   bool connected_ = false;
239 
240   std::atomic<bool> stopped_ = false;
241   bool listening_ = false;
242   std::array<std::byte, kReadBufferSize> read_buffer_{};
243 };
244 
245 }  // namespace pw::rpc
246