1 /** 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H 17 #define PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H 18 19 #include "libpandabase/macros.h" 20 21 #include "runtime/include/method.h" 22 #include "runtime/thread_pool.h" 23 24 #include "verification/absint/panda_types.h" 25 #include "verification/jobs/cache.h" 26 #include "verification/jobs/job.h" 27 #include "verification/util/optional_ref.h" 28 29 #include <atomic> 30 31 namespace panda::verifier { 32 33 class Task final : public TaskInterface { 34 public: 35 DEFAULT_COPY_SEMANTIC(Task); 36 DEFAULT_MOVE_SEMANTIC(Task); 37 Task()38 Task() : opt_method_ {} {} 39 Task(Method & method)40 explicit Task(Method &method) : opt_method_ {method} {} 41 IsEmpty()42 bool IsEmpty() const 43 { 44 return !opt_method_.HasRef(); 45 } 46 GetMethod()47 Method &GetMethod() 48 { 49 return opt_method_.Get(); 50 } 51 52 private: 53 OptionalRef<Method> opt_method_; 54 }; 55 56 class TaskQueue final : public TaskQueueInterface<Task> { 57 public: TaskQueue(mem::InternalAllocatorPtr allocator)58 explicit TaskQueue(mem::InternalAllocatorPtr allocator) 59 : TaskQueueInterface<Task>(std::numeric_limits<size_t>::max()), queue_(allocator->Adapter()) 60 { 61 } 62 63 ~TaskQueue() = default; 64 NO_COPY_SEMANTIC(TaskQueue); 65 NO_MOVE_SEMANTIC(TaskQueue); 66 GetTask()67 Task GetTask() override 68 { 69 if (queue_.empty()) { 70 LOG(DEBUG, VERIFIER) << "No jobs in the verifier queue"; 71 return {}; 72 } 73 74 Task task {std::move(queue_.front())}; 75 queue_.pop_front(); 76 return task; 77 } 78 79 // NOLINTNEXTLINE(google-default-arguments) 80 void AddTask(Task task, [[maybe_unused]] size_t priority = 0) override 81 { 82 queue_.push_back(std::move(task)); 83 } 84 Finalize()85 void Finalize() override 86 { 87 queue_.clear(); 88 } 89 90 protected: GetQueueSize()91 size_t GetQueueSize() override 92 { 93 return queue_.size(); 94 } 95 96 private: 97 PandaDeque<Task> queue_; 98 }; 99 100 class Processor final : public ProcessorInterface<Task, std::monostate> { 101 public: Processor(std::monostate dummy)102 explicit Processor(std::monostate dummy) 103 // Atomic with acq_rel order reason: could be relaxed, but we need to allow reinitialization for tests 104 : types_ {next_thread_num_.fetch_add(1, std::memory_order_acq_rel)} 105 { 106 // GCC 8 doesn't accept [[maybe_unused]] in this particular case 107 (void)dummy; 108 } 109 ~Processor() = default; 110 NO_COPY_SEMANTIC(Processor); 111 NO_MOVE_SEMANTIC(Processor); 112 113 bool Init() override; 114 bool Process(Task task) override; 115 bool Destroy() override; 116 117 private: 118 PandaTypes types_; 119 inline static std::atomic<ThreadNum> next_thread_num_ {0}; 120 friend class ThreadPool; 121 }; 122 123 class ThreadPool { 124 public: 125 NO_COPY_SEMANTIC(ThreadPool); 126 NO_MOVE_SEMANTIC(ThreadPool); 127 128 static void Initialize(mem::InternalAllocatorPtr allocator, size_t num_threads); 129 static void Destroy(); 130 GetCache()131 static OptionalRef<LibCache> GetCache() 132 { 133 Data *data = GetData(); 134 if (data == nullptr) { 135 return {}; 136 } 137 return data->cache; 138 } 139 140 static bool Enqueue(Method *method); 141 142 // TODO(romanov) Current API implies waking everyone waiting for a method when _any_ method is verified. 143 // This means a lot of unnecessary wakeups. It would be good to reduce their number, but need to decide how exactly. 144 // E.g. pass Method& here and to SignalMethodVerified? Include a condition variable to wait on in a Job? 145 template <typename Handler, typename FailureHandler> WaitForVerification(Handler && continue_waiting,FailureHandler && failure_handler)146 static void WaitForVerification(Handler &&continue_waiting, FailureHandler &&failure_handler) 147 { 148 Data *data = GetData(); 149 if (data == nullptr) { 150 return; 151 } 152 panda::os::memory::LockHolder lck {data->lock}; 153 while (continue_waiting()) { 154 if (GetData() == nullptr) { 155 failure_handler(); 156 return; 157 } 158 constexpr uint64_t quantum = 500; 159 data->cond_var.TimedWait(&data->lock, quantum); 160 } 161 } 162 static void SignalMethodVerified(); 163 164 private: 165 struct Data { 166 LibCache cache; 167 TaskQueue queue; 168 panda::ThreadPool<Task, Processor, std::monostate> thread_pool; 169 panda::os::memory::Mutex lock; 170 panda::os::memory::ConditionVariable cond_var GUARDED_BY(lock); 171 172 NO_COPY_SEMANTIC(Data); 173 NO_MOVE_SEMANTIC(Data); DataData174 Data(mem::InternalAllocatorPtr allocator, size_t num_threads) 175 : cache {}, queue {allocator}, thread_pool {allocator, &queue, std::monostate {}, num_threads, "verifier"} 176 { 177 } 178 }; 179 180 static Data *GetData(bool allow_shutting_down = false) 181 { 182 // Atomic with acquire order reason: data race with shutdown_ with dependecies on reads after the load which 183 // should become visible 184 if (!allow_shutting_down && shutdown_.load(std::memory_order_acquire)) { 185 return nullptr; 186 } 187 // Atomic with seq_cst order reason: data race with data_ with requirement for sequentially consistent order 188 // where threads observe all modifications in the same order 189 return data_.load(std::memory_order_seq_cst); 190 } 191 192 inline static mem::InternalAllocatorPtr allocator_ {nullptr}; 193 inline static std::atomic<Data *> data_ {nullptr}; 194 inline static std::atomic<bool> shutdown_ {false}; 195 196 friend class Processor; 197 }; 198 199 } // namespace panda::verifier 200 201 #endif // !PANDA_VERIFIER_JOB_QUEUE_JOB_QUEUE_H 202