/**
 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H
#define PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H

#include "libpandabase/macros.h"

#include "runtime/include/method.h"
#include "runtime/thread_pool.h"

#include "verification/absint/panda_types.h"
#include "verification/jobs/cache.h"
#include "verification/jobs/job.h"
#include "verification/util/optional_ref.h"

#include <atomic>
#include <cstdint>
#include <limits>
#include <utility>
#include <variant>

namespace panda::verifier {

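// A unit of work for the verifier thread pool: an optional reference to the Method that should be verified.
// A default-constructed Task is empty and is what the queue hands out when there is nothing to process.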
class Task final : public TaskInterface {
public:
    DEFAULT_COPY_SEMANTIC(Task);
    DEFAULT_MOVE_SEMANTIC(Task);

    Task() : opt_method_ {} {}

    explicit Task(Method &method) : opt_method_ {method} {}

    bool IsEmpty() const
    {
        return !opt_method_.HasRef();
    }

    Method &GetMethod()
    {
        return opt_method_.Get();
    }

private:
    OptionalRef<Method> opt_method_;
};

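// Unbounded FIFO queue of verification tasks backed by a PandaDeque. An empty Task is returned when no jobs
// are pending.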
class TaskQueue final : public TaskQueueInterface<Task> {
public:
    explicit TaskQueue(mem::InternalAllocatorPtr allocator)
        : TaskQueueInterface<Task>(std::numeric_limits<size_t>::max()), queue_(allocator->Adapter())
    {
    }

    ~TaskQueue() = default;
    NO_COPY_SEMANTIC(TaskQueue);
    NO_MOVE_SEMANTIC(TaskQueue);

    Task GetTask() override
    {
        if (queue_.empty()) {
            LOG(DEBUG, VERIFIER) << "No jobs in the verifier queue";
            return {};
        }

        Task task {std::move(queue_.front())};
        queue_.pop_front();
        return task;
    }

    // NOLINTNEXTLINE(google-default-arguments)
    void AddTask(Task task, [[maybe_unused]] size_t priority = 0) override
    {
        queue_.push_back(std::move(task));
    }

    void Finalize() override
    {
        queue_.clear();
    }

protected:
    size_t GetQueueSize() override
    {
        return queue_.size();
    }

private:
    PandaDeque<Task> queue_;
};

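// Worker of the verifier thread pool. Each instance takes a unique thread number from an atomic counter and
// owns its own PandaTypes context used during verification.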
class Processor final : public ProcessorInterface<Task, std::monostate> {
public:
    explicit Processor(std::monostate dummy)
        // Atomic with acq_rel order reason: could be relaxed, but we need to allow reinitialization for tests
        : types_ {next_thread_num_.fetch_add(1, std::memory_order_acq_rel)}
    {
        // GCC 8 doesn't accept [[maybe_unused]] in this particular case
        (void)dummy;
    }
    ~Processor() = default;
    NO_COPY_SEMANTIC(Processor);
    NO_MOVE_SEMANTIC(Processor);

    bool Init() override;
    bool Process(Task task) override;
    bool Destroy() override;

private:
    PandaTypes types_;
    inline static std::atomic<ThreadNum> next_thread_num_ {0};
    friend class ThreadPool;
};

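// Static facade over the verifier's worker pool: owns the LibCache, the task queue and the underlying
// panda::ThreadPool, and lets clients enqueue methods and wait for verification results.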
class ThreadPool {
public:
    NO_COPY_SEMANTIC(ThreadPool);
    NO_MOVE_SEMANTIC(ThreadPool);

    static void Initialize(mem::InternalAllocatorPtr allocator, size_t num_threads);
    static void Destroy();

    static OptionalRef<LibCache> GetCache()
    {
        Data *data = GetData();
        if (data == nullptr) {
            return {};
        }
        return data->cache;
    }

    static bool Enqueue(Method *method);

    // TODO(romanov) The current API implies waking everyone waiting for a method when _any_ method is verified.
    // This means a lot of unnecessary wakeups. It would be good to reduce their number, but we need to decide how.
    // E.g. pass Method& here and to SignalMethodVerified? Include a condition variable to wait on in a Job?
    template <typename Handler, typename FailureHandler>
    static void WaitForVerification(Handler &&continue_waiting, FailureHandler &&failure_handler)
    {
        Data *data = GetData();
        if (data == nullptr) {
            return;
        }
        panda::os::memory::LockHolder lck {data->lock};
        while (continue_waiting()) {
            if (GetData() == nullptr) {
                failure_handler();
                return;
            }
            constexpr uint64_t quantum = 500;
            data->cond_var.TimedWait(&data->lock, quantum);
        }
    }
    static void SignalMethodVerified();
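
    // Illustrative usage sketch (not part of this header): a caller that enqueued a method can block until the
    // verifier reports a result. `IsVerificationDone` is a hypothetical predicate standing in for whatever
    // completion check the caller actually uses.
    //
    //     ThreadPool::Enqueue(&method);
    //     ThreadPool::WaitForVerification(
    //         [&] { return !IsVerificationDone(method); },  // keep waiting while no result is known yet
    //         [] { LOG(ERROR, VERIFIER) << "Verifier was destroyed before the method was verified"; });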

private:
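    // All state owned by the verifier thread pool, bundled so that Initialize() can create it and Destroy() can
    // tear it down through the single atomic pointer data_.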
    struct Data {
        LibCache cache;
        TaskQueue queue;
        panda::ThreadPool<Task, Processor, std::monostate> thread_pool;
        panda::os::memory::Mutex lock;
        panda::os::memory::ConditionVariable cond_var GUARDED_BY(lock);

        NO_COPY_SEMANTIC(Data);
        NO_MOVE_SEMANTIC(Data);
        Data(mem::InternalAllocatorPtr allocator, size_t num_threads)
            : cache {}, queue {allocator}, thread_pool {allocator, &queue, std::monostate {}, num_threads, "verifier"}
        {
        }
    };

    static Data *GetData(bool allow_shutting_down = false)
    {
        // Atomic with acquire order reason: data race with shutdown_ with dependencies on reads after the load which
        // should become visible
        if (!allow_shutting_down && shutdown_.load(std::memory_order_acquire)) {
            return nullptr;
        }
        // Atomic with seq_cst order reason: data race with data_ with requirement for sequentially consistent order
        // where threads observe all modifications in the same order
        return data_.load(std::memory_order_seq_cst);
    }

    inline static mem::InternalAllocatorPtr allocator_ {nullptr};
    inline static std::atomic<Data *> data_ {nullptr};
    inline static std::atomic<bool> shutdown_ {false};

    friend class Processor;
};

}  // namespace panda::verifier

#endif  // !PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H