1 //===-- BackgroundQueue.cpp - Task queue for background index -------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8
9 #include "index/Background.h"
10 #include "support/Logger.h"
11
12 namespace clang {
13 namespace clangd {
14
/// Process-wide switch, off by default. It is turned on by
/// BackgroundQueue::preventThreadStarvationInTests(); while it is on,
/// BackgroundQueue::work() does not apply a task's non-default thread priority
/// before running it.
static std::atomic<bool> PreventStarvation{false};
16
preventThreadStarvationInTests()17 void BackgroundQueue::preventThreadStarvationInTests() {
18 PreventStarvation.store(true);
19 }
20
work(std::function<void ()> OnIdle)21 void BackgroundQueue::work(std::function<void()> OnIdle) {
22 while (true) {
23 llvm::Optional<Task> Task;
24 {
25 std::unique_lock<std::mutex> Lock(Mu);
26 CV.wait(Lock, [&] { return ShouldStop || !Queue.empty(); });
27 if (ShouldStop) {
28 Queue.clear();
29 CV.notify_all();
30 return;
31 }
32 ++Stat.Active;
33 std::pop_heap(Queue.begin(), Queue.end());
34 Task = std::move(Queue.back());
35 Queue.pop_back();
36 notifyProgress();
37 }
38
39 if (Task->ThreadPri != llvm::ThreadPriority::Default &&
40 !PreventStarvation.load())
41 llvm::set_thread_priority(Task->ThreadPri);
42 Task->Run();
43 if (Task->ThreadPri != llvm::ThreadPriority::Default)
44 llvm::set_thread_priority(llvm::ThreadPriority::Default);
45
46 {
47 std::unique_lock<std::mutex> Lock(Mu);
48 ++Stat.Completed;
49 if (Stat.Active == 1 && Queue.empty()) {
50 // We just finished the last item, the queue is going idle.
51 assert(ShouldStop || Stat.Completed == Stat.Enqueued);
52 Stat.LastIdle = Stat.Completed;
53 if (OnIdle) {
54 Lock.unlock();
55 OnIdle();
56 Lock.lock();
57 }
58 }
59 assert(Stat.Active > 0 && "before decrementing");
60 --Stat.Active;
61 notifyProgress();
62 }
63 CV.notify_all();
64 }
65 }
66
stop()67 void BackgroundQueue::stop() {
68 {
69 std::lock_guard<std::mutex> QueueLock(Mu);
70 ShouldStop = true;
71 }
72 CV.notify_all();
73 }
74
push(Task T)75 void BackgroundQueue::push(Task T) {
76 {
77 std::lock_guard<std::mutex> Lock(Mu);
78 T.QueuePri = std::max(T.QueuePri, Boosts.lookup(T.Tag));
79 Queue.push_back(std::move(T));
80 std::push_heap(Queue.begin(), Queue.end());
81 ++Stat.Enqueued;
82 notifyProgress();
83 }
84 CV.notify_all();
85 }
86
append(std::vector<Task> Tasks)87 void BackgroundQueue::append(std::vector<Task> Tasks) {
88 {
89 std::lock_guard<std::mutex> Lock(Mu);
90 for (Task &T : Tasks)
91 T.QueuePri = std::max(T.QueuePri, Boosts.lookup(T.Tag));
92 std::move(Tasks.begin(), Tasks.end(), std::back_inserter(Queue));
93 std::make_heap(Queue.begin(), Queue.end());
94 Stat.Enqueued += Tasks.size();
95 notifyProgress();
96 }
97 CV.notify_all();
98 }
99
boost(llvm::StringRef Tag,unsigned NewPriority)100 void BackgroundQueue::boost(llvm::StringRef Tag, unsigned NewPriority) {
101 std::lock_guard<std::mutex> Lock(Mu);
102 unsigned &Boost = Boosts[Tag];
103 bool Increase = NewPriority > Boost;
104 Boost = NewPriority;
105 if (!Increase)
106 return; // existing tasks unaffected
107
108 unsigned Changes = 0;
109 for (Task &T : Queue)
110 if (Tag == T.Tag && NewPriority > T.QueuePri) {
111 T.QueuePri = NewPriority;
112 ++Changes;
113 }
114 if (Changes)
115 std::make_heap(Queue.begin(), Queue.end());
116 // No need to signal, only rearranged items in the queue.
117 }
118
blockUntilIdleForTest(llvm::Optional<double> TimeoutSeconds)119 bool BackgroundQueue::blockUntilIdleForTest(
120 llvm::Optional<double> TimeoutSeconds) {
121 std::unique_lock<std::mutex> Lock(Mu);
122 return wait(Lock, CV, timeoutSeconds(TimeoutSeconds),
123 [&] { return Queue.empty() && Stat.Active == 0; });
124 }
125
notifyProgress() const126 void BackgroundQueue::notifyProgress() const {
127 dlog("Queue: {0}/{1} ({2} active). Last idle at {3}", Stat.Completed,
128 Stat.Enqueued, Stat.Active, Stat.LastIdle);
129 if (OnProgress)
130 OnProgress(Stat);
131 }
132
133 } // namespace clangd
134 } // namespace clang
135