• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *    Copyright (c) 2021, The OpenThread Authors.
3  *    All rights reserved.
4  *
5  *    Redistribution and use in source and binary forms, with or without
6  *    modification, are permitted provided that the following conditions are met:
7  *    1. Redistributions of source code must retain the above copyright
8  *       notice, this list of conditions and the following disclaimer.
9  *    2. Redistributions in binary form must reproduce the above copyright
10  *       notice, this list of conditions and the following disclaimer in the
11  *       documentation and/or other materials provided with the distribution.
12  *    3. Neither the name of the copyright holder nor the
13  *       names of its contributors may be used to endorse or promote products
14  *       derived from this software without specific prior written permission.
15  *
16  *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  *    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  *    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  *    ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  *    LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  *    CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  *    SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  *    INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  *    CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  *    ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  *    POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /**
30  * @file
31  * This file implements the Task Runner that executes tasks on the mainloop.
32  */
33 
34 #include "common/task_runner.hpp"
35 
36 #include <algorithm>
37 
38 #include <fcntl.h>
39 #include <unistd.h>
40 
41 #include "common/code_utils.hpp"
42 
43 namespace otbr {
44 
TaskRunner(void)45 TaskRunner::TaskRunner(void)
46     : mTaskQueue(DelayedTask::Comparator{})
47 {
48     int flags;
49 
50     // We do not handle failures when creating a pipe, simply die.
51     VerifyOrDie(pipe(mEventFd) != -1, strerror(errno));
52 
53     flags = fcntl(mEventFd[kRead], F_GETFL, 0);
54     VerifyOrDie(fcntl(mEventFd[kRead], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));
55     flags = fcntl(mEventFd[kWrite], F_GETFL, 0);
56     VerifyOrDie(fcntl(mEventFd[kWrite], F_SETFL, flags | O_NONBLOCK) != -1, strerror(errno));
57 }
58 
~TaskRunner(void)59 TaskRunner::~TaskRunner(void)
60 {
61     if (mEventFd[kRead] != -1)
62     {
63         close(mEventFd[kRead]);
64         mEventFd[kRead] = -1;
65     }
66     if (mEventFd[kWrite] != -1)
67     {
68         close(mEventFd[kWrite]);
69         mEventFd[kWrite] = -1;
70     }
71 }
72 
Post(Task<void> aTask)73 void TaskRunner::Post(Task<void> aTask)
74 {
75     Post(Milliseconds::zero(), std::move(aTask));
76 }
77 
Post(Milliseconds aDelay,Task<void> aTask)78 TaskRunner::TaskId TaskRunner::Post(Milliseconds aDelay, Task<void> aTask)
79 {
80     return PushTask(aDelay, std::move(aTask));
81 }
82 
Update(MainloopContext & aMainloop)83 void TaskRunner::Update(MainloopContext &aMainloop)
84 {
85     FD_SET(mEventFd[kRead], &aMainloop.mReadFdSet);
86     aMainloop.mMaxFd = std::max(mEventFd[kRead], aMainloop.mMaxFd);
87 
88     {
89         std::lock_guard<std::mutex> _(mTaskQueueMutex);
90 
91         if (!mTaskQueue.empty())
92         {
93             auto  now     = Clock::now();
94             auto &task    = mTaskQueue.top();
95             auto  delay   = std::chrono::duration_cast<Microseconds>(task.GetTimeExecute() - now);
96             auto  timeout = FromTimeval<Microseconds>(aMainloop.mTimeout);
97 
98             if (task.GetTimeExecute() < now)
99             {
100                 delay = Microseconds::zero();
101             }
102 
103             if (delay <= timeout)
104             {
105                 aMainloop.mTimeout.tv_sec  = delay.count() / 1000000;
106                 aMainloop.mTimeout.tv_usec = delay.count() % 1000000;
107             }
108         }
109     }
110 }
111 
Process(const MainloopContext & aMainloop)112 void TaskRunner::Process(const MainloopContext &aMainloop)
113 {
114     OTBR_UNUSED_VARIABLE(aMainloop);
115 
116     ssize_t rval;
117 
118     // Read any data in the pipe.
119     do
120     {
121         uint8_t n;
122 
123         rval = read(mEventFd[kRead], &n, sizeof(n));
124     } while (rval > 0 || (rval == -1 && errno == EINTR));
125 
126     // Critical error happens, simply die.
127     VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));
128 
129     PopTasks();
130 }
131 
PushTask(Milliseconds aDelay,Task<void> aTask)132 TaskRunner::TaskId TaskRunner::PushTask(Milliseconds aDelay, Task<void> aTask)
133 {
134     ssize_t       rval;
135     const uint8_t kOne = 1;
136     TaskId        taskId;
137 
138     {
139         std::lock_guard<std::mutex> _(mTaskQueueMutex);
140 
141         taskId = mNextTaskId++;
142 
143         mActiveTaskIds.insert(taskId);
144         mTaskQueue.emplace(taskId, aDelay, std::move(aTask));
145     }
146 
147     do
148     {
149         rval = write(mEventFd[kWrite], &kOne, sizeof(kOne));
150     } while (rval == -1 && errno == EINTR);
151 
152     VerifyOrExit(rval == -1);
153 
154     // Critical error happens, simply die.
155     VerifyOrDie(errno == EAGAIN || errno == EWOULDBLOCK, strerror(errno));
156 
157     // We are blocked because there are already data (written by other concurrent callers in
158     // different threads) in the pipe, and the mEventFd[kRead] should be readable now.
159     otbrLogWarning("Failed to write fd %d: %s", mEventFd[kWrite], strerror(errno));
160 
161 exit:
162     return taskId;
163 }
164 
Cancel(TaskRunner::TaskId aTaskId)165 void TaskRunner::Cancel(TaskRunner::TaskId aTaskId)
166 {
167     std::lock_guard<std::mutex> _(mTaskQueueMutex);
168 
169     mActiveTaskIds.erase(aTaskId);
170 }
171 
PopTasks(void)172 void TaskRunner::PopTasks(void)
173 {
174     while (true)
175     {
176         Task<void> task;
177         bool       canceled;
178 
179         // The braces here are necessary for auto-releasing of the mutex.
180         {
181             std::lock_guard<std::mutex> _(mTaskQueueMutex);
182 
183             if (!mTaskQueue.empty() && mTaskQueue.top().GetTimeExecute() <= Clock::now())
184             {
185                 const DelayedTask &top    = mTaskQueue.top();
186                 TaskId             taskId = top.mTaskId;
187 
188                 task = std::move(top.mTask);
189                 mTaskQueue.pop();
190                 canceled = (mActiveTaskIds.erase(taskId) == 0);
191             }
192             else
193             {
194                 break;
195             }
196         }
197 
198         if (!canceled)
199         {
200             task();
201         }
202     }
203 }
204 
205 } // namespace otbr
206