• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H
16 #include <grpc/event_engine/endpoint_config.h>
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/event_engine/memory_allocator.h>
19 #include <grpc/support/port_platform.h>
20 
21 #include <atomic>
22 #include <cstdint>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/container/flat_hash_map.h"
30 #include "absl/functional/any_invocable.h"
31 #include "absl/hash/hash.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/string_view.h"
35 #include "src/core/lib/event_engine/handle_containers.h"
36 #include "src/core/lib/event_engine/posix.h"
37 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
38 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
39 #include "src/core/lib/event_engine/ref_counted_dns_resolver_interface.h"
40 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
41 #include "src/core/lib/iomgr/port.h"
42 #include "src/core/lib/surface/init_internally.h"
43 #include "src/core/util/orphanable.h"
44 #include "src/core/util/sync.h"
45 
46 #ifdef GRPC_POSIX_SOCKET_TCP
47 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
48 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
49 #endif  // GRPC_POSIX_SOCKET_TCP
50 
51 namespace grpc_event_engine {
52 namespace experimental {
53 
54 #ifdef GRPC_POSIX_SOCKET_TCP
55 // A helper class to handle asynchronous connect operations.
56 class AsyncConnect {
57  public:
AsyncConnect(EventEngine::OnConnectCallback on_connect,std::shared_ptr<EventEngine> engine,ThreadPool * executor,grpc_event_engine::experimental::EventHandle * fd,MemoryAllocator && allocator,const grpc_event_engine::experimental::PosixTcpOptions & options,std::string resolved_addr_str,int64_t connection_handle)58   AsyncConnect(EventEngine::OnConnectCallback on_connect,
59                std::shared_ptr<EventEngine> engine, ThreadPool* executor,
60                grpc_event_engine::experimental::EventHandle* fd,
61                MemoryAllocator&& allocator,
62                const grpc_event_engine::experimental::PosixTcpOptions& options,
63                std::string resolved_addr_str, int64_t connection_handle)
64       : on_connect_(std::move(on_connect)),
65         engine_(engine),
66         executor_(executor),
67         fd_(fd),
68         allocator_(std::move(allocator)),
69         options_(options),
70         resolved_addr_str_(resolved_addr_str),
71         connection_handle_(connection_handle),
72         connect_cancelled_(false) {}
73 
74   void Start(EventEngine::Duration timeout);
75   ~AsyncConnect();
76 
77  private:
78   friend class PosixEventEngine;
79   void OnTimeoutExpired(absl::Status status);
80 
81   void OnWritable(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
82 
83   grpc_core::Mutex mu_;
84   grpc_event_engine::experimental::PosixEngineClosure* on_writable_ = nullptr;
85   EventEngine::OnConnectCallback on_connect_;
86   std::shared_ptr<EventEngine> engine_;
87   ThreadPool* executor_;
88   EventEngine::TaskHandle alarm_handle_;
89   int refs_{2};
90   grpc_event_engine::experimental::EventHandle* fd_;
91   MemoryAllocator allocator_;
92   grpc_event_engine::experimental::PosixTcpOptions options_;
93   std::string resolved_addr_str_;
94   int64_t connection_handle_;
95   bool connect_cancelled_;
96 };
97 
98 // A helper class to manager lifetime of the poller associated with the
99 // posix EventEngine.
100 class PosixEnginePollerManager
101     : public grpc_event_engine::experimental::Scheduler {
102  public:
103   explicit PosixEnginePollerManager(std::shared_ptr<ThreadPool> executor);
104   explicit PosixEnginePollerManager(
105       std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
106           poller);
Poller()107   grpc_event_engine::experimental::PosixEventPoller* Poller() {
108     return poller_.get();
109   }
110 
Executor()111   ThreadPool* Executor() { return executor_.get(); }
112 
113   void Run(experimental::EventEngine::Closure* closure) override;
114   void Run(absl::AnyInvocable<void()>) override;
115 
IsShuttingDown()116   bool IsShuttingDown() {
117     return poller_state_.load(std::memory_order_acquire) ==
118            PollerState::kShuttingDown;
119   }
120   void TriggerShutdown();
121 
122   ~PosixEnginePollerManager() override;
123 
124  private:
125   enum class PollerState { kExternal, kOk, kShuttingDown };
126   std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> poller_;
127   std::atomic<PollerState> poller_state_{PollerState::kOk};
128   std::shared_ptr<ThreadPool> executor_;
129   bool trigger_shutdown_called_;
130 };
131 #endif  // GRPC_POSIX_SOCKET_TCP
132 
133 // An iomgr-based Posix EventEngine implementation.
134 // All methods require an ExecCtx to already exist on the thread's stack.
135 class PosixEventEngine final : public PosixEventEngineWithFdSupport,
136                                public grpc_core::KeepsGrpcInitialized {
137  public:
138   class PosixDNSResolver : public EventEngine::DNSResolver {
139    public:
140     explicit PosixDNSResolver(
141         grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver);
142     void LookupHostname(LookupHostnameCallback on_resolve,
143                         absl::string_view name,
144                         absl::string_view default_port) override;
145     void LookupSRV(LookupSRVCallback on_resolve,
146                    absl::string_view name) override;
147     void LookupTXT(LookupTXTCallback on_resolve,
148                    absl::string_view name) override;
149 
150    private:
151     grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver_;
152   };
153 
154 #ifdef GRPC_POSIX_SOCKET_TCP
155   // Constructs an EventEngine which has a shared ownership of the poller. Do
156   // not call this constructor directly. Instead use the
157   // MakeTestOnlyPosixEventEngine static method. Its expected to be used only in
158   // tests.
159   explicit PosixEventEngine(
160       std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
161           poller);
162   PosixEventEngine();
163 #else   // GRPC_POSIX_SOCKET_TCP
164   PosixEventEngine();
165 #endif  // GRPC_POSIX_SOCKET_TCP
166 
167   ~PosixEventEngine() override;
168 
169   std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd(
170       int fd, const EndpointConfig& config,
171       MemoryAllocator memory_allocator) override;
172   std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
173       int fd, const EndpointConfig& config) override;
174 
175   ConnectionHandle CreateEndpointFromUnconnectedFd(
176       int fd, EventEngine::OnConnectCallback on_connect,
177       const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
178       MemoryAllocator memory_allocator, EventEngine::Duration timeout) override;
179 
180   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
181       Listener::AcceptCallback on_accept,
182       absl::AnyInvocable<void(absl::Status)> on_shutdown,
183       const EndpointConfig& config,
184       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
185       override;
186 
187   absl::StatusOr<std::unique_ptr<EventEngine::Listener>> CreatePosixListener(
188       PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
189       absl::AnyInvocable<void(absl::Status)> on_shutdown,
190       const EndpointConfig& config,
191       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
192       override;
193 
194   ConnectionHandle Connect(OnConnectCallback on_connect,
195                            const ResolvedAddress& addr,
196                            const EndpointConfig& args,
197                            MemoryAllocator memory_allocator,
198                            Duration timeout) override;
199 
200   bool CancelConnect(ConnectionHandle handle) override;
201   bool IsWorkerThread() override;
202   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
203       GRPC_UNUSED const DNSResolver::ResolverOptions& options) override;
204   void Run(Closure* closure) override;
205   void Run(absl::AnyInvocable<void()> closure) override;
206   // Caution!! The timer implementation cannot create any fds. See #20418.
207   TaskHandle RunAfter(Duration when, Closure* closure) override;
208   TaskHandle RunAfter(Duration when,
209                       absl::AnyInvocable<void()> closure) override;
210   bool Cancel(TaskHandle handle) override;
211 
212 #ifdef GRPC_POSIX_SOCKET_TCP
213   // The posix EventEngine returned by this method would have a shared ownership
214   // of the poller and would not be in-charge of driving the poller by calling
215   // its Work(..) method. Instead its upto the test to drive the poller. The
216   // returned posix EventEngine will also not attempt to shutdown the poller
217   // since it does not own it.
MakeTestOnlyPosixEventEngine(std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> test_only_poller)218   static std::shared_ptr<PosixEventEngine> MakeTestOnlyPosixEventEngine(
219       std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller>
220           test_only_poller) {
221     return std::make_shared<PosixEventEngine>(std::move(test_only_poller));
222   }
223 #endif  // GRPC_POSIX_SOCKET_TCP
224 
225  private:
226   struct ClosureData;
227   EventEngine::TaskHandle RunAfterInternal(Duration when,
228                                            absl::AnyInvocable<void()> cb);
229 
230 #ifdef GRPC_POSIX_SOCKET_TCP
231   friend class AsyncConnect;
232   struct ConnectionShard {
233     grpc_core::Mutex mu;
234     absl::flat_hash_map<int64_t, AsyncConnect*> pending_connections
235         ABSL_GUARDED_BY(&mu);
236   };
237 
238   static void PollerWorkInternal(
239       std::shared_ptr<PosixEnginePollerManager> poller_manager);
240 
241   ConnectionHandle CreateEndpointFromUnconnectedFdInternal(
242       int fd, EventEngine::OnConnectCallback on_connect,
243       const EventEngine::ResolvedAddress& addr, const PosixTcpOptions& options,
244       MemoryAllocator memory_allocator, EventEngine::Duration timeout);
245 
246   void OnConnectFinishInternal(int connection_handle);
247 
248   std::vector<ConnectionShard> connection_shards_;
249   std::atomic<int64_t> last_connection_id_{1};
250 
251 #endif  // GRPC_POSIX_SOCKET_TCP
252 
253   grpc_core::Mutex mu_;
254   TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
255   std::atomic<intptr_t> aba_token_{0};
256   std::shared_ptr<ThreadPool> executor_;
257   std::shared_ptr<TimerManager> timer_manager_;
258 #ifdef GRPC_POSIX_SOCKET_TCP
259   std::shared_ptr<PosixEnginePollerManager> poller_manager_;
260 #endif  // GRPC_POSIX_SOCKET_TCP
261 };
262 
263 }  // namespace experimental
264 }  // namespace grpc_event_engine
265 
266 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H
267