1 /*
2 * Copyright (c) 2021, The OpenThread Authors.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * 3. Neither the name of the copyright holder nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26 * POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 /**
30 * @file
31 * This file implements the Task Runner that executes tasks on the mainloop.
32 */
33
34 #include "common/task_runner.hpp"
35
36 #include <algorithm>
37
38 #include <fcntl.h>
39 #include <unistd.h>
40
41 #include "common/code_utils.hpp"
42
43 namespace otbr {
44
TaskRunner(void)45 TaskRunner::TaskRunner(void)
46 : mTaskQueue(DelayedTask::Comparator{})
47 {
48 int flags;
49
50 // We do not handle failures when creating a pipe, simply die.
51 VerifyOrDie(pipe(mEventFd) != -1, strerror(errno));
52
53 flags = fcntl(mEventFd[kRead], F_GETFL, 0);
54 VerifyOrDie(fcntl(mEventFd[kRead], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));
55 flags = fcntl(mEventFd[kWrite], F_GETFL, 0);
56 VerifyOrDie(fcntl(mEventFd[kWrite], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));
57 }
58
~TaskRunner(void)59 TaskRunner::~TaskRunner(void)
60 {
61 if (mEventFd[kRead] != -1)
62 {
63 close(mEventFd[kRead]);
64 mEventFd[kRead] = -1;
65 }
66 if (mEventFd[kWrite] != -1)
67 {
68 close(mEventFd[kWrite]);
69 mEventFd[kWrite] = -1;
70 }
71 }
72
Post(Task<void> aTask)73 void TaskRunner::Post(Task<void> aTask)
74 {
75 Post(Milliseconds::zero(), std::move(aTask));
76 }
77
Post(Milliseconds aDelay,Task<void> aTask)78 TaskRunner::TaskId TaskRunner::Post(Milliseconds aDelay, Task<void> aTask)
79 {
80 return PushTask(aDelay, std::move(aTask));
81 }
82
Update(MainloopContext & aMainloop)83 void TaskRunner::Update(MainloopContext &aMainloop)
84 {
85 FD_SET(mEventFd[kRead], &aMainloop.mReadFdSet);
86 aMainloop.mMaxFd = std::max(mEventFd[kRead], aMainloop.mMaxFd);
87
88 {
89 std::lock_guard<std::mutex> _(mTaskQueueMutex);
90
91 if (!mTaskQueue.empty())
92 {
93 auto now = Clock::now();
94 auto &task = mTaskQueue.top();
95 auto delay = std::chrono::duration_cast<Microseconds>(task.GetTimeExecute() - now);
96 auto timeout = FromTimeval<Microseconds>(aMainloop.mTimeout);
97
98 if (task.GetTimeExecute() < now)
99 {
100 delay = Microseconds::zero();
101 }
102
103 if (delay <= timeout)
104 {
105 aMainloop.mTimeout.tv_sec = delay.count() / 1000000;
106 aMainloop.mTimeout.tv_usec = delay.count() % 1000000;
107 }
108 }
109 }
110 }
111
Process(const MainloopContext & aMainloop)112 void TaskRunner::Process(const MainloopContext &aMainloop)
113 {
114 OTBR_UNUSED_VARIABLE(aMainloop);
115
116 ssize_t rval;
117
118 // Read any data in the pipe.
119 do
120 {
121 uint8_t n;
122
123 rval = read(mEventFd[kRead], &n, sizeof(n));
124 } while (rval > 0 || (rval == -1 && errno == EINTR));
125
126 // Critical error happens, simply die.
127 VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));
128
129 PopTasks();
130 }
131
PushTask(Milliseconds aDelay,Task<void> aTask)132 TaskRunner::TaskId TaskRunner::PushTask(Milliseconds aDelay, Task<void> aTask)
133 {
134 ssize_t rval;
135 const uint8_t kOne = 1;
136 TaskId taskId;
137
138 {
139 std::lock_guard<std::mutex> _(mTaskQueueMutex);
140
141 taskId = mNextTaskId++;
142
143 mActiveTaskIds.insert(taskId);
144 mTaskQueue.emplace(taskId, aDelay, std::move(aTask));
145 }
146
147 do
148 {
149 rval = write(mEventFd[kWrite], &kOne, sizeof(kOne));
150 } while (rval == -1 && errno == EINTR);
151
152 VerifyOrExit(rval == -1);
153
154 // Critical error happens, simply die.
155 VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));
156
157 // We are blocked because there are already data (written by other concurrent callers in
158 // different threads) in the pipe, and the mEventFd[kRead] should be readable now.
159 otbrLogWarning("Failed to write fd %d: %s", mEventFd[kWrite], strerror(errno));
160
161 exit:
162 return taskId;
163 }
164
Cancel(TaskRunner::TaskId aTaskId)165 void TaskRunner::Cancel(TaskRunner::TaskId aTaskId)
166 {
167 std::lock_guard<std::mutex> _(mTaskQueueMutex);
168
169 mActiveTaskIds.erase(aTaskId);
170 }
171
PopTasks(void)172 void TaskRunner::PopTasks(void)
173 {
174 while (true)
175 {
176 Task<void> task;
177 bool canceled;
178
179 // The braces here are necessary for auto-releasing of the mutex.
180 {
181 std::lock_guard<std::mutex> _(mTaskQueueMutex);
182
183 if (!mTaskQueue.empty() && mTaskQueue.top().GetTimeExecute() <= Clock::now())
184 {
185 const DelayedTask &top = mTaskQueue.top();
186 TaskId taskId = top.mTaskId;
187
188 task = std::move(top.mTask);
189 mTaskQueue.pop();
190 canceled = (mActiveTaskIds.erase(taskId) == 0);
191 }
192 else
193 {
194 break;
195 }
196 }
197
198 if (!canceled)
199 {
200 task();
201 }
202 }
203 }
204
205 } // namespace otbr
206