• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "distributed/rpc/tcp/event_loop.h"
18 
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/tcp.h>
22 #include <sys/eventfd.h>
23 #include <sys/socket.h>
24 #include <securec.h>
25 #include <unistd.h>
26 #include <utility>
27 #include <atomic>
28 #include <string>
29 #include <thread>
30 
31 #include "actor/log.h"
32 #include "utils/convert_utils_base.h"
33 #include "include/backend/distributed/rpc/tcp/constants.h"
34 
35 namespace mindspore {
36 namespace distributed {
37 namespace rpc {
EventLoopRun(EventLoop * evloop,int timeout)38 int EventLoopRun(EventLoop *evloop, int timeout) {
39   if (evloop == nullptr) {
40     return RPC_ERROR;
41   }
42   struct epoll_event *events = nullptr;
43   (void)sem_post(&evloop->sem_id_);
44 
45   size_t size = sizeof(struct epoll_event) * EPOLL_EVENTS_SIZE;
46   events = (struct epoll_event *)malloc(size);
47   if (events == nullptr) {
48     MS_LOG(ERROR) << "Failed to call malloc events";
49     return RPC_ERROR;
50   }
51   if (memset_s(events, size, 0, size) != EOK) {
52     MS_LOG(ERROR) << "Failed to call memset_s.";
53     free(events);
54     return RPC_ERROR;
55   }
56 
57   while (!evloop->is_stop_) {
58     /* free deleted event handlers */
59     evloop->RemoveDeletedEvents();
60     int nevent = epoll_wait(evloop->epoll_fd_, events, EPOLL_EVENTS_SIZE, timeout);
61     if (nevent < 0) {
62       if (errno != EINTR) {
63         MS_LOG(ERROR) << "Failed to call epoll_wait, epoll_fd_: " << evloop->epoll_fd_ << ", errno: " << errno;
64         free(events);
65         return RPC_ERROR;
66       } else {
67         continue;
68       }
69     } else if (nevent > 0) {
70       /* save the epoll modify in "stop" while dispatching handlers */
71       evloop->HandleEvent(events, IntToSize(nevent));
72     } else {
73       MS_LOG(ERROR) << "Failed to call epoll_wait, epoll_fd_: " << evloop->epoll_fd_ << ", ret: 0,errno: " << errno;
74       evloop->is_stop_ = true;
75     }
76     if (evloop->is_stop_) {
77       /* free deleted event handlers */
78       evloop->RemoveDeletedEvents();
79     }
80   }
81   evloop->is_stop_ = false;
82   MS_LOG(INFO) << "Event epoll loop run end";
83   free(events);
84 
85   return RPC_OK;
86 }
87 
EvloopRun(void * arg)88 void *EvloopRun(void *arg) {
89   if (arg == nullptr) {
90     MS_LOG(ERROR) << "Arg is null";
91   } else {
92     (void)EventLoopRun(reinterpret_cast<EventLoop *>(arg), -1);
93   }
94   return nullptr;
95 }
96 
QueueReadyCallback(int fd,uint32_t events,void * arg)97 void QueueReadyCallback(int fd, uint32_t events, void *arg) {
98   EventLoop *evloop = reinterpret_cast<EventLoop *>(arg);
99   if (evloop == nullptr) {
100     MS_LOG(ERROR) << "The evloop is null fd:" << fd << ",events:" << events;
101     return;
102   }
103   uint64_t count;
104   ssize_t retval = read(evloop->task_queue_event_fd_, &count, sizeof(count));
105   if (retval > 0 && retval == sizeof(count)) {
106     // take out functions from the queue
107     std::queue<std::function<void()>> q;
108 
109     evloop->task_queue_mutex_.lock();
110     evloop->task_queue_.swap(q);
111     evloop->task_queue_mutex_.unlock();
112 
113     // invoke functions in the queue
114     while (!q.empty()) {
115       q.front()();
116       q.pop();
117     }
118   }
119 }
120 
ReleaseResource()121 void EventLoop::ReleaseResource() {
122   if (task_queue_event_fd_ != -1) {
123     if (close(task_queue_event_fd_) != 0) {
124       MS_LOG(ERROR) << "Failed to close task queue event fd: " << task_queue_event_fd_;
125     }
126     task_queue_event_fd_ = -1;
127   }
128   if (epoll_fd_ != -1) {
129     if (close(epoll_fd_) != 0) {
130       MS_LOG(ERROR) << "Failed to close epoll fd: " << epoll_fd_;
131     }
132     epoll_fd_ = -1;
133   }
134 }
135 
AddTask(std::function<int ()> && task)136 size_t EventLoop::AddTask(std::function<int()> &&task) {
137   // put func to the queue
138   task_queue_mutex_.lock();
139   (void)task_queue_.emplace(std::move(task));
140 
141   // return the queue size to send's caller.
142   auto result = task_queue_.size();
143   task_queue_mutex_.unlock();
144 
145   if (result == 1) {
146     // wakeup event loop
147     uint64_t one = 1;
148     ssize_t retval = write(task_queue_event_fd_, &one, sizeof(one));
149     if (retval <= 0 || retval != sizeof(one)) {
150       MS_LOG(WARNING) << "Failed to write queue Event fd: " << task_queue_event_fd_ << ",errno:" << errno;
151     }
152   }
153   return result;
154 }
155 
RemainingTaskNum()156 size_t EventLoop::RemainingTaskNum() {
157   task_queue_mutex_.lock();
158   auto task_num = task_queue_.size();
159   task_queue_mutex_.unlock();
160   return task_num;
161 }
162 
Initialize(const std::string & threadName)163 bool EventLoop::Initialize(const std::string &threadName) {
164   int retval = InitResource();
165   if (retval != RPC_OK) {
166     return false;
167   }
168   (void)sem_init(&sem_id_, 0, 0);
169 
170   if (pthread_create(&loop_thread_, nullptr, EvloopRun, reinterpret_cast<void *>(this)) != 0) {
171     MS_LOG(ERROR) << "Failed to call pthread_create";
172     Finalize();
173     return false;
174   }
175 
176   // wait EvloopRun
177   (void)sem_wait(&sem_id_);
178 #if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
179   std::string name = threadName;
180 
181   if (name.empty()) {
182     name = "EventLoopThread";
183   }
184   retval = pthread_setname_np(loop_thread_, name.c_str());
185   if (retval != 0) {
186     MS_LOG(INFO) << "Set pthread name fail name:" << name.c_str() << ",retval:" << retval;
187   } else {
188     MS_LOG(INFO) << "Set pthread name success name:" << name.c_str() << ",loop_thread_:" << loop_thread_;
189   }
190 #endif
191 
192   return true;
193 }
194 
Finalize()195 void EventLoop::Finalize() {
196   if (loop_thread_ > 0) {
197     void *threadResult = nullptr;
198     Stop();
199 
200     int ret = pthread_join(loop_thread_, &threadResult);
201     if (ret != 0) {
202       MS_LOG(INFO) << "Failed to call pthread_join loop_thread_";
203     }
204     loop_thread_ = 0;
205   }
206 
207   RemoveDeletedEvents();
208   ReleaseResource();
209   MS_LOG(INFO) << "Stop loop succ";
210 }
211 
DeleteEvent(int fd)212 void EventLoop::DeleteEvent(int fd) {
213   auto iter = events_.find(fd);
214   if (iter == events_.end()) {
215     return;
216   }
217 
218   Event *eventData = iter->second;
219   if (eventData != nullptr) {
220     delete eventData;
221   }
222   (void)events_.erase(fd);
223 }
224 
FindEvent(int fd)225 Event *EventLoop::FindEvent(int fd) {
226   auto iter = events_.find(fd);
227   if (iter == events_.end()) {
228     return nullptr;
229   }
230   return iter->second;
231 }
232 
InitResource()233 int EventLoop::InitResource() {
234   int retval = 0;
235   is_stop_ = false;
236   epoll_fd_ = epoll_create(EPOLL_SIZE);
237   if (epoll_fd_ == -1) {
238     MS_LOG(ERROR) << "Failed to call epoll_create, errno:" << errno;
239     ReleaseResource();
240     return RPC_ERROR;
241   }
242 
243   // create eventfd
244   task_queue_event_fd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
245   if (task_queue_event_fd_ == -1) {
246     MS_LOG(ERROR) << "Failed to call eventfd, errno:" << errno;
247     ReleaseResource();
248     return RPC_ERROR;
249   }
250 
251   retval = SetEventHandler(task_queue_event_fd_, EPOLLIN | EPOLLHUP | EPOLLERR, QueueReadyCallback,
252                            reinterpret_cast<void *>(this));
253   if (retval != RPC_OK) {
254     MS_LOG(ERROR) << "Add queue event fail task_queue_event_fd_:" << task_queue_event_fd_;
255     ReleaseResource();
256     return RPC_ERROR;
257   }
258   return RPC_OK;
259 }
260 
SetEventHandler(int fd,uint32_t events,EventHandler handler,void * data)261 int EventLoop::SetEventHandler(int fd, uint32_t events, EventHandler handler, void *data) {
262   struct epoll_event ev;
263   Event *evdata = nullptr;
264   int ret = 0;
265 
266   if (memset_s(&ev, sizeof(ev), 0, sizeof(ev)) != EOK) {
267     MS_LOG(ERROR) << "Failed to call memset_s.";
268     return RPC_ERROR;
269   }
270   ev.events = events;
271 
272   evdata = new (std::nothrow) Event();
273   if (evdata == nullptr) {
274     MS_LOG(ERROR) << "Failed to call malloc eventData, fd:" << fd << ",epollfd:" << epoll_fd_;
275     return RPC_ERROR;
276   }
277 
278   evdata->data = data;
279   evdata->handler = handler;
280   evdata->fd = fd;
281 
282   event_lock_.lock();
283   AddEvent(evdata);
284   event_lock_.unlock();
285 
286   ev.data.ptr = evdata;
287   ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
288   if (ret > 0) {
289     event_lock_.lock();
290     DeleteEvent(fd);
291     event_lock_.unlock();
292 
293     if (errno != EEXIST) {
294       MS_LOG(ERROR) << "Failed to call epoll add, fail fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
295     } else {
296       MS_LOG(ERROR) << "The fd already existed in epoll, fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
297     }
298     return RPC_ERROR;
299   }
300   return RPC_OK;
301 }
302 
AddEvent(Event * event)303 void EventLoop::AddEvent(Event *event) {
304   if (event == nullptr) {
305     return;
306   }
307   DeleteEvent(event->fd);
308   (void)events_.emplace(event->fd, event);
309 }
310 
DeleteEpollEvent(int fd)311 int EventLoop::DeleteEpollEvent(int fd) {
312   Event *tev = nullptr;
313   struct epoll_event ev;
314   int ret = 0;
315 
316   event_lock_.lock();
317   tev = FindEvent(fd);
318   if (tev == nullptr) {
319     event_lock_.unlock();
320     return RPC_ERROR;
321   }
322   (void)events_.erase(tev->fd);
323 
324   // Don't delete tev immediately, let's push it into deleted_events_, before next epoll_wait,we will free
325   // all events in deleted_events_.
326   AddDeletedEvent(tev);
327 
328   event_lock_.unlock();
329   ev.events = 0;
330   ev.data.ptr = tev;
331 
332   ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &ev);
333   if (ret < 0) {
334     MS_LOG(ERROR) << "Failed to call delete fd in epoll, fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
335     return RPC_ERROR;
336   }
337   return RPC_OK;
338 }
339 
UpdateEpollEvent(int fd,uint32_t events)340 int EventLoop::UpdateEpollEvent(int fd, uint32_t events) {
341   struct epoll_event ev;
342   Event *tev = nullptr;
343   int ret;
344 
345   tev = FindEvent(fd);
346   if (tev == nullptr) {
347     MS_LOG(ERROR) << "Failed to call event lookup, fd:" << fd << ",events:" << events_;
348     return RPC_ERROR;
349   }
350   if (memset_s(&ev, sizeof(ev), 0, sizeof(ev)) != EOK) {
351     MS_LOG(ERROR) << "Failed to call memset_s.";
352     return RPC_ERROR;
353   }
354 
355   ev.events = events;
356   ev.data.ptr = tev;
357 
358   ret = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
359   if (ret != 0) {
360     MS_LOG(ERROR) << "Failed to modify fd in epoll, fd:" << fd << ",events:" << events << ",errno:" << errno;
361     return RPC_ERROR;
362   }
363   return RPC_OK;
364 }
365 
AddDeletedEvent(Event * event)366 void EventLoop::AddDeletedEvent(Event *event) {
367   if (event == nullptr) {
368     return;
369   }
370   // caller need check eventData is not nullptr
371   std::list<Event *> delete_event_list;
372 
373   // if fd not found, push eventData into deleted_events_[fd]
374   std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.find(event->fd);
375   if (fdIter == deleted_events_.end()) {
376     deleted_events_[event->fd].push_back(event);
377     return;
378   }
379 
380   // if fd found, check if same eventData ptr exists
381   delete_event_list = fdIter->second;
382   std::list<Event *>::iterator eventIter = delete_event_list.begin();
383   bool found = false;
384   while (eventIter != delete_event_list.end()) {
385     if (*eventIter == event) {
386       MS_LOG(WARNING) << "The fd has been deleted before fd:" << event->fd << ",epoll_fd_:" << epoll_fd_;
387       found = true;
388       break;
389     }
390     ++eventIter;
391   }
392 
393   // if found same eventptr, do nothing
394   if (found) {
395     return;
396   }
397   deleted_events_[event->fd].push_back(event);
398   return;
399 }
400 
RemoveDeletedEvents()401 void EventLoop::RemoveDeletedEvents() {
402   std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.begin();
403 
404   while (fdIter != deleted_events_.end()) {
405     std::list<Event *> delete_event_list = fdIter->second;
406     std::list<Event *>::iterator eventIter = delete_event_list.begin();
407 
408     while (eventIter != delete_event_list.end()) {
409       Event *deleteEv = *eventIter;
410       delete deleteEv;
411       deleteEv = nullptr;
412       ++eventIter;
413     }
414     (void)deleted_events_.erase(fdIter++);
415   }
416   deleted_events_.clear();
417 }
418 
FindDeletedEvent(const Event * tev)419 int EventLoop::FindDeletedEvent(const Event *tev) {
420   if (tev == nullptr) {
421     return 0;
422   }
423   std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.find(tev->fd);
424   if (fdIter == deleted_events_.end()) {
425     return 0;
426   }
427 
428   std::list<Event *> delete_event_list = fdIter->second;
429   std::list<Event *>::iterator eventIter = delete_event_list.begin();
430 
431   while (eventIter != delete_event_list.end()) {
432     if (*eventIter == tev) {
433       return 1;
434     }
435     ++eventIter;
436   }
437   return 0;
438 }
439 
HandleEvent(struct epoll_event * events,size_t nevent)440 void EventLoop::HandleEvent(struct epoll_event *events, size_t nevent) {
441   if (events == nullptr) {
442     return;
443   }
444   int found;
445   Event *tev = nullptr;
446 
447   for (size_t i = 0; i < nevent; i++) {
448     tev = reinterpret_cast<Event *>(events[i].data.ptr);
449 
450     if (tev != nullptr) {
451       found = FindDeletedEvent(tev);
452       if (found > 0) {
453         MS_LOG(WARNING) << "The fd has been deleted from epoll fd:" << tev->fd << ",epoll_fd_:" << epoll_fd_;
454         continue;
455       }
456       tev->handler(tev->fd, events[i].events, tev->data);
457     }
458   }
459 }
460 
Stop()461 void EventLoop::Stop() {
462   if (is_stop_) {
463     return;
464   }
465 
466   is_stop_ = true;
467   uint64_t one = 1;
468 
469   auto retval = write(task_queue_event_fd_, &one, sizeof(one));
470   if (retval <= 0 || retval != sizeof(one)) {
471     MS_LOG(WARNING) << "Failed to write task_queue_event_fd_ fd:" << task_queue_event_fd_ << ",errno:" << errno;
472   }
473   return;
474 }
475 }  // namespace rpc
476 }  // namespace distributed
477 }  // namespace mindspore
478