// Copyright 2018 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "net/base/prioritized_task_runner.h" #include #include #include #include #include "base/functional/bind.h" #include "base/functional/callback_helpers.h" #include "base/location.h" #include "base/rand_util.h" #include "base/run_loop.h" #include "base/strings/string_number_conversions.h" #include "base/strings/string_util.h" #include "base/synchronization/lock.h" #include "base/synchronization/waitable_event.h" #include "base/task/sequenced_task_runner.h" #include "base/task/thread_pool.h" #include "base/test/task_environment.h" #include "base/threading/thread_restrictions.h" #include "testing/gtest/include/gtest/gtest.h" namespace net { namespace { class PrioritizedTaskRunnerTest : public testing::Test { public: PrioritizedTaskRunnerTest() = default; PrioritizedTaskRunnerTest(const PrioritizedTaskRunnerTest&) = delete; PrioritizedTaskRunnerTest& operator=(const PrioritizedTaskRunnerTest&) = delete; void PushName(const std::string& task_name) { base::AutoLock auto_lock(callback_names_lock_); callback_names_.push_back(task_name); } std::string PushNameWithResult(const std::string& task_name) { PushName(task_name); std::string reply_name = task_name; base::ReplaceSubstringsAfterOffset(&reply_name, 0, "Task", "Reply"); return reply_name; } std::vector TaskOrder() { std::vector out; for (const std::string& name : callback_names_) { if (name.starts_with("Task")) { out.push_back(name); } } return out; } std::vector ReplyOrder() { std::vector out; for (const std::string& name : callback_names_) { if (name.starts_with("Reply")) { out.push_back(name); } } return out; } // Adds a task to the task runner and waits for it to execute. void ProcessTaskRunner(base::TaskRunner* task_runner) { // Use a waitable event instead of a run loop as we need to be careful not // to run any tasks on this task runner while waiting. base::WaitableEvent waitable_event; task_runner->PostTask(FROM_HERE, base::BindOnce( [](base::WaitableEvent* waitable_event) { waitable_event->Signal(); }, &waitable_event)); base::ScopedAllowBaseSyncPrimitivesForTesting sync; waitable_event.Wait(); } // Adds a task to the |task_runner|, forcing it to wait for a conditional. // Call ReleaseTaskRunner to continue. void BlockTaskRunner(base::TaskRunner* task_runner) { waitable_event_.Reset(); auto wait_function = [](base::WaitableEvent* waitable_event) { base::ScopedAllowBaseSyncPrimitivesForTesting sync; waitable_event->Wait(); }; task_runner->PostTask(FROM_HERE, base::BindOnce(wait_function, &waitable_event_)); } // Signals the task runner's conditional so that it can continue after calling // BlockTaskRunner. void ReleaseTaskRunner() { waitable_event_.Signal(); } protected: base::test::TaskEnvironment task_environment_; std::vector callback_names_; base::Lock callback_names_lock_; base::WaitableEvent waitable_event_; }; TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyThreadCheck) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); base::RunLoop run_loop; auto thread_check = [](scoped_refptr expected_task_runner, base::OnceClosure callback) { EXPECT_TRUE(expected_task_runner->RunsTasksInCurrentSequence()); std::move(callback).Run(); }; prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(thread_check, task_runner, base::DoNothing()), base::BindOnce(thread_check, task_environment_.GetMainThreadTaskRunner(), run_loop.QuitClosure()), 0); run_loop.Run(); } TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyRunsBothTasks) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply"), 0); // Run the TaskRunner and both the Task and Reply should run. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"Task", "Reply"}), callback_names_); } TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyTestPriority) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); BlockTaskRunner(task_runner.get()); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task5"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply5"), 5); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task0"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply0"), 0); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task7"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply7"), 7); ReleaseTaskRunner(); // Run the TaskRunner and all of the tasks and replies should have run, in // priority order. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"Task0", "Task5", "Task7"}), TaskOrder()); EXPECT_EQ((std::vector{"Reply0", "Reply5", "Reply7"}), ReplyOrder()); } // Ensure that replies are run in priority order. TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyTestReplyPriority) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); // Add a couple of tasks to run right away, but don't run their replies yet. BlockTaskRunner(task_runner.get()); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task2"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply2"), 2); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task1"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply1"), 1); ReleaseTaskRunner(); // Run the current tasks (but not their replies). ProcessTaskRunner(task_runner.get()); // Now post task 0 (highest priority) and run it. None of the replies have // been processed yet, so its reply should skip to the head of the queue. prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Task0"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "Reply0"), 0); ProcessTaskRunner(task_runner.get()); // Run the replies. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"Task1", "Task2", "Task0"}), TaskOrder()); EXPECT_EQ((std::vector{"Reply0", "Reply1", "Reply2"}), ReplyOrder()); } TEST_F(PrioritizedTaskRunnerTest, PriorityOverflow) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); const uint32_t kMaxPriority = std::numeric_limits::max(); BlockTaskRunner(task_runner.get()); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "TaskMinus1"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "ReplyMinus1"), kMaxPriority - 1); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "TaskMax"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "ReplyMax"), kMaxPriority); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "TaskMaxPlus1"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), "ReplyMaxPlus1"), kMaxPriority + 1); ReleaseTaskRunner(); // Run the TaskRunner and all of the tasks and replies should have run, in // priority order. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"TaskMaxPlus1", "TaskMinus1", "TaskMax"}), TaskOrder()); EXPECT_EQ( (std::vector{"ReplyMaxPlus1", "ReplyMinus1", "ReplyMax"}), ReplyOrder()); } TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyWithResultRunsBothTasks) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); prioritized_task_runner->PostTaskAndReplyWithResult( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushNameWithResult, base::Unretained(this), "Task"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this)), 0); // Run the TaskRunner and both the Task and Reply should run. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"Task", "Reply"}), callback_names_); } TEST_F(PrioritizedTaskRunnerTest, PostTaskAndReplyWithResultTestPriority) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); BlockTaskRunner(task_runner.get()); prioritized_task_runner->PostTaskAndReplyWithResult( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushNameWithResult, base::Unretained(this), "Task0"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this)), 0); prioritized_task_runner->PostTaskAndReplyWithResult( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushNameWithResult, base::Unretained(this), "Task7"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this)), 7); prioritized_task_runner->PostTaskAndReplyWithResult( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushNameWithResult, base::Unretained(this), "Task3"), base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this)), 3); ReleaseTaskRunner(); // Run the TaskRunner and both the Task and Reply should run. task_environment_.RunUntilIdle(); EXPECT_EQ((std::vector{"Task0", "Task3", "Task7"}), TaskOrder()); EXPECT_EQ((std::vector{"Reply0", "Reply3", "Reply7"}), ReplyOrder()); } TEST_F(PrioritizedTaskRunnerTest, OrderSamePriorityByPostOrder) { auto task_runner = base::ThreadPool::CreateSequencedTaskRunner({}); auto prioritized_task_runner = base::MakeRefCounted(base::TaskTraits()); prioritized_task_runner->SetTaskRunnerForTesting(task_runner); std::vector expected; // Create 1000 tasks with random priorities between 1 and 3. Those that have // the same priorities should run in posting order. BlockTaskRunner(task_runner.get()); for (int i = 0; i < 1000; i++) { int priority = base::RandInt(0, 2); int id = (priority * 1000) + i; expected.push_back(id); prioritized_task_runner->PostTaskAndReply( FROM_HERE, base::BindOnce(&PrioritizedTaskRunnerTest::PushName, base::Unretained(this), base::NumberToString(id)), base::DoNothing(), priority); } ReleaseTaskRunner(); // This is the order the tasks should run on the queue. std::sort(expected.begin(), expected.end()); task_environment_.RunUntilIdle(); // This is the order that the tasks ran on the queue. std::vector results; for (const std::string& result : callback_names_) { int result_id; EXPECT_TRUE(base::StringToInt(result, &result_id)); results.push_back(result_id); } EXPECT_EQ(expected, results); } } // namespace } // namespace net