• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2011 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef WEBRTC_BASE_ATOMICOPS_H_
12 #define WEBRTC_BASE_ATOMICOPS_H_
13 
14 #include <string>
15 
16 #include "webrtc/base/basictypes.h"
17 #include "webrtc/base/common.h"
18 #include "webrtc/base/logging.h"
19 #include "webrtc/base/scoped_ptr.h"
20 
21 namespace rtc {
22 
23 // A single-producer, single-consumer, fixed-size queue.
24 // All methods not ending in Unsafe can be safely called without locking,
25 // provided that calls to consumer methods (Peek/Pop) or producer methods (Push)
26 // only happen on a single thread per method type. If multiple threads need to
27 // read simultaneously or write simultaneously, other synchronization is
28 // necessary. Synchronization is also required if a call into any Unsafe method
29 // could happen at the same time as a call to any other method.
30 template <typename T>
31 class FixedSizeLockFreeQueue {
32  private:
33 // Atomic primitives and memory barrier
34 #if defined(__arm__)
35   typedef uint32 Atomic32;
36 
37   // Copied from google3/base/atomicops-internals-arm-v6plus.h
MemoryBarrier()38   static inline void MemoryBarrier() {
39     asm volatile("dmb":::"memory");
40   }
41 
42   // Adapted from google3/base/atomicops-internals-arm-v6plus.h
AtomicIncrement(volatile Atomic32 * ptr)43   static inline void AtomicIncrement(volatile Atomic32* ptr) {
44     Atomic32 str_success, value;
45     asm volatile (
46         "1:\n"
47         "ldrex  %1, [%2]\n"
48         "add    %1, %1, #1\n"
49         "strex  %0, %1, [%2]\n"
50         "teq    %0, #0\n"
51         "bne    1b"
52         : "=&r"(str_success), "=&r"(value)
53         : "r" (ptr)
54         : "cc", "memory");
55   }
56 #elif !defined(SKIP_ATOMIC_CHECK)
57 #error "No atomic operations defined for the given architecture."
58 #endif
59 
60  public:
61   // Constructs an empty queue, with capacity 0.
FixedSizeLockFreeQueue()62   FixedSizeLockFreeQueue() : pushed_count_(0),
63                              popped_count_(0),
64                              capacity_(0),
65                              data_() {}
66   // Constructs an empty queue with the given capacity.
FixedSizeLockFreeQueue(size_t capacity)67   FixedSizeLockFreeQueue(size_t capacity) : pushed_count_(0),
68                                             popped_count_(0),
69                                             capacity_(capacity),
70                                             data_(new T[capacity]) {}
71 
72   // Pushes a value onto the queue. Returns true if the value was successfully
73   // pushed (there was space in the queue). This method can be safely called at
74   // the same time as PeekFront/PopFront.
PushBack(T value)75   bool PushBack(T value) {
76     if (capacity_ == 0) {
77       LOG(LS_WARNING) << "Queue capacity is 0.";
78       return false;
79     }
80     if (IsFull()) {
81       return false;
82     }
83 
84     data_[pushed_count_ % capacity_] = value;
85     // Make sure the data is written before the count is incremented, so other
86     // threads can't see the value exists before being able to read it.
87     MemoryBarrier();
88     AtomicIncrement(&pushed_count_);
89     return true;
90   }
91 
92   // Retrieves the oldest value pushed onto the queue. Returns true if there was
93   // an item to peek (the queue was non-empty). This method can be safely called
94   // at the same time as PushBack.
PeekFront(T * value_out)95   bool PeekFront(T* value_out) {
96     if (capacity_ == 0) {
97       LOG(LS_WARNING) << "Queue capacity is 0.";
98       return false;
99     }
100     if (IsEmpty()) {
101       return false;
102     }
103 
104     *value_out = data_[popped_count_ % capacity_];
105     return true;
106   }
107 
108   // Retrieves the oldest value pushed onto the queue and removes it from the
109   // queue. Returns true if there was an item to pop (the queue was non-empty).
110   // This method can be safely called at the same time as PushBack.
PopFront(T * value_out)111   bool PopFront(T* value_out) {
112     if (PeekFront(value_out)) {
113       AtomicIncrement(&popped_count_);
114       return true;
115     }
116     return false;
117   }
118 
119   // Clears the current items in the queue and sets the new (fixed) size. This
120   // method cannot be called at the same time as any other method.
ClearAndResizeUnsafe(int new_capacity)121   void ClearAndResizeUnsafe(int new_capacity) {
122     capacity_ = new_capacity;
123     data_.reset(new T[new_capacity]);
124     pushed_count_ = 0;
125     popped_count_ = 0;
126   }
127 
128   // Returns true if there is no space left in the queue for new elements.
IsFull()129   int IsFull() const { return pushed_count_ == popped_count_ + capacity_; }
130   // Returns true if there are no elements in the queue.
IsEmpty()131   int IsEmpty() const { return pushed_count_ == popped_count_; }
132   // Returns the current number of elements in the queue. This is always in the
133   // range [0, capacity]
Size()134   size_t Size() const { return pushed_count_ - popped_count_; }
135 
136   // Returns the capacity of the queue (max size).
capacity()137   size_t capacity() const { return capacity_; }
138 
139  private:
140   volatile Atomic32 pushed_count_;
141   volatile Atomic32 popped_count_;
142   size_t capacity_;
143   rtc::scoped_ptr<T[]> data_;
144   DISALLOW_COPY_AND_ASSIGN(FixedSizeLockFreeQueue);
145 };
146 
147 }
148 
149 #endif  // WEBRTC_BASE_ATOMICOPS_H_
150