• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
16 #include <grpc/support/port_platform.h>
17 
18 #ifdef GPR_WINDOWS
19 
20 #include <grpc/event_engine/event_engine.h>
21 
22 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
23 #include "src/core/lib/event_engine/windows/win_socket.h"
24 
25 namespace grpc_event_engine {
26 namespace experimental {
27 
28 class WindowsEndpoint : public EventEngine::Endpoint {
29  public:
30   WindowsEndpoint(const EventEngine::ResolvedAddress& peer_address,
31                   std::unique_ptr<WinSocket> socket,
32                   MemoryAllocator&& allocator, const EndpointConfig& config,
33                   ThreadPool* thread_pool, std::shared_ptr<EventEngine> engine);
34   ~WindowsEndpoint() override;
35   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
36             const ReadArgs* args) override;
37   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
38              SliceBuffer* data, const WriteArgs* args) override;
39   const EventEngine::ResolvedAddress& GetPeerAddress() const override;
40   const EventEngine::ResolvedAddress& GetLocalAddress() const override;
41 
42  private:
43   struct AsyncIOState;
44 
45   // Permanent closure type for Read callbacks
46   class HandleReadClosure : public EventEngine::Closure {
47    public:
48     void Run() override;
49     void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
50                absl::AnyInvocable<void(absl::Status)> cb);
51     // Resets the per-request data, releasing the ref on io_state_.
52     // Returns the previous callback.
53     ABSL_MUST_USE_RESULT absl::AnyInvocable<void(absl::Status)>
54     ResetAndReturnCallback();
55     // Run the callback with whatever data is available, and reset state.
56     //
57     // Returns true if the callback has been called with some data. Returns
58     // false if no data has been read.
59     bool MaybeFinishIfDataHasAlreadyBeenRead();
60     // Swap any leftover slices into the provided buffer
61     void DonateSpareSlices(SliceBuffer* buffer);
62 
63    private:
64     std::shared_ptr<AsyncIOState> io_state_;
65     absl::AnyInvocable<void(absl::Status)> cb_;
66     SliceBuffer* buffer_ = nullptr;
67     SliceBuffer last_read_buffer_;
68   };
69 
70   // Permanent closure type for Write callbacks
71   class HandleWriteClosure : public EventEngine::Closure {
72    public:
73     void Run() override;
74     void Prime(std::shared_ptr<AsyncIOState> io_state, SliceBuffer* buffer,
75                absl::AnyInvocable<void(absl::Status)> cb);
76     // Resets the per-request data, releasing the ref on io_state_.
77     // Returns the previous callback.
78     ABSL_MUST_USE_RESULT absl::AnyInvocable<void(absl::Status)>
79     ResetAndReturnCallback();
80 
81    private:
82     std::shared_ptr<AsyncIOState> io_state_;
83     absl::AnyInvocable<void(absl::Status)> cb_;
84     SliceBuffer* buffer_ = nullptr;
85   };
86 
87   // A class to manage the data that must outlive the Endpoint.
88   //
89   // Once an endpoint is done and destroyed, there still may be overlapped
90   // operations pending. To clean up safely, this data must outlive the
91   // Endpoint, and be destroyed asynchronously when all pending overlapped
92   // events are complete.
93   struct AsyncIOState {
94     AsyncIOState(WindowsEndpoint* endpoint, std::unique_ptr<WinSocket> socket,
95                  std::shared_ptr<EventEngine> engine, ThreadPool* thread_pool);
96     ~AsyncIOState();
97 
98     // Perform the low-level calls and execute the HandleReadClosure
99     // asynchronously.
100     void DoTcpRead(SliceBuffer* buffer);
101 
102     WindowsEndpoint* const endpoint;
103     std::unique_ptr<WinSocket> socket;
104     HandleReadClosure handle_read_event;
105     HandleWriteClosure handle_write_event;
106     std::shared_ptr<EventEngine> engine;
107     ThreadPool* thread_pool;
108   };
109 
110   EventEngine::ResolvedAddress peer_address_;
111   std::string peer_address_string_;
112   EventEngine::ResolvedAddress local_address_;
113   std::string local_address_string_;
114   MemoryAllocator allocator_;
115   std::shared_ptr<AsyncIOState> io_state_;
116 };
117 
118 }  // namespace experimental
119 }  // namespace grpc_event_engine
120 
121 #endif
122 
123 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENDPOINT_H
124