1 /* 2 * Copyright 2011 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef WEBRTC_BASE_ATOMICOPS_H_ 12 #define WEBRTC_BASE_ATOMICOPS_H_ 13 14 #include <string> 15 16 #include "webrtc/base/basictypes.h" 17 #include "webrtc/base/common.h" 18 #include "webrtc/base/logging.h" 19 #include "webrtc/base/scoped_ptr.h" 20 21 namespace rtc { 22 23 // A single-producer, single-consumer, fixed-size queue. 24 // All methods not ending in Unsafe can be safely called without locking, 25 // provided that calls to consumer methods (Peek/Pop) or producer methods (Push) 26 // only happen on a single thread per method type. If multiple threads need to 27 // read simultaneously or write simultaneously, other synchronization is 28 // necessary. Synchronization is also required if a call into any Unsafe method 29 // could happen at the same time as a call to any other method. 30 template <typename T> 31 class FixedSizeLockFreeQueue { 32 private: 33 // Atomic primitives and memory barrier 34 #if defined(__arm__) 35 typedef uint32 Atomic32; 36 37 // Copied from google3/base/atomicops-internals-arm-v6plus.h MemoryBarrier()38 static inline void MemoryBarrier() { 39 asm volatile("dmb":::"memory"); 40 } 41 42 // Adapted from google3/base/atomicops-internals-arm-v6plus.h AtomicIncrement(volatile Atomic32 * ptr)43 static inline void AtomicIncrement(volatile Atomic32* ptr) { 44 Atomic32 str_success, value; 45 asm volatile ( 46 "1:\n" 47 "ldrex %1, [%2]\n" 48 "add %1, %1, #1\n" 49 "strex %0, %1, [%2]\n" 50 "teq %0, #0\n" 51 "bne 1b" 52 : "=&r"(str_success), "=&r"(value) 53 : "r" (ptr) 54 : "cc", "memory"); 55 } 56 #elif !defined(SKIP_ATOMIC_CHECK) 57 #error "No atomic operations defined for the given architecture." 58 #endif 59 60 public: 61 // Constructs an empty queue, with capacity 0. FixedSizeLockFreeQueue()62 FixedSizeLockFreeQueue() : pushed_count_(0), 63 popped_count_(0), 64 capacity_(0), 65 data_() {} 66 // Constructs an empty queue with the given capacity. FixedSizeLockFreeQueue(size_t capacity)67 FixedSizeLockFreeQueue(size_t capacity) : pushed_count_(0), 68 popped_count_(0), 69 capacity_(capacity), 70 data_(new T[capacity]) {} 71 72 // Pushes a value onto the queue. Returns true if the value was successfully 73 // pushed (there was space in the queue). This method can be safely called at 74 // the same time as PeekFront/PopFront. PushBack(T value)75 bool PushBack(T value) { 76 if (capacity_ == 0) { 77 LOG(LS_WARNING) << "Queue capacity is 0."; 78 return false; 79 } 80 if (IsFull()) { 81 return false; 82 } 83 84 data_[pushed_count_ % capacity_] = value; 85 // Make sure the data is written before the count is incremented, so other 86 // threads can't see the value exists before being able to read it. 87 MemoryBarrier(); 88 AtomicIncrement(&pushed_count_); 89 return true; 90 } 91 92 // Retrieves the oldest value pushed onto the queue. Returns true if there was 93 // an item to peek (the queue was non-empty). This method can be safely called 94 // at the same time as PushBack. PeekFront(T * value_out)95 bool PeekFront(T* value_out) { 96 if (capacity_ == 0) { 97 LOG(LS_WARNING) << "Queue capacity is 0."; 98 return false; 99 } 100 if (IsEmpty()) { 101 return false; 102 } 103 104 *value_out = data_[popped_count_ % capacity_]; 105 return true; 106 } 107 108 // Retrieves the oldest value pushed onto the queue and removes it from the 109 // queue. Returns true if there was an item to pop (the queue was non-empty). 110 // This method can be safely called at the same time as PushBack. PopFront(T * value_out)111 bool PopFront(T* value_out) { 112 if (PeekFront(value_out)) { 113 AtomicIncrement(&popped_count_); 114 return true; 115 } 116 return false; 117 } 118 119 // Clears the current items in the queue and sets the new (fixed) size. This 120 // method cannot be called at the same time as any other method. ClearAndResizeUnsafe(int new_capacity)121 void ClearAndResizeUnsafe(int new_capacity) { 122 capacity_ = new_capacity; 123 data_.reset(new T[new_capacity]); 124 pushed_count_ = 0; 125 popped_count_ = 0; 126 } 127 128 // Returns true if there is no space left in the queue for new elements. IsFull()129 int IsFull() const { return pushed_count_ == popped_count_ + capacity_; } 130 // Returns true if there are no elements in the queue. IsEmpty()131 int IsEmpty() const { return pushed_count_ == popped_count_; } 132 // Returns the current number of elements in the queue. This is always in the 133 // range [0, capacity] Size()134 size_t Size() const { return pushed_count_ - popped_count_; } 135 136 // Returns the capacity of the queue (max size). capacity()137 size_t capacity() const { return capacity_; } 138 139 private: 140 volatile Atomic32 pushed_count_; 141 volatile Atomic32 popped_count_; 142 size_t capacity_; 143 rtc::scoped_ptr<T[]> data_; 144 DISALLOW_COPY_AND_ASSIGN(FixedSizeLockFreeQueue); 145 }; 146 147 } 148 149 #endif // WEBRTC_BASE_ATOMICOPS_H_ 150