1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <array> 18 #include <cstdint> 19 20 #include "pw_function/function.h" 21 #include "pw_metric/metric.h" 22 #include "pw_span/span.h" 23 #include "pw_status/status.h" 24 #include "pw_sync/interrupt_spin_lock.h" 25 #include "pw_sync/lock_annotations.h" 26 #include "pw_sync/thread_notification.h" 27 #include "pw_thread/thread_core.h" 28 #include "pw_work_queue/internal/circular_buffer.h" 29 30 namespace pw::work_queue { 31 32 using WorkItem = Function<void()>; 33 34 // The WorkQueue class enables threads and interrupts to enqueue work as a 35 // pw::work_queue::WorkItem for execution by the work queue. 36 // 37 // The entire API is thread and interrupt safe. 38 class WorkQueue : public thread::ThreadCore { 39 public: 40 // Note: the ThreadNotification prevents this from being constexpr. WorkQueue(span<WorkItem> queue_storage)41 explicit WorkQueue(span<WorkItem> queue_storage) 42 : stop_requested_(false), circular_buffer_(queue_storage) {} 43 44 // Enqueues a work_item for execution by the work queue thread. 45 // 46 // Returns: 47 // Ok - Success, entry was enqueued for execution. 48 // FailedPrecondition - the work queue is shutting down, entries are no 49 // longer permitted. 50 // ResourceExhausted - internal work queue is full, entry was not enqueued. PushWork(WorkItem && work_item)51 Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) { 52 return InternalPushWork(std::move(work_item)); 53 } 54 55 // Queue work for execution. Crash if the work cannot be queued due to a 56 // full queue or a stopped worker thread. 57 // 58 // This call is recommended where possible since it saves error handling code 59 // at the callsite; and in many practical cases, it is a bug if the work 60 // queue is full (and so a crash is useful to detect the problem). 61 // 62 // Precondition: The queue must not overflow, i.e. be full. 63 // Precondition: The queue must not have been requested to stop, i.e. it must 64 // not be in the process of shutting down. 65 void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_); 66 67 // Locks the queue to prevent further work enqueing, finishes outstanding 68 // work, then shuts down the worker thread. 69 // 70 // The WorkQueue cannot be resumed after stopping as the ThreadCore thread 71 // returns and may be joined. It must be reconstructed for re-use after 72 // the thread has been joined. 73 void RequestStop() PW_LOCKS_EXCLUDED(lock_); 74 75 private: 76 void Run() override PW_LOCKS_EXCLUDED(lock_); 77 Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_); 78 79 sync::InterruptSpinLock lock_; 80 bool stop_requested_ PW_GUARDED_BY(lock_); 81 internal::CircularBuffer<WorkItem> circular_buffer_ PW_GUARDED_BY(lock_); 82 sync::ThreadNotification work_notification_; 83 84 // TODO(ewout): The group and/or its name token should be passed as a ctor 85 // arg instead. Depending on the approach here the group should be exposed 86 // While doing this evaluate whether perhaps we should instead construct 87 // TypedMetric<uint32_t>s directly, avoiding the macro usage given the 88 // min_queue_remaining_ initial value requires dependency injection. 89 // And lastly when the restructure is finalized add unit tests to ensure these 90 // metrics work as intended. 91 PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue"); 92 PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u); 93 PW_METRIC(metrics_, 94 min_queue_remaining_, 95 "min_queue_remaining", 96 static_cast<uint32_t>(circular_buffer_.capacity())); 97 }; 98 99 template <size_t kWorkQueueEntries> 100 class WorkQueueWithBuffer : public WorkQueue { 101 public: WorkQueueWithBuffer()102 constexpr WorkQueueWithBuffer() : WorkQueue(queue_storage_) {} 103 104 private: 105 std::array<WorkItem, kWorkQueueEntries> queue_storage_; 106 }; 107 108 } // namespace pw::work_queue 109