/*
 *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "rtc_base/deprecated/recursive_critical_section.h"

#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/base/attributes.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/location.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"

namespace rtc {

namespace {

const int kLongTime = 10000;  // 10 seconds
const int kNumThreads = 16;
const int kOperationsToRun = 1000;

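// Collects the values produced by the worker threads and verifies that each
// value was handed out exactly once across all threads.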
class UniqueValueVerifier {
 public:
  void Verify(const std::vector<int>& values) {
    for (size_t i = 0; i < values.size(); ++i) {
      std::pair<std::set<int>::iterator, bool> result =
          all_values_.insert(values[i]);
      // Each value should only be taken by one thread, so if this value
      // has already been added, something went wrong.
      EXPECT_TRUE(result.second)
          << " Thread=" << Thread::Current() << " value=" << values[i];
    }
  }

  void Finalize() {}

 private:
  std::set<int> all_values_;
};

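// Verifies the outcome of concurrent compare-and-swap operations: exactly one
// operation should observe the initial value 0, and all others should
// observe 1.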
class CompareAndSwapVerifier {
 public:
  CompareAndSwapVerifier() : zero_count_(0) {}

  void Verify(const std::vector<int>& values) {
    for (auto v : values) {
      if (v == 0) {
        EXPECT_EQ(0, zero_count_) << "Thread=" << Thread::Current();
        ++zero_count_;
      } else {
        EXPECT_EQ(1, v) << " Thread=" << Thread::Current();
      }
    }
  }

  void Finalize() { EXPECT_EQ(1, zero_count_); }

 private:
  int zero_count_;
};

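// Base class for the runners below: releases all worker threads at once via
// start_event_ and signals done_event_ when the last of them has finished.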
class RunnerBase : public MessageHandler {
 public:
  explicit RunnerBase(int value)
      : threads_active_(0),
        start_event_(true, false),
        done_event_(true, false),
        shared_value_(value) {}

  bool Run() {
    // Signal all threads to start.
    start_event_.Set();

    // Wait for all threads to finish.
    return done_event_.Wait(kLongTime);
  }

  void SetExpectedThreadCount(int count) { threads_active_ = count; }

  int shared_value() const { return shared_value_; }

 protected:
  // Derived classes must override OnMessage, and call BeforeStart and AfterEnd
  // at the beginning and the end of OnMessage respectively.
  void BeforeStart() { ASSERT_TRUE(start_event_.Wait(kLongTime)); }

  // Returns true if all threads have finished.
  bool AfterEnd() {
    if (AtomicOps::Decrement(&threads_active_) == 0) {
      done_event_.Set();
      return true;
    }
    return false;
  }

  int threads_active_;
  Event start_event_;
  Event done_event_;
  int shared_value_;
};

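// Minimal Lock/Unlock wrapper around RecursiveCriticalSection, annotated for
// thread-safety analysis, so it can serve as LockRunner's Lock parameter.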
class RTC_LOCKABLE CriticalSectionLock {
 public:
  void Lock() RTC_EXCLUSIVE_LOCK_FUNCTION() { cs_.Enter(); }
  void Unlock() RTC_UNLOCK_FUNCTION() { cs_.Leave(); }

 private:
  RecursiveCriticalSection cs_;
};

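// Each thread performs kOperationsToRun non-atomic increments on the shared
// value while holding the lock; if the lock failed to provide mutual
// exclusion, the EXPECT_EQ checks would catch the interleaving.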
template <class Lock>
class LockRunner : public RunnerBase {
 public:
  LockRunner() : RunnerBase(0) {}

  void OnMessage(Message* msg) override {
    BeforeStart();

    lock_.Lock();

    EXPECT_EQ(0, shared_value_);
    int old = shared_value_;

    // Use a loop to increase the chance of race.
    for (int i = 0; i < kOperationsToRun; ++i) {
      ++shared_value_;
    }
    EXPECT_EQ(old + kOperationsToRun, shared_value_);
    shared_value_ = 0;

    lock_.Unlock();

    AfterEnd();
  }

 private:
  Lock lock_;
};

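// Each thread applies Op to the shared value kOperationsToRun times, then
// hands the returned values to Verifier for a consistency check.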
template <class Op, class Verifier>
class AtomicOpRunner : public RunnerBase {
 public:
  explicit AtomicOpRunner(int initial_value) : RunnerBase(initial_value) {}

  void OnMessage(Message* msg) override {
    BeforeStart();

    std::vector<int> values;
    values.reserve(kOperationsToRun);

    // Generate a bunch of values by updating shared_value_ atomically.
    for (int i = 0; i < kOperationsToRun; ++i) {
      values.push_back(Op::AtomicOp(&shared_value_));
    }

    {  // Add them all to the set.
      CritScope cs(&all_values_crit_);
      verifier_.Verify(values);
    }

    if (AfterEnd()) {
      verifier_.Finalize();
    }
  }

 private:
  RecursiveCriticalSection all_values_crit_;
  Verifier verifier_;
};

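// Adapters that give AtomicOps::Increment/Decrement/CompareAndSwap a common
// interface, usable as the Op parameter of AtomicOpRunner.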
struct IncrementOp {
  static int AtomicOp(int* i) { return AtomicOps::Increment(i); }
};

struct DecrementOp {
  static int AtomicOp(int* i) { return AtomicOps::Decrement(i); }
};

struct CompareAndSwapOp {
  static int AtomicOp(int* i) { return AtomicOps::CompareAndSwap(i, 0, 1); }
};

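// Starts kNumThreads threads and posts `handler` to each, so every thread
// runs the handler's OnMessage exactly once.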
void StartThreads(std::vector<std::unique_ptr<Thread>>* threads,
                  MessageHandler* handler) {
  for (int i = 0; i < kNumThreads; ++i) {
    std::unique_ptr<Thread> thread(Thread::Create());
    thread->Start();
    thread->Post(RTC_FROM_HERE, handler);
    threads->push_back(std::move(thread));
  }
}

}  // namespace

TEST(AtomicOpsTest, Simple) {
  int value = 0;
  EXPECT_EQ(1, AtomicOps::Increment(&value));
  EXPECT_EQ(1, value);
  EXPECT_EQ(2, AtomicOps::Increment(&value));
  EXPECT_EQ(2, value);
  EXPECT_EQ(1, AtomicOps::Decrement(&value));
  EXPECT_EQ(1, value);
  EXPECT_EQ(0, AtomicOps::Decrement(&value));
  EXPECT_EQ(0, value);
}

TEST(AtomicOpsTest, SimplePtr) {
  class Foo {};
  Foo* volatile foo = nullptr;
  std::unique_ptr<Foo> a(new Foo());
  std::unique_ptr<Foo> b(new Foo());
  // Reading the initial value should work as expected.
  EXPECT_TRUE(rtc::AtomicOps::AcquireLoadPtr(&foo) == nullptr);
  // Setting using compare and swap should work.
  EXPECT_TRUE(rtc::AtomicOps::CompareAndSwapPtr(
                  &foo, static_cast<Foo*>(nullptr), a.get()) == nullptr);
  EXPECT_TRUE(rtc::AtomicOps::AcquireLoadPtr(&foo) == a.get());
  // Setting another value with the wrong expected previous pointer should
  // fail; foo should still point to a.
  EXPECT_TRUE(rtc::AtomicOps::CompareAndSwapPtr(
                  &foo, static_cast<Foo*>(nullptr), b.get()) == a.get());
  EXPECT_TRUE(rtc::AtomicOps::AcquireLoadPtr(&foo) == a.get());
  // Replacing a with b should work.
  EXPECT_TRUE(rtc::AtomicOps::CompareAndSwapPtr(&foo, a.get(), b.get()) ==
              a.get());
  EXPECT_TRUE(rtc::AtomicOps::AcquireLoadPtr(&foo) == b.get());
}

TEST(AtomicOpsTest, Increment) {
  // Create and start lots of threads.
  AtomicOpRunner<IncrementOp, UniqueValueVerifier> runner(0);
  std::vector<std::unique_ptr<Thread>> threads;
  StartThreads(&threads, &runner);
  runner.SetExpectedThreadCount(kNumThreads);

  // Release the hounds!
  EXPECT_TRUE(runner.Run());
  EXPECT_EQ(kOperationsToRun * kNumThreads, runner.shared_value());
}

TEST(AtomicOpsTest, Decrement) {
  // Create and start lots of threads.
  AtomicOpRunner<DecrementOp, UniqueValueVerifier> runner(kOperationsToRun *
                                                          kNumThreads);
  std::vector<std::unique_ptr<Thread>> threads;
  StartThreads(&threads, &runner);
  runner.SetExpectedThreadCount(kNumThreads);

  // Release the hounds!
  EXPECT_TRUE(runner.Run());
  EXPECT_EQ(0, runner.shared_value());
}

TEST(AtomicOpsTest, CompareAndSwap) {
  // Create and start lots of threads.
  AtomicOpRunner<CompareAndSwapOp, CompareAndSwapVerifier> runner(0);
  std::vector<std::unique_ptr<Thread>> threads;
  StartThreads(&threads, &runner);
  runner.SetExpectedThreadCount(kNumThreads);

  // Release the hounds!
  EXPECT_TRUE(runner.Run());
  EXPECT_EQ(1, runner.shared_value());
}

TEST(RecursiveCriticalSectionTest, Basic) {
  // Create and start lots of threads.
  LockRunner<CriticalSectionLock> runner;
  std::vector<std::unique_ptr<Thread>> threads;
  StartThreads(&threads, &runner);
  runner.SetExpectedThreadCount(kNumThreads);

  // Release the hounds!
  EXPECT_TRUE(runner.Run());
  EXPECT_EQ(0, runner.shared_value());
}

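// Shared state for the performance test below. The byte arrays surrounding
// the lock are padding, intended to keep the lock on its own cache line so
// that the measurement is not skewed by false sharing.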
class PerfTestData {
 public:
  PerfTestData(int expected_count, Event* event)
      : cache_line_barrier_1_(),
        cache_line_barrier_2_(),
        expected_count_(expected_count),
        event_(event) {
    cache_line_barrier_1_[0]++;  // Avoid 'is not used'.
    cache_line_barrier_2_[0]++;  // Avoid 'is not used'.
  }
  ~PerfTestData() {}

  void AddToCounter(int add) {
    rtc::CritScope cs(&lock_);
    my_counter_ += add;
    if (my_counter_ == expected_count_)
      event_->Set();
  }

  int64_t total() const {
    // Assume that only one thread is running now.
    return my_counter_;
  }

 private:
  uint8_t cache_line_barrier_1_[64];
  RecursiveCriticalSection lock_;
  uint8_t cache_line_barrier_2_[64];
  int64_t my_counter_ = 0;
  const int expected_count_;
  Event* const event_;
};

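// Worker for the performance test: repeatedly adds its id to the shared
// counter under the lock.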
class PerfTestThread {
 public:
  PerfTestThread() : thread_(&ThreadFunc, this, "CsPerf") {}

  void Start(PerfTestData* data, int repeats, int id) {
    RTC_DCHECK(!thread_.IsRunning());
    RTC_DCHECK(!data_);
    data_ = data;
    repeats_ = repeats;
    my_id_ = id;
    thread_.Start();
  }

  void Stop() {
    RTC_DCHECK(thread_.IsRunning());
    RTC_DCHECK(data_);
    thread_.Stop();
    repeats_ = 0;
    data_ = nullptr;
    my_id_ = 0;
  }

 private:
  static void ThreadFunc(void* param) {
    PerfTestThread* me = static_cast<PerfTestThread*>(param);
    for (int i = 0; i < me->repeats_; ++i)
      me->data_->AddToCounter(me->my_id_);
  }

  PlatformThread thread_;
  PerfTestData* data_ = nullptr;
  int repeats_ = 0;
  int my_id_ = 0;
};

// Comparison of the output of this test as run on a MacBook Pro, 13-inch,
// 2017, 3.5 GHz Intel Core i7, 16 GB 2133 MHz LPDDR3,
// running macOS Mojave, 10.14.3.
//
// Native mutex implementation using fair policy (previously macOS default):
// Approximate CPU usage:
// real    4m54.612s
// user    1m20.575s
// sys     3m48.872s
// Unit test output:
// [       OK ] RecursiveCriticalSectionTest.Performance (294375 ms)
//
// Native mutex implementation using first fit policy (current macOS default):
// Approximate CPU usage:
// real    0m11.535s
// user    0m12.738s
// sys     0m31.207s
// Unit test output:
// [       OK ] RecursiveCriticalSectionTest.Performance (11444 ms)
//
// Special partially spin lock based implementation:
// Approximate CPU usage:
// real    0m2.113s
// user    0m3.014s
// sys     0m4.495s
// Unit test output:
// [       OK ] RecursiveCriticalSectionTest.Performance (1885 ms)
//
// The test is disabled by default to avoid unnecessarily loading the bots.
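// To run it locally anyway, disabled gtests can be enabled explicitly, e.g.
// (the test binary name may differ depending on the build setup):
//   ./rtc_unittests --gtest_also_run_disabled_tests \
//       --gtest_filter=RecursiveCriticalSectionTest.DISABLED_Performance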
TEST(RecursiveCriticalSectionTest, DISABLED_Performance) {
  PerfTestThread threads[8];
  Event event;

  static const int kThreadRepeats = 10000000;
  static const int kExpectedCount = kThreadRepeats * arraysize(threads);
  PerfTestData test_data(kExpectedCount, &event);

  for (auto& t : threads)
    t.Start(&test_data, kThreadRepeats, 1);

  event.Wait(Event::kForever);

  for (auto& t : threads)
    t.Stop();
}

}  // namespace rtc