// Copyright 2015 The Weave Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "examples/provider/event_task_runner.h" #include namespace weave { namespace examples { namespace { event_base* g_event_base = nullptr; } void EventTaskRunner::PostDelayedTask( const tracked_objects::Location& from_here, const base::Closure& task, base::TimeDelta delay) { base::Time new_time = base::Time::Now() + delay; if (queue_.empty() || new_time < queue_.top().first.first) { ReScheduleEvent(delay); } queue_.emplace(std::make_pair(new_time, ++counter_), task); } void EventTaskRunner::AddIoCompletionTask( int fd, int16_t what, const EventTaskRunner::IoCompletionCallback& task) { int16_t flags = EV_PERSIST | EV_ET; flags |= (what & kReadable) ? EV_READ : 0; flags |= (what & kWriteable) ? EV_WRITE : 0; #if LIBEVENT_VERSION_NUMBER >= 0x02010400 flags |= (what & kClosed) ? EV_CLOSED : 0; #endif event* ioevent = event_new(base_.get(), fd, flags, FdEventHandler, this); EventPtr ioeventPtr{ioevent}; fd_task_map_.insert( std::make_pair(fd, std::make_pair(std::move(ioeventPtr), task))); event_add(ioevent, nullptr); } void EventTaskRunner::RemoveIoCompletionTask(int fd) { fd_task_map_.erase(fd); } void EventTaskRunner::Run() { g_event_base = base_.get(); struct sigaction sa = {}; sa.sa_handler = [](int signal) { event_base_loopexit(g_event_base, nullptr); }; sigfillset(&sa.sa_mask); sigaction(SIGINT, &sa, nullptr); do { event_base_loop(g_event_base, EVLOOP_ONCE); } while (!event_base_got_exit(g_event_base)); g_event_base = nullptr; } void EventTaskRunner::ReScheduleEvent(base::TimeDelta delay) { timespec ts = delay.ToTimeSpec(); timeval tv = {ts.tv_sec, ts.tv_nsec / 1000}; event_add(task_event_.get(), &tv); } void EventTaskRunner::EventHandler(int /* fd */, int16_t /* what */, void* runner) { static_cast(runner)->Process(); } void EventTaskRunner::FreeEvent(event* evnt) { event_del(evnt); event_free(evnt); } void EventTaskRunner::Process() { while (!queue_.empty() && queue_.top().first.first <= base::Time::Now()) { auto cb = queue_.top().second; queue_.pop(); cb.Run(); } if (!queue_.empty()) { base::TimeDelta delta = std::max( base::TimeDelta(), queue_.top().first.first - base::Time::Now()); ReScheduleEvent(delta); } } void EventTaskRunner::FdEventHandler(int fd, int16_t what, void* runner) { static_cast(runner)->ProcessFd(fd, what); } void EventTaskRunner::ProcessFd(int fd, int16_t what) { auto it = fd_task_map_.find(fd); if (it != fd_task_map_.end()) { const IoCompletionCallback& callback = it->second.second; callback.Run(fd, what, this); } } } // namespace examples } // namespace weave