1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/skiplist.h"
6 #include <set>
7 #include "leveldb/env.h"
8 #include "util/arena.h"
9 #include "util/hash.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12
13 namespace leveldb {
14
15 typedef uint64_t Key;
16
17 struct Comparator {
operator ()leveldb::Comparator18 int operator()(const Key& a, const Key& b) const {
19 if (a < b) {
20 return -1;
21 } else if (a > b) {
22 return +1;
23 } else {
24 return 0;
25 }
26 }
27 };
28
29 class SkipTest { };
30
TEST(SkipTest,Empty)31 TEST(SkipTest, Empty) {
32 Arena arena;
33 Comparator cmp;
34 SkipList<Key, Comparator> list(cmp, &arena);
35 ASSERT_TRUE(!list.Contains(10));
36
37 SkipList<Key, Comparator>::Iterator iter(&list);
38 ASSERT_TRUE(!iter.Valid());
39 iter.SeekToFirst();
40 ASSERT_TRUE(!iter.Valid());
41 iter.Seek(100);
42 ASSERT_TRUE(!iter.Valid());
43 iter.SeekToLast();
44 ASSERT_TRUE(!iter.Valid());
45 }
46
TEST(SkipTest,InsertAndLookup)47 TEST(SkipTest, InsertAndLookup) {
48 const int N = 2000;
49 const int R = 5000;
50 Random rnd(1000);
51 std::set<Key> keys;
52 Arena arena;
53 Comparator cmp;
54 SkipList<Key, Comparator> list(cmp, &arena);
55 for (int i = 0; i < N; i++) {
56 Key key = rnd.Next() % R;
57 if (keys.insert(key).second) {
58 list.Insert(key);
59 }
60 }
61
62 for (int i = 0; i < R; i++) {
63 if (list.Contains(i)) {
64 ASSERT_EQ(keys.count(i), 1);
65 } else {
66 ASSERT_EQ(keys.count(i), 0);
67 }
68 }
69
70 // Simple iterator tests
71 {
72 SkipList<Key, Comparator>::Iterator iter(&list);
73 ASSERT_TRUE(!iter.Valid());
74
75 iter.Seek(0);
76 ASSERT_TRUE(iter.Valid());
77 ASSERT_EQ(*(keys.begin()), iter.key());
78
79 iter.SeekToFirst();
80 ASSERT_TRUE(iter.Valid());
81 ASSERT_EQ(*(keys.begin()), iter.key());
82
83 iter.SeekToLast();
84 ASSERT_TRUE(iter.Valid());
85 ASSERT_EQ(*(keys.rbegin()), iter.key());
86 }
87
88 // Forward iteration test
89 for (int i = 0; i < R; i++) {
90 SkipList<Key, Comparator>::Iterator iter(&list);
91 iter.Seek(i);
92
93 // Compare against model iterator
94 std::set<Key>::iterator model_iter = keys.lower_bound(i);
95 for (int j = 0; j < 3; j++) {
96 if (model_iter == keys.end()) {
97 ASSERT_TRUE(!iter.Valid());
98 break;
99 } else {
100 ASSERT_TRUE(iter.Valid());
101 ASSERT_EQ(*model_iter, iter.key());
102 ++model_iter;
103 iter.Next();
104 }
105 }
106 }
107
108 // Backward iteration test
109 {
110 SkipList<Key, Comparator>::Iterator iter(&list);
111 iter.SeekToLast();
112
113 // Compare against model iterator
114 for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
115 model_iter != keys.rend();
116 ++model_iter) {
117 ASSERT_TRUE(iter.Valid());
118 ASSERT_EQ(*model_iter, iter.key());
119 iter.Prev();
120 }
121 ASSERT_TRUE(!iter.Valid());
122 }
123 }
124
125 // We want to make sure that with a single writer and multiple
126 // concurrent readers (with no synchronization other than when a
127 // reader's iterator is created), the reader always observes all the
128 // data that was present in the skip list when the iterator was
129 // constructor. Because insertions are happening concurrently, we may
130 // also observe new values that were inserted since the iterator was
131 // constructed, but we should never miss any values that were present
132 // at iterator construction time.
133 //
134 // We generate multi-part keys:
135 // <key,gen,hash>
136 // where:
137 // key is in range [0..K-1]
138 // gen is a generation number for key
139 // hash is hash(key,gen)
140 //
141 // The insertion code picks a random key, sets gen to be 1 + the last
142 // generation number inserted for that key, and sets hash to Hash(key,gen).
143 //
144 // At the beginning of a read, we snapshot the last inserted
145 // generation number for each key. We then iterate, including random
146 // calls to Next() and Seek(). For every key we encounter, we
147 // check that it is either expected given the initial snapshot or has
148 // been concurrently added since the iterator started.
149 class ConcurrentTest {
150 private:
151 static const uint32_t K = 4;
152
key(Key key)153 static uint64_t key(Key key) { return (key >> 40); }
gen(Key key)154 static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
hash(Key key)155 static uint64_t hash(Key key) { return key & 0xff; }
156
HashNumbers(uint64_t k,uint64_t g)157 static uint64_t HashNumbers(uint64_t k, uint64_t g) {
158 uint64_t data[2] = { k, g };
159 return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
160 }
161
MakeKey(uint64_t k,uint64_t g)162 static Key MakeKey(uint64_t k, uint64_t g) {
163 assert(sizeof(Key) == sizeof(uint64_t));
164 assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
165 assert(g <= 0xffffffffu);
166 return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
167 }
168
IsValidKey(Key k)169 static bool IsValidKey(Key k) {
170 return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
171 }
172
RandomTarget(Random * rnd)173 static Key RandomTarget(Random* rnd) {
174 switch (rnd->Next() % 10) {
175 case 0:
176 // Seek to beginning
177 return MakeKey(0, 0);
178 case 1:
179 // Seek to end
180 return MakeKey(K, 0);
181 default:
182 // Seek to middle
183 return MakeKey(rnd->Next() % K, 0);
184 }
185 }
186
187 // Per-key generation
188 struct State {
189 port::AtomicPointer generation[K];
Setleveldb::ConcurrentTest::State190 void Set(int k, intptr_t v) {
191 generation[k].Release_Store(reinterpret_cast<void*>(v));
192 }
Getleveldb::ConcurrentTest::State193 intptr_t Get(int k) {
194 return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
195 }
196
Stateleveldb::ConcurrentTest::State197 State() {
198 for (int k = 0; k < K; k++) {
199 Set(k, 0);
200 }
201 }
202 };
203
204 // Current state of the test
205 State current_;
206
207 Arena arena_;
208
209 // SkipList is not protected by mu_. We just use a single writer
210 // thread to modify it.
211 SkipList<Key, Comparator> list_;
212
213 public:
ConcurrentTest()214 ConcurrentTest() : list_(Comparator(), &arena_) { }
215
216 // REQUIRES: External synchronization
WriteStep(Random * rnd)217 void WriteStep(Random* rnd) {
218 const uint32_t k = rnd->Next() % K;
219 const intptr_t g = current_.Get(k) + 1;
220 const Key key = MakeKey(k, g);
221 list_.Insert(key);
222 current_.Set(k, g);
223 }
224
ReadStep(Random * rnd)225 void ReadStep(Random* rnd) {
226 // Remember the initial committed state of the skiplist.
227 State initial_state;
228 for (int k = 0; k < K; k++) {
229 initial_state.Set(k, current_.Get(k));
230 }
231
232 Key pos = RandomTarget(rnd);
233 SkipList<Key, Comparator>::Iterator iter(&list_);
234 iter.Seek(pos);
235 while (true) {
236 Key current;
237 if (!iter.Valid()) {
238 current = MakeKey(K, 0);
239 } else {
240 current = iter.key();
241 ASSERT_TRUE(IsValidKey(current)) << current;
242 }
243 ASSERT_LE(pos, current) << "should not go backwards";
244
245 // Verify that everything in [pos,current) was not present in
246 // initial_state.
247 while (pos < current) {
248 ASSERT_LT(key(pos), K) << pos;
249
250 // Note that generation 0 is never inserted, so it is ok if
251 // <*,0,*> is missing.
252 ASSERT_TRUE((gen(pos) == 0) ||
253 (gen(pos) > initial_state.Get(key(pos)))
254 ) << "key: " << key(pos)
255 << "; gen: " << gen(pos)
256 << "; initgen: "
257 << initial_state.Get(key(pos));
258
259 // Advance to next key in the valid key space
260 if (key(pos) < key(current)) {
261 pos = MakeKey(key(pos) + 1, 0);
262 } else {
263 pos = MakeKey(key(pos), gen(pos) + 1);
264 }
265 }
266
267 if (!iter.Valid()) {
268 break;
269 }
270
271 if (rnd->Next() % 2) {
272 iter.Next();
273 pos = MakeKey(key(pos), gen(pos) + 1);
274 } else {
275 Key new_target = RandomTarget(rnd);
276 if (new_target > pos) {
277 pos = new_target;
278 iter.Seek(new_target);
279 }
280 }
281 }
282 }
283 };
284 const uint32_t ConcurrentTest::K;
285
286 // Simple test that does single-threaded testing of the ConcurrentTest
287 // scaffolding.
TEST(SkipTest,ConcurrentWithoutThreads)288 TEST(SkipTest, ConcurrentWithoutThreads) {
289 ConcurrentTest test;
290 Random rnd(test::RandomSeed());
291 for (int i = 0; i < 10000; i++) {
292 test.ReadStep(&rnd);
293 test.WriteStep(&rnd);
294 }
295 }
296
297 class TestState {
298 public:
299 ConcurrentTest t_;
300 int seed_;
301 port::AtomicPointer quit_flag_;
302
303 enum ReaderState {
304 STARTING,
305 RUNNING,
306 DONE
307 };
308
TestState(int s)309 explicit TestState(int s)
310 : seed_(s),
311 quit_flag_(NULL),
312 state_(STARTING),
313 state_cv_(&mu_) {}
314
Wait(ReaderState s)315 void Wait(ReaderState s) {
316 mu_.Lock();
317 while (state_ != s) {
318 state_cv_.Wait();
319 }
320 mu_.Unlock();
321 }
322
Change(ReaderState s)323 void Change(ReaderState s) {
324 mu_.Lock();
325 state_ = s;
326 state_cv_.Signal();
327 mu_.Unlock();
328 }
329
330 private:
331 port::Mutex mu_;
332 ReaderState state_;
333 port::CondVar state_cv_;
334 };
335
ConcurrentReader(void * arg)336 static void ConcurrentReader(void* arg) {
337 TestState* state = reinterpret_cast<TestState*>(arg);
338 Random rnd(state->seed_);
339 int64_t reads = 0;
340 state->Change(TestState::RUNNING);
341 while (!state->quit_flag_.Acquire_Load()) {
342 state->t_.ReadStep(&rnd);
343 ++reads;
344 }
345 state->Change(TestState::DONE);
346 }
347
RunConcurrent(int run)348 static void RunConcurrent(int run) {
349 const int seed = test::RandomSeed() + (run * 100);
350 Random rnd(seed);
351 const int N = 1000;
352 const int kSize = 1000;
353 for (int i = 0; i < N; i++) {
354 if ((i % 100) == 0) {
355 fprintf(stderr, "Run %d of %d\n", i, N);
356 }
357 TestState state(seed + 1);
358 Env::Default()->Schedule(ConcurrentReader, &state);
359 state.Wait(TestState::RUNNING);
360 for (int i = 0; i < kSize; i++) {
361 state.t_.WriteStep(&rnd);
362 }
363 state.quit_flag_.Release_Store(&state); // Any non-NULL arg will do
364 state.Wait(TestState::DONE);
365 }
366 }
367
TEST(SkipTest,Concurrent1)368 TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
TEST(SkipTest,Concurrent2)369 TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
TEST(SkipTest,Concurrent3)370 TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
TEST(SkipTest,Concurrent4)371 TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
TEST(SkipTest,Concurrent5)372 TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
373
374 } // namespace leveldb
375
main(int argc,char ** argv)376 int main(int argc, char** argv) {
377 return leveldb::test::RunAllTests();
378 }
379