1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_async/fake_dispatcher.h"
16
17 #include "pw_async/task.h"
18 #include "pw_log/log.h"
19
20 using namespace std::chrono_literals;
21
22 namespace pw::async::test::backend {
23
NativeFakeDispatcher(Dispatcher & dispatcher)24 NativeFakeDispatcher::NativeFakeDispatcher(Dispatcher& dispatcher)
25 : dispatcher_(dispatcher) {}
26
~NativeFakeDispatcher()27 NativeFakeDispatcher::~NativeFakeDispatcher() {
28 RequestStop();
29 DrainTaskQueue();
30 }
31
RunUntilIdle()32 void NativeFakeDispatcher::RunUntilIdle() {
33 ExecuteDueTasks();
34 if (stop_requested_) {
35 DrainTaskQueue();
36 }
37 }
38
RunUntil(chrono::SystemClock::time_point end_time)39 void NativeFakeDispatcher::RunUntil(chrono::SystemClock::time_point end_time) {
40 while (!task_queue_.empty() && task_queue_.front().due_time() <= end_time &&
41 !stop_requested_) {
42 now_ = task_queue_.front().due_time();
43 ExecuteDueTasks();
44 }
45
46 if (stop_requested_) {
47 DrainTaskQueue();
48 return;
49 }
50
51 if (now_ < end_time) {
52 now_ = end_time;
53 }
54 }
55
RunFor(chrono::SystemClock::duration duration)56 void NativeFakeDispatcher::RunFor(chrono::SystemClock::duration duration) {
57 RunUntil(now() + duration);
58 }
59
ExecuteDueTasks()60 void NativeFakeDispatcher::ExecuteDueTasks() {
61 while (!task_queue_.empty() && task_queue_.front().due_time() <= now() &&
62 !stop_requested_) {
63 ::pw::async::backend::NativeTask& task = task_queue_.front();
64 task_queue_.pop_front();
65
66 if (task.interval().has_value()) {
67 PostTaskInternal(task, task.due_time() + task.interval().value());
68 }
69
70 Context ctx{&dispatcher_, &task.task_};
71 task(ctx, OkStatus());
72 }
73 }
74
RequestStop()75 void NativeFakeDispatcher::RequestStop() {
76 PW_LOG_DEBUG("stop requested");
77 stop_requested_ = true;
78 }
79
DrainTaskQueue()80 void NativeFakeDispatcher::DrainTaskQueue() {
81 while (!task_queue_.empty()) {
82 ::pw::async::backend::NativeTask& task = task_queue_.front();
83 task_queue_.pop_front();
84
85 PW_LOG_DEBUG("running cancelled task");
86 Context ctx{&dispatcher_, &task.task_};
87 task(ctx, Status::Cancelled());
88 }
89 }
90
Post(Task & task)91 void NativeFakeDispatcher::Post(Task& task) { PostAt(task, now()); }
92
PostAfter(Task & task,chrono::SystemClock::duration delay)93 void NativeFakeDispatcher::PostAfter(Task& task,
94 chrono::SystemClock::duration delay) {
95 PostAt(task, now() + delay);
96 }
97
PostAt(Task & task,chrono::SystemClock::time_point time)98 void NativeFakeDispatcher::PostAt(Task& task,
99 chrono::SystemClock::time_point time) {
100 PW_LOG_DEBUG("posting task");
101 PostTaskInternal(task.native_type(), time);
102 }
103
PostPeriodic(Task & task,chrono::SystemClock::duration interval)104 void NativeFakeDispatcher::PostPeriodic(
105 Task& task, chrono::SystemClock::duration interval) {
106 PostPeriodicAt(task, interval, now());
107 }
108
PostPeriodicAt(Task & task,chrono::SystemClock::duration interval,chrono::SystemClock::time_point start_time)109 void NativeFakeDispatcher::PostPeriodicAt(
110 Task& task,
111 chrono::SystemClock::duration interval,
112 chrono::SystemClock::time_point start_time) {
113 task.native_type().set_interval(interval);
114 PostAt(task, start_time);
115 }
116
Cancel(Task & task)117 bool NativeFakeDispatcher::Cancel(Task& task) {
118 return task_queue_.remove(task.native_type());
119 }
120
PostTaskInternal(::pw::async::backend::NativeTask & task,chrono::SystemClock::time_point time_due)121 void NativeFakeDispatcher::PostTaskInternal(
122 ::pw::async::backend::NativeTask& task,
123 chrono::SystemClock::time_point time_due) {
124 task.set_due_time(time_due);
125 auto it_front = task_queue_.begin();
126 auto it_behind = task_queue_.before_begin();
127 while (it_front != task_queue_.end() && time_due > it_front->due_time()) {
128 ++it_front;
129 ++it_behind;
130 }
131 task_queue_.insert_after(it_behind, task);
132 }
133
134 } // namespace pw::async::test::backend
135