• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
17 
18 #include <grpc/event_engine/endpoint_config.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/memory_allocator.h>
21 #include <grpc/event_engine/slice_buffer.h>
22 
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/functional/any_invocable.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "src/core/util/crash.h"
33 #include "src/core/util/notification.h"
34 #include "src/core/util/sync.h"
35 #include "src/core/util/thd.h"
36 #include "test/core/event_engine/event_engine_test_utils.h"
37 
38 namespace grpc_event_engine {
39 namespace experimental {
40 
41 class PosixOracleEndpoint : public EventEngine::Endpoint {
42  public:
43   explicit PosixOracleEndpoint(int socket_fd);
44   static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
45   ~PosixOracleEndpoint() override;
46   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
47             const ReadArgs* args) override;
48   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
49              SliceBuffer* data, const WriteArgs* args) override;
50   void Shutdown();
GetPeerAddress()51   EventEngine::ResolvedAddress& GetPeerAddress() const override {
52     grpc_core::Crash("unimplemented");
53   }
GetLocalAddress()54   EventEngine::ResolvedAddress& GetLocalAddress() const override {
55     grpc_core::Crash("unimplemented");
56   }
57 
58  private:
59   // An internal helper class definition of Read operations to be performed
60   // by the TCPServerEndpoint.
61   class ReadOperation {
62    public:
ReadOperation()63     ReadOperation()
64         : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {}
ReadOperation(int num_bytes_to_read,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)65     ReadOperation(int num_bytes_to_read, SliceBuffer* buffer,
66                   absl::AnyInvocable<void(absl::Status)>&& on_complete)
67         : num_bytes_to_read_(num_bytes_to_read),
68           buffer_(buffer),
69           on_complete_(std::move(on_complete)) {}
IsValid()70     bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; }
GetNumBytesToRead()71     int GetNumBytesToRead() const { return num_bytes_to_read_; }
operator()72     void operator()(std::string read_data, absl::Status status) {
73       if (on_complete_ != nullptr) {
74         AppendStringToSliceBuffer(std::exchange(buffer_, nullptr), read_data);
75         std::exchange(on_complete_, nullptr)(status);
76       }
77     }
78 
79    private:
80     int num_bytes_to_read_;
81     SliceBuffer* buffer_;
82     absl::AnyInvocable<void(absl::Status)> on_complete_;
83   };
84 
85   // An internal helper class definition of Write operations to be performed
86   // by the TCPServerEndpoint.
87   class WriteOperation {
88    public:
WriteOperation()89     WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {}
WriteOperation(SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)90     WriteOperation(SliceBuffer* buffer,
91                    absl::AnyInvocable<void(absl::Status)>&& on_complete)
92         : bytes_to_write_(ExtractSliceBufferIntoString(buffer)),
93           on_complete_(std::move(on_complete)) {}
IsValid()94     bool IsValid() { return !bytes_to_write_.empty(); }
GetBytesToWrite()95     std::string GetBytesToWrite() const { return bytes_to_write_; }
operator()96     void operator()(absl::Status status) {
97       if (on_complete_ != nullptr) {
98         std::exchange(on_complete_, nullptr)(status);
99       }
100     }
101 
102    private:
103     std::string bytes_to_write_;
104     absl::AnyInvocable<void(absl::Status)> on_complete_;
105   };
106 
107   void ProcessReadOperations();
108   void ProcessWriteOperations();
109 
110   mutable grpc_core::Mutex mu_;
111   bool is_shutdown_ = false;
112   int socket_fd_;
113   ReadOperation read_ops_channel_;
114   WriteOperation write_ops_channel_;
115   std::unique_ptr<grpc_core::Notification> read_op_signal_{
116       new grpc_core::Notification()};
117   std::unique_ptr<grpc_core::Notification> write_op_signal_{
118       new grpc_core::Notification()};
119   grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_);
120   grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_);
121 };
122 
123 class PosixOracleListener : public EventEngine::Listener {
124  public:
125   PosixOracleListener(
126       EventEngine::Listener::AcceptCallback on_accept,
127       absl::AnyInvocable<void(absl::Status)> on_shutdown,
128       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory);
129   ~PosixOracleListener() override;
130   absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override;
131   absl::Status Start() override;
132 
133  private:
134   void HandleIncomingConnections();
135 
136   mutable grpc_core::Mutex mu_;
137   EventEngine::Listener::AcceptCallback on_accept_;
138   absl::AnyInvocable<void(absl::Status)> on_shutdown_;
139   std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
140   grpc_core::Thread serve_;
141   int pipefd_[2];
142   bool is_started_ = false;
143   std::vector<int> listener_fds_;
144 };
145 
146 // A posix based oracle EventEngine.
147 class PosixOracleEventEngine final : public EventEngine {
148  public:
149   PosixOracleEventEngine() = default;
150   ~PosixOracleEventEngine() override = default;
151 
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)152   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
153       Listener::AcceptCallback on_accept,
154       absl::AnyInvocable<void(absl::Status)> on_shutdown,
155       const EndpointConfig& /*config*/,
156       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
157       override {
158     return std::make_unique<PosixOracleListener>(
159         std::move(on_accept), std::move(on_shutdown),
160         std::move(memory_allocator_factory));
161   }
162 
163   ConnectionHandle Connect(OnConnectCallback on_connect,
164                            const ResolvedAddress& addr,
165                            const EndpointConfig& args,
166                            MemoryAllocator memory_allocator,
167                            EventEngine::Duration timeout) override;
168 
CancelConnect(ConnectionHandle)169   bool CancelConnect(ConnectionHandle /*handle*/) override {
170     grpc_core::Crash("unimplemented");
171   }
IsWorkerThread()172   bool IsWorkerThread() override { return false; };
GetDNSResolver(const DNSResolver::ResolverOptions &)173   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
174       const DNSResolver::ResolverOptions& /*options*/) override {
175     grpc_core::Crash("unimplemented");
176   }
Run(Closure *)177   void Run(Closure* /*closure*/) override { grpc_core::Crash("unimplemented"); }
Run(absl::AnyInvocable<void ()>)178   void Run(absl::AnyInvocable<void()> /*closure*/) override {
179     grpc_core::Crash("unimplemented");
180   }
RunAfter(EventEngine::Duration,Closure *)181   TaskHandle RunAfter(EventEngine::Duration /*duration*/,
182                       Closure* /*closure*/) override {
183     grpc_core::Crash("unimplemented");
184   }
RunAfter(EventEngine::Duration,absl::AnyInvocable<void ()>)185   TaskHandle RunAfter(EventEngine::Duration /*duration*/,
186                       absl::AnyInvocable<void()> /*closure*/) override {
187     grpc_core::Crash("unimplemented");
188   }
Cancel(TaskHandle)189   bool Cancel(TaskHandle /*handle*/) override {
190     grpc_core::Crash("unimplemented");
191   }
192 };
193 
194 }  // namespace experimental
195 }  // namespace grpc_event_engine
196 
197 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
198