/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "poller.h"
#include "sched/execute_ctx.h"
#include "dfx/log/ffrt_log_api.h"

#ifdef FFRT_IO_TASK_SCHEDULER
namespace ffrt {
// Creates the epoll instance and an eventfd used to interrupt epoll_wait.
// The event buffer holds 1024 entries, which bounds how many events a single
// PollOnce call can report. Terminates the process if the wake-up eventfd
// cannot be registered with epoll.
Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) },
    m_events(1024)
{
    // A null callback marks the wake-up entry; PollOnce identifies it by fd.
    m_wakeData.cb = nullptr;
    m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
    if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
        std::terminate();
    }
}

// Closes the wake-up eventfd and the epoll fd, drops all bookkeeping and
// finally marks the poller as torn down.
Poller::~Poller() noexcept
{
    ::close(m_wakeData.fd);
    ::close(m_epFd);
    timerHandle_ = -1;
    m_wakeDataMap.clear();
    m_delCntMap.clear();
    timerMap_.clear();
    executedHandle_.clear();
    flag_ = EpollStatus::TEARDOWN;
}

// Registers fd with epoll for the given event mask.
// data and cb are stored and handed back when the fd becomes ready in PollOnce.
// Returns 0 on success, -1 if the bookkeeping record cannot be allocated or
// epoll_ctl fails.
int Poller::AddFdEvent(uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
{
    auto wakeData = std::unique_ptr<WakeDataWithCb>(new (std::nothrow) WakeDataWithCb(fd, data, cb));
    if (wakeData == nullptr) {
        // Registering a null pointer would make PollOnce dereference nullptr when the fd fires.
        FFRT_LOGE("alloc wake data failed: efd=%d, fd=%d", m_epFd, fd);
        return -1;
    }

    epoll_event ev = { .events = events, .data = {.ptr = static_cast<void*>(wakeData.get())} };
    if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) != 0) {
        FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
        return -1;
    }

    // The map owns the record from here on; epoll only keeps the raw pointer.
    std::unique_lock lock(m_mapMutex);
    m_wakeDataMap[fd].emplace_back(std::move(wakeData));
    fdEmpty_.store(false);
    return 0;
}

// Removes fd from epoll and counts the deletion; the stored record itself is
// released later by ReleaseFdWakeData at the end of a PollOnce pass.
// Returns 0 on success, -1 if epoll_ctl fails.
int Poller::DelFdEvent(int fd) noexcept
{
    if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
        FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
        return -1;
    }

    std::unique_lock lock(m_mapMutex);
    m_delCntMap[fd]++;
    WakeUp();
    return 0;
}

// Interrupts a pending epoll_wait by incrementing the wake-up eventfd.
void Poller::WakeUp() noexcept
{
    uint64_t one = 1;
    ssize_t n = ::write(m_wakeData.fd, &one, sizeof one);
    // On a non-blocking eventfd EAGAIN means the counter is saturated, i.e. a
    // wake-up is already pending, so only other failures are worth reporting.
    if (n < 0 && errno != EAGAIN) {
        FFRT_LOGE("wakeup write error: fd=%d, errorno=%d", m_wakeData.fd, errno);
    }
}

// Runs one poll iteration.
// timeout is the epoll_wait timeout in milliseconds; -1 lets the earliest
// registered timer (if any) decide how long to wait.
// Returns RET_TIMER if timer callbacks were executed, RET_EPOLL if fd events
// were dispatched, RET_NULL otherwise (nothing happened or epoll_wait failed).
PollerRet Poller::PollOnce(int timeout) noexcept
{
    int realTimeout = timeout;
    int timerHandle = -1;

    timerMutex_.lock();
    if (!timerMap_.empty()) {
        auto cur = timerMap_.begin();
        timerHandle = cur->second.handle;
        realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
            cur->first - std::chrono::steady_clock::now()).count();
        if (realTimeout <= 0) {
            // The earliest timer is already due. ExecuteTimerCb releases timerMutex_.
            ExecuteTimerCb(cur);
            return PollerRet::RET_TIMER;
        }

        if (timeout != -1) {
            // An explicit caller timeout takes precedence; an epoll timeout is
            // then not attributed to the timer.
            timerHandle = -1;
            realTimeout = timeout;
        }

        flag_ = EpollStatus::WAIT;
    }
    timerMutex_.unlock();

    pollerCount_++;
    int nfds = epoll_wait(m_epFd, m_events.data(), m_events.size(), realTimeout);
    flag_ = EpollStatus::WAKE;
    if (nfds < 0) {
        FFRT_LOGE("epoll_wait error.");
        return PollerRet::RET_NULL;
    }

    if (nfds == 0) {
        if (timerHandle != -1) {
            // Timed out while waiting for a timer: run it if it is still registered.
            timerMutex_.lock();
            for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
                if (it->second.handle == timerHandle) {
                    // ExecuteTimerCb releases timerMutex_.
                    ExecuteTimerCb(it);
                    return PollerRet::RET_TIMER;
                }
            }
            timerMutex_.unlock();
        }
        return PollerRet::RET_NULL;
    }

    for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
        struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(m_events[i].data.ptr);
        int currFd = data->fd;
        if (currFd == m_wakeData.fd) {
            // Drain the eventfd counter; the value read is intentionally unused.
            uint64_t one = 1;
            ssize_t n = ::read(m_wakeData.fd, &one, sizeof one);
            static_cast<void>(n);
            continue;
        }

        if (data->cb == nullptr) {
            continue;
        }
        data->cb(data->data, m_events[i].events, pollerCount_);
    }

    // Records of deleted fds are only freed after this batch has been dispatched.
    ReleaseFdWakeData();
    return PollerRet::RET_EPOLL;
}

