• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_work_queue/work_queue.h"
16 
17 #include <mutex>
18 
19 #include "pw_assert/check.h"
20 
21 namespace pw::work_queue {
22 
RequestStop()23 void WorkQueue::RequestStop() {
24   std::lock_guard lock(lock_);
25   stop_requested_ = true;
26   work_notification_.release();
27 }
28 
Run()29 void WorkQueue::Run() {
30   while (true) {
31     work_notification_.acquire();
32 
33     // Drain the work queue.
34     bool stop_requested;
35     bool work_remaining;
36     do {
37       std::optional<WorkItem> possible_work_item;
38       {
39         std::lock_guard lock(lock_);
40         possible_work_item = circular_buffer_.Pop();
41         work_remaining = !circular_buffer_.empty();
42         stop_requested = stop_requested_;
43       }
44       if (!possible_work_item.has_value()) {
45         continue;  // No work item to process.
46       }
47       WorkItem& work_item = possible_work_item.value();
48       PW_CHECK(work_item != nullptr);
49       work_item();
50     } while (work_remaining);
51 
52     // Queue was drained, return if we've been requested to stop.
53     if (stop_requested) {
54       return;
55     }
56   }
57 }
58 
CheckPushWork(WorkItem && work_item)59 void WorkQueue::CheckPushWork(WorkItem&& work_item) {
60   PW_CHECK_OK(InternalPushWork(std::move(work_item)),
61               "Failed to push work item into the work queue");
62 }
63 
InternalPushWork(WorkItem && work_item)64 Status WorkQueue::InternalPushWork(WorkItem&& work_item) {
65   std::lock_guard lock(lock_);
66 
67   if (stop_requested_) {
68     // Entries are not permitted to be enqueued once stop has been requested.
69     return Status::FailedPrecondition();
70   }
71 
72   if (circular_buffer_.full()) {
73     return Status::ResourceExhausted();
74   }
75 
76   circular_buffer_.Push(std::move(work_item));
77 
78   // Update the watermarks for the queue.
79   const uint32_t queue_entries = circular_buffer_.size();
80   if (queue_entries > max_queue_used_.value()) {
81     max_queue_used_.Set(queue_entries);
82   }
83   const uint32_t queue_remaining = circular_buffer_.capacity() - queue_entries;
84   if (queue_remaining < min_queue_remaining_.value()) {
85     min_queue_remaining_.Set(queue_entries);
86   }
87 
88   work_notification_.release();
89   return OkStatus();
90 }
91 
92 }  // namespace pw::work_queue
93