• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Abseil Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "absl/flags/internal/sequence_lock.h"
15 
16 #include <algorithm>
17 #include <atomic>
18 #include <thread>  // NOLINT(build/c++11)
19 #include <tuple>
20 #include <vector>
21 
22 #include "gtest/gtest.h"
23 #include "absl/base/internal/sysinfo.h"
24 #include "absl/container/fixed_array.h"
25 #include "absl/time/clock.h"
26 
27 namespace {
28 
29 namespace flags = absl::flags_internal;
30 
31 class ConcurrentSequenceLockTest
32     : public testing::TestWithParam<std::tuple<int, int>> {
33  public:
ConcurrentSequenceLockTest()34   ConcurrentSequenceLockTest()
35       : buf_bytes_(std::get<0>(GetParam())),
36         num_threads_(std::get<1>(GetParam())) {}
37 
38  protected:
39   const int buf_bytes_;
40   const int num_threads_;
41 };
42 
TEST_P(ConcurrentSequenceLockTest,ReadAndWrite)43 TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) {
44   const int buf_words =
45       flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t);
46 
47   // The buffer that will be protected by the SequenceLock.
48   absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words);
49   for (auto& v : protected_buf) v = -1;
50 
51   flags::SequenceLock seq_lock;
52   std::atomic<bool> stop{false};
53   std::atomic<int64_t> bad_reads{0};
54   std::atomic<int64_t> good_reads{0};
55   std::atomic<int64_t> unsuccessful_reads{0};
56 
57   // Start a bunch of threads which read 'protected_buf' under the sequence
58   // lock. The main thread will concurrently update 'protected_buf'. The updates
59   // always consist of an array of identical integers. The reader ensures that
60   // any data it reads matches that pattern (i.e. the reads are not "torn").
61   std::vector<std::thread> threads;
62   for (int i = 0; i < num_threads_; i++) {
63     threads.emplace_back([&]() {
64       absl::FixedArray<char> local_buf(buf_bytes_);
65       while (!stop.load(std::memory_order_relaxed)) {
66         if (seq_lock.TryRead(local_buf.data(), protected_buf.data(),
67                              buf_bytes_)) {
68           bool good = true;
69           for (const auto& v : local_buf) {
70             if (v != local_buf[0]) good = false;
71           }
72           if (good) {
73             good_reads.fetch_add(1, std::memory_order_relaxed);
74           } else {
75             bad_reads.fetch_add(1, std::memory_order_relaxed);
76           }
77         } else {
78           unsuccessful_reads.fetch_add(1, std::memory_order_relaxed);
79         }
80       }
81     });
82   }
83   while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) {
84     absl::SleepFor(absl::Milliseconds(1));
85   }
86   seq_lock.MarkInitialized();
87 
88   // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems
89   // somewhat unfair and without an explicit timeout for this loop, the tests
90   // can run a long time.
91   absl::Time deadline = absl::Now() + absl::Seconds(5);
92   for (int i = 0; i < 100 && absl::Now() < deadline; i++) {
93     absl::FixedArray<char> writer_buf(buf_bytes_);
94     for (auto& v : writer_buf) v = i;
95     seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_);
96     absl::SleepFor(absl::Microseconds(10));
97   }
98   stop.store(true, std::memory_order_relaxed);
99   for (auto& t : threads) t.join();
100   ASSERT_GE(good_reads, 0);
101   ASSERT_EQ(bad_reads, 0);
102 }
103 
104 // Simple helper for generating a range of thread counts.
105 // Generates [low, low*scale, low*scale^2, ...high)
106 // (even if high is between low*scale^k and low*scale^(k+1)).
MultiplicativeRange(int low,int high,int scale)107 std::vector<int> MultiplicativeRange(int low, int high, int scale) {
108   std::vector<int> result;
109   for (int current = low; current < high; current *= scale) {
110     result.push_back(current);
111   }
112   result.push_back(high);
113   return result;
114 }
115 
116 #ifndef ABSL_HAVE_THREAD_SANITIZER
117 const int kMaxThreads = absl::base_internal::NumCPUs();
118 #else
119 // With TSAN, a lot of threads contending for atomic access on the sequence
120 // lock make this test run too slowly.
121 const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4);
122 #endif
123 
124 // Return all of the interesting buffer sizes worth testing:
125 // powers of two and adjacent values.
InterestingBufferSizes()126 std::vector<int> InterestingBufferSizes() {
127   std::vector<int> ret;
128   for (int v : MultiplicativeRange(1, 128, 2)) {
129     ret.push_back(v);
130     if (v > 1) {
131       ret.push_back(v - 1);
132     }
133     ret.push_back(v + 1);
134   }
135   return ret;
136 }
137 
138 INSTANTIATE_TEST_SUITE_P(
139     TestManyByteSizes, ConcurrentSequenceLockTest,
140     testing::Combine(
141         // Buffer size (bytes).
142         testing::ValuesIn(InterestingBufferSizes()),
143         // Number of reader threads.
144         testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2))));
145 
146 // Simple single-threaded test, parameterized by the size of the buffer to be
147 // protected.
148 class SequenceLockTest : public testing::TestWithParam<int> {};
149 
TEST_P(SequenceLockTest,SingleThreaded)150 TEST_P(SequenceLockTest, SingleThreaded) {
151   const int size = GetParam();
152   absl::FixedArray<std::atomic<uint64_t>> protected_buf(
153       flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t));
154 
155   flags::SequenceLock seq_lock;
156   seq_lock.MarkInitialized();
157 
158   std::vector<char> src_buf(size, 'x');
159   seq_lock.Write(protected_buf.data(), src_buf.data(), size);
160 
161   std::vector<char> dst_buf(size, '0');
162   ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size));
163   ASSERT_EQ(src_buf, dst_buf);
164 }
165 INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest,
166                          // Buffer size (bytes).
167                          testing::Range(1, 128));
168 
169 }  // namespace
170