• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_EV_LOOP_H_
18 #define MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_EV_LOOP_H_
19 
20 #include <sys/epoll.h>
21 #include <sys/eventfd.h>
22 #include <semaphore.h>
23 #include <functional>
24 #include <list>
25 #include <mutex>
26 #include <queue>
27 #include <map>
28 #include <string>
29 
30 namespace mindspore {
31 namespace distributed {
32 namespace rpc {
33 class EventLoop;
34 using Duration = uint64_t;
35 
36 // Max epoll set size
37 constexpr auto EPOLL_SIZE = 4096;
38 
39 // Max epoll event size
40 constexpr auto EPOLL_EVENTS_SIZE = 64;
41 
42 typedef void (*EventHandler)(int fd, uint32_t events, void *data);
43 int EventLoopRun(EventLoop *evloop, int timeout);
44 
45 /*
46  * The event occurred on the fd.
47  */
48 typedef struct {
49   int fd;
50   void *data;
51   EventHandler handler;
52 } Event;
53 
54 /*
55  * The class EventLoop monitors a certain file descriptor created by eventfd function call,
56  * and triggers tasks when any event occurred on the file descriptor.
57  */
58 class EventLoop {
59  public:
EventLoop()60   EventLoop() : epoll_fd_(-1), is_stop_(false), loop_thread_(0), task_queue_event_fd_(-1) {}
61   EventLoop(const EventLoop &) = delete;
62   EventLoop &operator=(const EventLoop &) = delete;
63   ~EventLoop() = default;
64 
65   bool Initialize(const std::string &threadName);
66   void Finalize();
67 
68   // Add task (eg. send message, reconnect etc.) to task queue of the event loop.
69   // These tasks are executed asynchronously.
70   size_t AddTask(std::function<int()> &&task);
71 
72   // The number of tasks in the pending task queue.
73   size_t RemainingTaskNum();
74 
75   // Set event handler for events(read/write/..) occurred on the socket fd.
76   int SetEventHandler(int sock_fd, uint32_t events, EventHandler handler, void *data);
77 
78   // Modify the evnets monitored by epoll.
79   int UpdateEpollEvent(int fd, uint32_t events);
80   int DeleteEpollEvent(int fd);
81 
82  private:
83   void AddEvent(Event *event);
84 
85   // Allocate the resources of epoll and tasks.
86   int InitResource();
87 
88   // Release the resources of epoll and tasks.
89   void ReleaseResource();
90 
91   // Stop the event loop.
92   void Stop();
93 
94   // Operate the soft deleted events.
95   void AddDeletedEvent(Event *event);
96   int FindDeletedEvent(const Event *event);
97   void RemoveDeletedEvents();
98 
99   // Event operations.
100   void HandleEvent(struct epoll_event *events, size_t nevent);
101   void DeleteEvent(int fd);
102   Event *FindEvent(int fd);
103 
104   // Listen on some sockets.
105   int epoll_fd_;
106 
107   // Whether the event loop should stop.
108   bool is_stop_;
109 
110   sem_t sem_id_;
111   std::mutex task_queue_mutex_;
112 
113   // The loop thread.
114   pthread_t loop_thread_;
115 
116   // eventfd to trigger task_queue__.
117   int task_queue_event_fd_;
118 
119   // Queue tasks like send message, reconnect, collect metrics, etc.
120   // This tasks will be triggered by task_queue_event_fd_.
121   std::queue<std::function<void()>> task_queue_;
122 
123   // Events on the socket.
124   std::mutex event_lock_;
125   std::map<int, Event *> events_;
126 
127   // To be safe, use a list to preserve deleted events rather than a map. Because the caller may
128   // delete events on the same fd twice in once epoll_wait.
129   std::map<int, std::list<Event *>> deleted_events_;
130 
131   friend int EventLoopRun(EventLoop *evloop, int timeout);
132   friend void QueueReadyCallback(int fd, uint32_t events, void *arg);
133 };
134 }  // namespace rpc
135 }  // namespace distributed
136 }  // namespace mindspore
137 
138 #endif
139