• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
17 
18 #include <memory>
19 #include <string>
20 #include <utility>
21 #include <vector>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/functional/any_invocable.h"
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 
28 #include <grpc/event_engine/endpoint_config.h>
29 #include <grpc/event_engine/event_engine.h>
30 #include <grpc/event_engine/memory_allocator.h>
31 #include <grpc/event_engine/slice_buffer.h>
32 
33 #include "src/core/lib/gprpp/crash.h"
34 #include "src/core/lib/gprpp/notification.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/gprpp/thd.h"
37 #include "test/core/event_engine/event_engine_test_utils.h"
38 
39 namespace grpc_event_engine {
40 namespace experimental {
41 
42 class PosixOracleEndpoint : public EventEngine::Endpoint {
43  public:
44   explicit PosixOracleEndpoint(int socket_fd);
45   static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
46   ~PosixOracleEndpoint() override;
47   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
48             const ReadArgs* args) override;
49   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
50              SliceBuffer* data, const WriteArgs* args) override;
51   void Shutdown();
GetPeerAddress()52   EventEngine::ResolvedAddress& GetPeerAddress() const override {
53     grpc_core::Crash("unimplemented");
54   }
GetLocalAddress()55   EventEngine::ResolvedAddress& GetLocalAddress() const override {
56     grpc_core::Crash("unimplemented");
57   }
58 
59  private:
60   // An internal helper class definition of Read operations to be performed
61   // by the TCPServerEndpoint.
62   class ReadOperation {
63    public:
ReadOperation()64     ReadOperation()
65         : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {}
ReadOperation(int num_bytes_to_read,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)66     ReadOperation(int num_bytes_to_read, SliceBuffer* buffer,
67                   absl::AnyInvocable<void(absl::Status)>&& on_complete)
68         : num_bytes_to_read_(num_bytes_to_read),
69           buffer_(buffer),
70           on_complete_(std::move(on_complete)) {}
IsValid()71     bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; }
GetNumBytesToRead()72     int GetNumBytesToRead() const { return num_bytes_to_read_; }
operator()73     void operator()(std::string read_data, absl::Status status) {
74       if (on_complete_ != nullptr) {
75         AppendStringToSliceBuffer(std::exchange(buffer_, nullptr), read_data);
76         std::exchange(on_complete_, nullptr)(status);
77       }
78     }
79 
80    private:
81     int num_bytes_to_read_;
82     SliceBuffer* buffer_;
83     absl::AnyInvocable<void(absl::Status)> on_complete_;
84   };
85 
86   // An internal helper class definition of Write operations to be performed
87   // by the TCPServerEndpoint.
88   class WriteOperation {
89    public:
WriteOperation()90     WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {}
WriteOperation(SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)91     WriteOperation(SliceBuffer* buffer,
92                    absl::AnyInvocable<void(absl::Status)>&& on_complete)
93         : bytes_to_write_(ExtractSliceBufferIntoString(buffer)),
94           on_complete_(std::move(on_complete)) {}
IsValid()95     bool IsValid() { return bytes_to_write_.length() > 0; }
GetBytesToWrite()96     std::string GetBytesToWrite() const { return bytes_to_write_; }
operator()97     void operator()(absl::Status status) {
98       if (on_complete_ != nullptr) {
99         std::exchange(on_complete_, nullptr)(status);
100       }
101     }
102 
103    private:
104     std::string bytes_to_write_;
105     absl::AnyInvocable<void(absl::Status)> on_complete_;
106   };
107 
108   void ProcessReadOperations();
109   void ProcessWriteOperations();
110 
111   mutable grpc_core::Mutex mu_;
112   bool is_shutdown_ = false;
113   int socket_fd_;
114   ReadOperation read_ops_channel_;
115   WriteOperation write_ops_channel_;
116   std::unique_ptr<grpc_core::Notification> read_op_signal_{
117       new grpc_core::Notification()};
118   std::unique_ptr<grpc_core::Notification> write_op_signal_{
119       new grpc_core::Notification()};
120   grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_);
121   grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_);
122 };
123 
124 class PosixOracleListener : public EventEngine::Listener {
125  public:
126   PosixOracleListener(
127       EventEngine::Listener::AcceptCallback on_accept,
128       absl::AnyInvocable<void(absl::Status)> on_shutdown,
129       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory);
130   ~PosixOracleListener() override;
131   absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override;
132   absl::Status Start() override;
133 
134  private:
135   void HandleIncomingConnections();
136 
137   mutable grpc_core::Mutex mu_;
138   EventEngine::Listener::AcceptCallback on_accept_;
139   absl::AnyInvocable<void(absl::Status)> on_shutdown_;
140   std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
141   grpc_core::Thread serve_;
142   int pipefd_[2];
143   bool is_started_ = false;
144   std::vector<int> listener_fds_;
145 };
146 
147 // A posix based oracle EventEngine.
148 class PosixOracleEventEngine final : public EventEngine {
149  public:
150   PosixOracleEventEngine() = default;
151   ~PosixOracleEventEngine() override = default;
152 
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)153   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
154       Listener::AcceptCallback on_accept,
155       absl::AnyInvocable<void(absl::Status)> on_shutdown,
156       const EndpointConfig& /*config*/,
157       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
158       override {
159     return std::make_unique<PosixOracleListener>(
160         std::move(on_accept), std::move(on_shutdown),
161         std::move(memory_allocator_factory));
162   }
163 
164   ConnectionHandle Connect(OnConnectCallback on_connect,
165                            const ResolvedAddress& addr,
166                            const EndpointConfig& args,
167                            MemoryAllocator memory_allocator,
168                            EventEngine::Duration timeout) override;
169 
CancelConnect(ConnectionHandle)170   bool CancelConnect(ConnectionHandle /*handle*/) override {
171     grpc_core::Crash("unimplemented");
172   }
IsWorkerThread()173   bool IsWorkerThread() override { return false; };
GetDNSResolver(const DNSResolver::ResolverOptions &)174   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
175       const DNSResolver::ResolverOptions& /*options*/) override {
176     grpc_core::Crash("unimplemented");
177   }
Run(Closure *)178   void Run(Closure* /*closure*/) override { grpc_core::Crash("unimplemented"); }
Run(absl::AnyInvocable<void ()>)179   void Run(absl::AnyInvocable<void()> /*closure*/) override {
180     grpc_core::Crash("unimplemented");
181   }
RunAfter(EventEngine::Duration,Closure *)182   TaskHandle RunAfter(EventEngine::Duration /*duration*/,
183                       Closure* /*closure*/) override {
184     grpc_core::Crash("unimplemented");
185   }
RunAfter(EventEngine::Duration,absl::AnyInvocable<void ()>)186   TaskHandle RunAfter(EventEngine::Duration /*duration*/,
187                       absl::AnyInvocable<void()> /*closure*/) override {
188     grpc_core::Crash("unimplemented");
189   }
Cancel(TaskHandle)190   bool Cancel(TaskHandle /*handle*/) override {
191     grpc_core::Crash("unimplemented");
192   }
193 };
194 
195 }  // namespace experimental
196 }  // namespace grpc_event_engine
197 
198 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H
199