1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef PLATFORM_IMPL_TASK_RUNNER_H_ 6 #define PLATFORM_IMPL_TASK_RUNNER_H_ 7 8 #include <condition_variable> // NOLINT 9 #include <map> 10 #include <memory> 11 #include <mutex> 12 #include <thread> 13 #include <utility> 14 #include <vector> 15 16 #include "absl/base/thread_annotations.h" 17 #include "absl/types/optional.h" 18 #include "platform/api/task_runner.h" 19 #include "platform/api/time.h" 20 #include "platform/base/error.h" 21 #include "util/trace_logging.h" 22 23 namespace openscreen { 24 25 class TaskRunnerImpl final : public TaskRunner { 26 public: 27 using Task = TaskRunner::Task; 28 29 class TaskWaiter { 30 public: 31 virtual ~TaskWaiter() = default; 32 33 // These calls should be thread-safe. The absolute minimum is that 34 // OnTaskPosted must be safe to call from another thread while this is 35 // inside WaitForTaskToBePosted. NOTE: There may be spurious wakeups from 36 // WaitForTaskToBePosted depending on whether the specific implementation 37 // chooses to clear queued WakeUps before entering WaitForTaskToBePosted. 38 39 // Blocks until some event occurs, which means new tasks may have been 40 // posted. Wait may only block up to |timeout| where 0 means don't block at 41 // all (not block forever). 42 virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0; 43 44 // If a WaitForTaskToBePosted call is currently blocking, unblock it 45 // immediately. 46 virtual void OnTaskPosted() = 0; 47 }; 48 49 explicit TaskRunnerImpl( 50 ClockNowFunctionPtr now_function, 51 TaskWaiter* event_waiter = nullptr, 52 Clock::duration waiter_timeout = std::chrono::milliseconds(100)); 53 54 // TaskRunner overrides 55 ~TaskRunnerImpl() final; 56 void PostPackagedTask(Task task) final; 57 void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final; 58 bool IsRunningOnTaskRunner() final; 59 60 // Blocks the current thread, executing tasks from the queue with the desired 61 // timing; and does not return until some time after RequestStopSoon() is 62 // called. 63 void RunUntilStopped(); 64 65 // Blocks the current thread, executing tasks from the queue with the desired 66 // timing; and does not return until some time after the current process is 67 // signaled with SIGINT or SIGTERM, or after RequestStopSoon() is called. 68 void RunUntilSignaled(); 69 70 // Thread-safe method for requesting the TaskRunner to stop running after all 71 // non-delayed tasks in the queue have run. This behavior allows final 72 // clean-up tasks to be executed before the TaskRunner stops. 73 // 74 // If any non-delayed tasks post additional non-delayed tasks, those will be 75 // run as well before returning. 76 void RequestStopSoon(); 77 78 private: 79 #if defined(ENABLE_TRACE_LOGGING) 80 // Wrapper around a Task used to store the TraceId Metadata along with the 81 // task itself, and to set the current TraceIdHierarchy before executing the 82 // task. 83 class TaskWithMetadata { 84 public: 85 // NOTE: 'explicit' keyword omitted so that conversion construtor can be 86 // used. This simplifies switching between 'Task' and 'TaskWithMetadata' 87 // based on the compilation flag. TaskWithMetadata(Task task)88 TaskWithMetadata(Task task) // NOLINT 89 : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {} 90 operator()91 void operator()() { 92 TRACE_SET_HIERARCHY(trace_ids_); 93 std::move(task_)(); 94 } 95 96 private: 97 Task task_; 98 TraceIdHierarchy trace_ids_; 99 }; 100 #else // !defined(ENABLE_TRACE_LOGGING) 101 using TaskWithMetadata = Task; 102 #endif // defined(ENABLE_TRACE_LOGGING) 103 104 // Helper that runs all tasks in |running_tasks_| and then clears it. 105 void RunRunnableTasks(); 106 107 // Look at all tasks in the delayed task queue, then schedule them if the 108 // minimum delay time has elapsed. 109 void ScheduleDelayedTasks(); 110 111 // Transfers all ready-to-run tasks from |tasks_| to |running_tasks_|. If 112 // there are no ready-to-run tasks, and |is_running_| is true, this method 113 // will block waiting for new tasks. Returns true if any tasks were 114 // transferred. 115 bool GrabMoreRunnableTasks(); 116 117 const ClockNowFunctionPtr now_function_; 118 119 // Flag that indicates whether the task runner loop should continue. This is 120 // only meant to be read/written on the thread executing RunUntilStopped(). 121 bool is_running_; 122 123 // This mutex is used for |tasks_| and |delayed_tasks_|, and also for 124 // notifying the run loop to wake up when it is waiting for a task to be added 125 // to the queue in |run_loop_wakeup_|. 126 std::mutex task_mutex_; 127 std::vector<TaskWithMetadata> tasks_ GUARDED_BY(task_mutex_); 128 std::multimap<Clock::time_point, TaskWithMetadata> delayed_tasks_ 129 GUARDED_BY(task_mutex_); 130 131 // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the 132 // task runner. Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_| 133 // is used instead (along with |waiter_timeout_|). 134 std::condition_variable run_loop_wakeup_; 135 TaskWaiter* const task_waiter_; 136 Clock::duration waiter_timeout_; 137 138 // To prevent excessive re-allocation of the underlying array of the |tasks_| 139 // vector, use an A/B vector-swap mechanism. |running_tasks_| starts out 140 // empty, and is swapped with |tasks_| when it is time to run the Tasks. 141 std::vector<TaskWithMetadata> running_tasks_; 142 143 std::thread::id task_runner_thread_id_; 144 145 OSP_DISALLOW_COPY_AND_ASSIGN(TaskRunnerImpl); 146 }; 147 } // namespace openscreen 148 149 #endif // PLATFORM_IMPL_TASK_RUNNER_H_ 150