/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_EV_LOOP_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_EV_LOOP_H_

#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <semaphore.h>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <map>
#include <string>

namespace mindspore {
namespace distributed {
namespace rpc {
class EventLoop;
using Duration = uint64_t;

// Max epoll set size
constexpr auto EPOLL_SIZE = 4096;

// Max epoll event size
constexpr auto EPOLL_EVENTS_SIZE = 64;

// Callback invoked for an fd registered with the event loop.
//   fd:     the file descriptor the callback is registered for.
//   events: event bit mask (NOTE(review): presumably the epoll event mask reported for fd - confirm).
//   data:   opaque user pointer (NOTE(review): presumably the one stored in Event::data - confirm).
using EventHandler = void (*)(int fd, uint32_t events, void *data);

// Runs the given event loop; defined in the implementation file and granted friend access to EventLoop.
// NOTE(review): `timeout` is presumably forwarded to epoll_wait (milliseconds) - confirm.
int EventLoopRun(EventLoop *evloop, int timeout);

/*
 * The event occurred on the fd.
 */
struct Event {
  int fd;                // File descriptor the event belongs to.
  void *data;            // Opaque user pointer associated with the event.
  EventHandler handler;  // Callback associated with the event.
};

/*
 * The class EventLoop monitors a certain file descriptor created by the eventfd function call,
 * and triggers tasks when any event occurs on the file descriptor.
 *
 * The class is neither copyable nor assignable.
 */
class EventLoop {
 public:
  // Sets both fds to -1, the stop flag to false and the thread handle to 0.
  // NOTE(review): real resources are presumably acquired later by Initialize() - confirm.
  EventLoop() : epoll_fd_(-1), is_stop_(false), loop_thread_(0), task_queue_event_fd_(-1) {}
  EventLoop(const EventLoop &) = delete;
  EventLoop &operator=(const EventLoop &) = delete;
  ~EventLoop() = default;

  // NOTE(review): presumably allocates resources and starts the loop thread named `threadName`;
  // returns whether that succeeded - confirm against the implementation.
  bool Initialize(const std::string &threadName);

  // NOTE(review): presumably stops the loop and releases what Initialize() acquired - confirm.
  void Finalize();

  // Add task (eg. send message, reconnect etc.) to task queue of the event loop.
  // These tasks are executed asynchronously.
  // NOTE(review): the parameter is std::function<int()> while task_queue_ stores std::function<void()>;
  // if the task is stored there its int result is discarded - confirm this is intended.
  size_t AddTask(std::function<int()> &&task);

  // The number of tasks in the pending task queue.
  size_t RemainingTaskNum();

  // Set event handler for events(read/write/..) occurred on the socket fd.
  int SetEventHandler(int sock_fd, uint32_t events, EventHandler handler, void *data);

  // Modify the events monitored by epoll.
  int UpdateEpollEvent(int fd, uint32_t events);
  int DeleteEpollEvent(int fd);

 private:
  void AddEvent(Event *event);

  // Allocate the resources of epoll and tasks.
  int InitResource();

  // Release the resources of epoll and tasks.
  void ReleaseResource();

  // Stop the event loop.
  void Stop();

  // Operate the soft deleted events.
  void AddDeletedEvent(Event *event);
  int FindDeletedEvent(const Event *event);
  void RemoveDeletedEvents();

  // Event operations.
  void HandleEvent(struct epoll_event *events, size_t nevent);
  void DeleteEvent(int fd);
  Event *FindEvent(int fd);

  // Listen on some sockets.
  int epoll_fd_;

  // Whether the event loop should stop.
  bool is_stop_;

  sem_t sem_id_;
  std::mutex task_queue_mutex_;

  // The loop thread.
  pthread_t loop_thread_;

  // eventfd to trigger task_queue_.
  int task_queue_event_fd_;

  // Queue tasks like send message, reconnect, collect metrics, etc.
  // These tasks will be triggered by task_queue_event_fd_.
  std::queue<std::function<void()>> task_queue_;

  // Events on the socket.
  std::mutex event_lock_;
  std::map<int, Event *> events_;

  // To be safe, each fd maps to a list of deleted events rather than to a single event, because the
  // caller may delete events on the same fd twice within one epoll_wait.
  std::map<int, std::list<Event *>> deleted_events_;

  friend int EventLoopRun(EventLoop *evloop, int timeout);
  friend void QueueReadyCallback(int fd, uint32_t events, void *arg);
};
}  // namespace rpc
}  // namespace distributed
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_DISTRIBUTED_RPC_TCP_EV_LOOP_H_