// Reconciles the per-fd deletion counters with the stored records:
//  - all records of an fd were deleted: drop the fd from both maps;
//  - exactly one more record than deletions: pop (delCnt - 1) records from
//    the front and reset the counter to 1;
//  - anything else is logged as unexpected and left untouched.
void Poller::ReleaseFdWakeData() noexcept
{
    std::unique_lock lock(m_mapMutex);
    for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
        int delFd = delIter->first;
        int delCnt = delIter->second;
        auto& wakeDataList = m_wakeDataMap[delFd];
        // Convert once so the arithmetic and the "%d" log argument are both int.
        int addedCnt = static_cast<int>(wakeDataList.size());
        int diff = addedCnt - delCnt;
        if (diff == 0) {
            m_wakeDataMap.erase(delFd);
            delIter = m_delCntMap.erase(delIter);
            continue;
        } else if (diff == 1) {
            for (int i = 0; i < delCnt - 1; i++) {
                wakeDataList.pop_front();
            }
            delIter->second = 1;
        } else {
            FFRT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, addedCnt, delCnt);
        }
        delIter++;
    }

    fdEmpty_.store(m_wakeDataMap.empty());
}

// Executes every timer whose deadline is not later than that of `timer`.
// Must be entered with timerMutex_ locked; the mutex is released before the
// callbacks run and is NOT re-acquired.
// NOTE(review): executedHandle_ is updated to EXECUTED after the unlock while
// DeregisterTimer/GetTimerStatus read it under timerMutex_ - confirm that this
// unsynchronized access is intended.
void Poller::ExecuteTimerCb(std::multimap<time_point_t, TimerDataWithCb>::iterator& timer) noexcept
{
    std::vector<TimerDataWithCb> timerData;
    for (auto iter = timerMap_.begin(); iter != timerMap_.end();) {
        if (iter->first <= timer->first) {
            timerData.emplace_back(iter->second);
            executedHandle_[iter->second.handle] = TimerStatus::EXECUTING;
            // Erasing may invalidate `timer`; the loop stops at the first later deadline.
            iter = timerMap_.erase(iter);
            continue;
        }
        break;
    }
    timerEmpty_.store(timerMap_.empty());

    timerMutex_.unlock();
    for (const auto& data : timerData) {
        data.cb(data.data);
        executedHandle_[data.handle] = TimerStatus::EXECUTED;
    }
}

// Registers a one-shot timer that fires `timeout` milliseconds from now.
// Returns the timer handle, or -1 if cb is null or the poller is torn down.
int Poller::RegisterTimer(uint64_t timeout, void* data, void(*cb)(void*)) noexcept
{
    if (cb == nullptr || flag_ == EpollStatus::TEARDOWN) {
        return -1;
    }

    std::lock_guard lock(timerMutex_);
    time_point_t absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout);
    // Wake the poller if this is the first timer, or if it becomes the earliest
    // one while the poller is blocked in epoll_wait.
    bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);

    TimerDataWithCb timerMapValue(data, cb);
    timerHandle_ += 1;
    timerMapValue.handle = timerHandle_;
    timerMap_.emplace(absoluteTime, timerMapValue);
    timerEmpty_.store(false);

    if (wake) {
        WakeUp();
    }

    return timerHandle_;
}

// Cancels the timer identified by handle. If its callback has already been
// picked up for execution, waits until it has finished and forgets the handle.
// NOTE(review): the wait loop spins while holding timerMutex_ - confirm that
// timer callbacks never need this mutex.
void Poller::DeregisterTimer(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return;
    }

    std::lock_guard lock(timerMutex_);
    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            std::this_thread::yield();
        }
        executedHandle_.erase(it);
        return;
    }

    bool wake = false;
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            // Removing the earliest timer while the poller is waiting on it
            // requires a wake-up so the wait time is recomputed.
            if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
                wake = true;
            }
            timerMap_.erase(cur);
            break;
        }
    }

    timerEmpty_.store(timerMap_.empty());

    if (wake) {
        WakeUp();
    }
}

// Returns true when neither fd records nor timers are registered.
bool Poller::DetermineEmptyMap() noexcept
{
    return fdEmpty_ && timerEmpty_;
}

// Reports the state of the timer identified by handle:
// ffrt_timer_not_executed while it is still pending, ffrt_timer_executed once
// its callback has run (waiting for a running callback to finish), and
// ffrt_timer_notfound otherwise or when the poller is torn down.
ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return ffrt_timer_notfound;
    }

    std::lock_guard lock(timerMutex_);
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            return ffrt_timer_not_executed;
        }
    }

    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            std::this_thread::yield();
        }
        return ffrt_timer_executed;
    }

    return ffrt_timer_notfound;
}
}
#endif