1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #include "pw_work_queue/work_queue.h" 16 17 #include <mutex> 18 19 #include "pw_assert/check.h" 20 21 namespace pw::work_queue { 22 RequestStop()23void WorkQueue::RequestStop() { 24 std::lock_guard lock(lock_); 25 stop_requested_ = true; 26 work_notification_.release(); 27 } 28 Run()29void WorkQueue::Run() { 30 while (true) { 31 work_notification_.acquire(); 32 33 // Drain the work queue. 34 bool stop_requested; 35 bool work_remaining; 36 do { 37 std::optional<WorkItem> possible_work_item; 38 { 39 std::lock_guard lock(lock_); 40 possible_work_item = circular_buffer_.Pop(); 41 work_remaining = !circular_buffer_.empty(); 42 stop_requested = stop_requested_; 43 } 44 if (!possible_work_item.has_value()) { 45 continue; // No work item to process. 46 } 47 WorkItem& work_item = possible_work_item.value(); 48 PW_CHECK(work_item != nullptr); 49 work_item(); 50 } while (work_remaining); 51 52 // Queue was drained, return if we've been requested to stop. 53 if (stop_requested) { 54 return; 55 } 56 } 57 } 58 CheckPushWork(WorkItem && work_item)59void WorkQueue::CheckPushWork(WorkItem&& work_item) { 60 PW_CHECK_OK(InternalPushWork(std::move(work_item)), 61 "Failed to push work item into the work queue"); 62 } 63 InternalPushWork(WorkItem && work_item)64Status WorkQueue::InternalPushWork(WorkItem&& work_item) { 65 std::lock_guard lock(lock_); 66 67 if (stop_requested_) { 68 // Entries are not permitted to be enqueued once stop has been requested. 69 return Status::FailedPrecondition(); 70 } 71 72 if (circular_buffer_.full()) { 73 return Status::ResourceExhausted(); 74 } 75 76 circular_buffer_.Push(std::move(work_item)); 77 78 // Update the watermarks for the queue. 79 const uint32_t queue_entries = circular_buffer_.size(); 80 if (queue_entries > max_queue_used_.value()) { 81 max_queue_used_.Set(queue_entries); 82 } 83 const uint32_t queue_remaining = circular_buffer_.capacity() - queue_entries; 84 if (queue_remaining < min_queue_remaining_.value()) { 85 min_queue_remaining_.Set(queue_entries); 86 } 87 88 work_notification_.release(); 89 return OkStatus(); 90 } 91 92 } // namespace pw::work_queue 93