1 /**
2 * Copyright 2021-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "distributed/rpc/tcp/event_loop.h"
18
19 #include <arpa/inet.h>
20 #include <netinet/in.h>
21 #include <netinet/tcp.h>
22 #include <sys/eventfd.h>
23 #include <sys/socket.h>
24 #include <securec.h>
25 #include <unistd.h>
26 #include <utility>
27 #include <atomic>
28 #include <string>
29 #include <thread>
30
31 #include "actor/log.h"
32 #include "utils/convert_utils_base.h"
33 #include "include/backend/distributed/rpc/tcp/constants.h"
34
35 namespace mindspore {
36 namespace distributed {
37 namespace rpc {
EventLoopRun(EventLoop * evloop,int timeout)38 int EventLoopRun(EventLoop *evloop, int timeout) {
39 if (evloop == nullptr) {
40 return RPC_ERROR;
41 }
42 struct epoll_event *events = nullptr;
43 (void)sem_post(&evloop->sem_id_);
44
45 size_t size = sizeof(struct epoll_event) * EPOLL_EVENTS_SIZE;
46 events = (struct epoll_event *)malloc(size);
47 if (events == nullptr) {
48 MS_LOG(ERROR) << "Failed to call malloc events";
49 return RPC_ERROR;
50 }
51 if (memset_s(events, size, 0, size) != EOK) {
52 MS_LOG(ERROR) << "Failed to call memset_s.";
53 free(events);
54 return RPC_ERROR;
55 }
56
57 while (!evloop->is_stop_) {
58 /* free deleted event handlers */
59 evloop->RemoveDeletedEvents();
60 int nevent = epoll_wait(evloop->epoll_fd_, events, EPOLL_EVENTS_SIZE, timeout);
61 if (nevent < 0) {
62 if (errno != EINTR) {
63 MS_LOG(ERROR) << "Failed to call epoll_wait, epoll_fd_: " << evloop->epoll_fd_ << ", errno: " << errno;
64 free(events);
65 return RPC_ERROR;
66 } else {
67 continue;
68 }
69 } else if (nevent > 0) {
70 /* save the epoll modify in "stop" while dispatching handlers */
71 evloop->HandleEvent(events, IntToSize(nevent));
72 } else {
73 MS_LOG(ERROR) << "Failed to call epoll_wait, epoll_fd_: " << evloop->epoll_fd_ << ", ret: 0,errno: " << errno;
74 evloop->is_stop_ = true;
75 }
76 if (evloop->is_stop_) {
77 /* free deleted event handlers */
78 evloop->RemoveDeletedEvents();
79 }
80 }
81 evloop->is_stop_ = false;
82 MS_LOG(INFO) << "Event epoll loop run end";
83 free(events);
84
85 return RPC_OK;
86 }
87
EvloopRun(void * arg)88 void *EvloopRun(void *arg) {
89 if (arg == nullptr) {
90 MS_LOG(ERROR) << "Arg is null";
91 } else {
92 (void)EventLoopRun(reinterpret_cast<EventLoop *>(arg), -1);
93 }
94 return nullptr;
95 }
96
QueueReadyCallback(int fd,uint32_t events,void * arg)97 void QueueReadyCallback(int fd, uint32_t events, void *arg) {
98 EventLoop *evloop = reinterpret_cast<EventLoop *>(arg);
99 if (evloop == nullptr) {
100 MS_LOG(ERROR) << "The evloop is null fd:" << fd << ",events:" << events;
101 return;
102 }
103 uint64_t count;
104 ssize_t retval = read(evloop->task_queue_event_fd_, &count, sizeof(count));
105 if (retval > 0 && retval == sizeof(count)) {
106 // take out functions from the queue
107 std::queue<std::function<void()>> q;
108
109 evloop->task_queue_mutex_.lock();
110 evloop->task_queue_.swap(q);
111 evloop->task_queue_mutex_.unlock();
112
113 // invoke functions in the queue
114 while (!q.empty()) {
115 q.front()();
116 q.pop();
117 }
118 }
119 }
120
ReleaseResource()121 void EventLoop::ReleaseResource() {
122 if (task_queue_event_fd_ != -1) {
123 if (close(task_queue_event_fd_) != 0) {
124 MS_LOG(ERROR) << "Failed to close task queue event fd: " << task_queue_event_fd_;
125 }
126 task_queue_event_fd_ = -1;
127 }
128 if (epoll_fd_ != -1) {
129 if (close(epoll_fd_) != 0) {
130 MS_LOG(ERROR) << "Failed to close epoll fd: " << epoll_fd_;
131 }
132 epoll_fd_ = -1;
133 }
134 }
135
AddTask(std::function<int ()> && task)136 size_t EventLoop::AddTask(std::function<int()> &&task) {
137 // put func to the queue
138 task_queue_mutex_.lock();
139 (void)task_queue_.emplace(std::move(task));
140
141 // return the queue size to send's caller.
142 auto result = task_queue_.size();
143 task_queue_mutex_.unlock();
144
145 if (result == 1) {
146 // wakeup event loop
147 uint64_t one = 1;
148 ssize_t retval = write(task_queue_event_fd_, &one, sizeof(one));
149 if (retval <= 0 || retval != sizeof(one)) {
150 MS_LOG(WARNING) << "Failed to write queue Event fd: " << task_queue_event_fd_ << ",errno:" << errno;
151 }
152 }
153 return result;
154 }
155
RemainingTaskNum()156 size_t EventLoop::RemainingTaskNum() {
157 task_queue_mutex_.lock();
158 auto task_num = task_queue_.size();
159 task_queue_mutex_.unlock();
160 return task_num;
161 }
162
Initialize(const std::string & threadName)163 bool EventLoop::Initialize(const std::string &threadName) {
164 int retval = InitResource();
165 if (retval != RPC_OK) {
166 return false;
167 }
168 (void)sem_init(&sem_id_, 0, 0);
169
170 if (pthread_create(&loop_thread_, nullptr, EvloopRun, reinterpret_cast<void *>(this)) != 0) {
171 MS_LOG(ERROR) << "Failed to call pthread_create";
172 Finalize();
173 return false;
174 }
175
176 // wait EvloopRun
177 (void)sem_wait(&sem_id_);
178 #if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
179 std::string name = threadName;
180
181 if (name.empty()) {
182 name = "EventLoopThread";
183 }
184 retval = pthread_setname_np(loop_thread_, name.c_str());
185 if (retval != 0) {
186 MS_LOG(INFO) << "Set pthread name fail name:" << name.c_str() << ",retval:" << retval;
187 } else {
188 MS_LOG(INFO) << "Set pthread name success name:" << name.c_str() << ",loop_thread_:" << loop_thread_;
189 }
190 #endif
191
192 return true;
193 }
194
Finalize()195 void EventLoop::Finalize() {
196 if (loop_thread_ > 0) {
197 void *threadResult = nullptr;
198 Stop();
199
200 int ret = pthread_join(loop_thread_, &threadResult);
201 if (ret != 0) {
202 MS_LOG(INFO) << "Failed to call pthread_join loop_thread_";
203 }
204 loop_thread_ = 0;
205 }
206
207 RemoveDeletedEvents();
208 ReleaseResource();
209 MS_LOG(INFO) << "Stop loop succ";
210 }
211
DeleteEvent(int fd)212 void EventLoop::DeleteEvent(int fd) {
213 auto iter = events_.find(fd);
214 if (iter == events_.end()) {
215 return;
216 }
217
218 Event *eventData = iter->second;
219 if (eventData != nullptr) {
220 delete eventData;
221 }
222 (void)events_.erase(fd);
223 }
224
FindEvent(int fd)225 Event *EventLoop::FindEvent(int fd) {
226 auto iter = events_.find(fd);
227 if (iter == events_.end()) {
228 return nullptr;
229 }
230 return iter->second;
231 }
232
InitResource()233 int EventLoop::InitResource() {
234 int retval = 0;
235 is_stop_ = false;
236 epoll_fd_ = epoll_create(EPOLL_SIZE);
237 if (epoll_fd_ == -1) {
238 MS_LOG(ERROR) << "Failed to call epoll_create, errno:" << errno;
239 ReleaseResource();
240 return RPC_ERROR;
241 }
242
243 // create eventfd
244 task_queue_event_fd_ = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
245 if (task_queue_event_fd_ == -1) {
246 MS_LOG(ERROR) << "Failed to call eventfd, errno:" << errno;
247 ReleaseResource();
248 return RPC_ERROR;
249 }
250
251 retval = SetEventHandler(task_queue_event_fd_, EPOLLIN | EPOLLHUP | EPOLLERR, QueueReadyCallback,
252 reinterpret_cast<void *>(this));
253 if (retval != RPC_OK) {
254 MS_LOG(ERROR) << "Add queue event fail task_queue_event_fd_:" << task_queue_event_fd_;
255 ReleaseResource();
256 return RPC_ERROR;
257 }
258 return RPC_OK;
259 }
260
SetEventHandler(int fd,uint32_t events,EventHandler handler,void * data)261 int EventLoop::SetEventHandler(int fd, uint32_t events, EventHandler handler, void *data) {
262 struct epoll_event ev;
263 Event *evdata = nullptr;
264 int ret = 0;
265
266 if (memset_s(&ev, sizeof(ev), 0, sizeof(ev)) != EOK) {
267 MS_LOG(ERROR) << "Failed to call memset_s.";
268 return RPC_ERROR;
269 }
270 ev.events = events;
271
272 evdata = new (std::nothrow) Event();
273 if (evdata == nullptr) {
274 MS_LOG(ERROR) << "Failed to call malloc eventData, fd:" << fd << ",epollfd:" << epoll_fd_;
275 return RPC_ERROR;
276 }
277
278 evdata->data = data;
279 evdata->handler = handler;
280 evdata->fd = fd;
281
282 event_lock_.lock();
283 AddEvent(evdata);
284 event_lock_.unlock();
285
286 ev.data.ptr = evdata;
287 ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
288 if (ret > 0) {
289 event_lock_.lock();
290 DeleteEvent(fd);
291 event_lock_.unlock();
292
293 if (errno != EEXIST) {
294 MS_LOG(ERROR) << "Failed to call epoll add, fail fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
295 } else {
296 MS_LOG(ERROR) << "The fd already existed in epoll, fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
297 }
298 return RPC_ERROR;
299 }
300 return RPC_OK;
301 }
302
AddEvent(Event * event)303 void EventLoop::AddEvent(Event *event) {
304 if (event == nullptr) {
305 return;
306 }
307 DeleteEvent(event->fd);
308 (void)events_.emplace(event->fd, event);
309 }
310
DeleteEpollEvent(int fd)311 int EventLoop::DeleteEpollEvent(int fd) {
312 Event *tev = nullptr;
313 struct epoll_event ev;
314 int ret = 0;
315
316 event_lock_.lock();
317 tev = FindEvent(fd);
318 if (tev == nullptr) {
319 event_lock_.unlock();
320 return RPC_ERROR;
321 }
322 (void)events_.erase(tev->fd);
323
324 // Don't delete tev immediately, let's push it into deleted_events_, before next epoll_wait,we will free
325 // all events in deleted_events_.
326 AddDeletedEvent(tev);
327
328 event_lock_.unlock();
329 ev.events = 0;
330 ev.data.ptr = tev;
331
332 ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &ev);
333 if (ret < 0) {
334 MS_LOG(ERROR) << "Failed to call delete fd in epoll, fd:" << fd << ",epollfd:" << epoll_fd_ << ",errno:" << errno;
335 return RPC_ERROR;
336 }
337 return RPC_OK;
338 }
339
UpdateEpollEvent(int fd,uint32_t events)340 int EventLoop::UpdateEpollEvent(int fd, uint32_t events) {
341 struct epoll_event ev;
342 Event *tev = nullptr;
343 int ret;
344
345 tev = FindEvent(fd);
346 if (tev == nullptr) {
347 MS_LOG(ERROR) << "Failed to call event lookup, fd:" << fd << ",events:" << events_;
348 return RPC_ERROR;
349 }
350 if (memset_s(&ev, sizeof(ev), 0, sizeof(ev)) != EOK) {
351 MS_LOG(ERROR) << "Failed to call memset_s.";
352 return RPC_ERROR;
353 }
354
355 ev.events = events;
356 ev.data.ptr = tev;
357
358 ret = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
359 if (ret != 0) {
360 MS_LOG(ERROR) << "Failed to modify fd in epoll, fd:" << fd << ",events:" << events << ",errno:" << errno;
361 return RPC_ERROR;
362 }
363 return RPC_OK;
364 }
365
AddDeletedEvent(Event * event)366 void EventLoop::AddDeletedEvent(Event *event) {
367 if (event == nullptr) {
368 return;
369 }
370 // caller need check eventData is not nullptr
371 std::list<Event *> delete_event_list;
372
373 // if fd not found, push eventData into deleted_events_[fd]
374 std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.find(event->fd);
375 if (fdIter == deleted_events_.end()) {
376 deleted_events_[event->fd].push_back(event);
377 return;
378 }
379
380 // if fd found, check if same eventData ptr exists
381 delete_event_list = fdIter->second;
382 std::list<Event *>::iterator eventIter = delete_event_list.begin();
383 bool found = false;
384 while (eventIter != delete_event_list.end()) {
385 if (*eventIter == event) {
386 MS_LOG(WARNING) << "The fd has been deleted before fd:" << event->fd << ",epoll_fd_:" << epoll_fd_;
387 found = true;
388 break;
389 }
390 ++eventIter;
391 }
392
393 // if found same eventptr, do nothing
394 if (found) {
395 return;
396 }
397 deleted_events_[event->fd].push_back(event);
398 return;
399 }
400
RemoveDeletedEvents()401 void EventLoop::RemoveDeletedEvents() {
402 std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.begin();
403
404 while (fdIter != deleted_events_.end()) {
405 std::list<Event *> delete_event_list = fdIter->second;
406 std::list<Event *>::iterator eventIter = delete_event_list.begin();
407
408 while (eventIter != delete_event_list.end()) {
409 Event *deleteEv = *eventIter;
410 delete deleteEv;
411 deleteEv = nullptr;
412 ++eventIter;
413 }
414 (void)deleted_events_.erase(fdIter++);
415 }
416 deleted_events_.clear();
417 }
418
FindDeletedEvent(const Event * tev)419 int EventLoop::FindDeletedEvent(const Event *tev) {
420 if (tev == nullptr) {
421 return 0;
422 }
423 std::map<int, std::list<Event *>>::iterator fdIter = deleted_events_.find(tev->fd);
424 if (fdIter == deleted_events_.end()) {
425 return 0;
426 }
427
428 std::list<Event *> delete_event_list = fdIter->second;
429 std::list<Event *>::iterator eventIter = delete_event_list.begin();
430
431 while (eventIter != delete_event_list.end()) {
432 if (*eventIter == tev) {
433 return 1;
434 }
435 ++eventIter;
436 }
437 return 0;
438 }
439
HandleEvent(struct epoll_event * events,size_t nevent)440 void EventLoop::HandleEvent(struct epoll_event *events, size_t nevent) {
441 if (events == nullptr) {
442 return;
443 }
444 int found;
445 Event *tev = nullptr;
446
447 for (size_t i = 0; i < nevent; i++) {
448 tev = reinterpret_cast<Event *>(events[i].data.ptr);
449
450 if (tev != nullptr) {
451 found = FindDeletedEvent(tev);
452 if (found > 0) {
453 MS_LOG(WARNING) << "The fd has been deleted from epoll fd:" << tev->fd << ",epoll_fd_:" << epoll_fd_;
454 continue;
455 }
456 tev->handler(tev->fd, events[i].events, tev->data);
457 }
458 }
459 }
460
Stop()461 void EventLoop::Stop() {
462 if (is_stop_) {
463 return;
464 }
465
466 is_stop_ = true;
467 uint64_t one = 1;
468
469 auto retval = write(task_queue_event_fd_, &one, sizeof(one));
470 if (retval <= 0 || retval != sizeof(one)) {
471 MS_LOG(WARNING) << "Failed to write task_queue_event_fd_ fd:" << task_queue_event_fd_ << ",errno:" << errno;
472 }
473 return;
474 }
475 } // namespace rpc
476 } // namespace distributed
477 } // namespace mindspore
478