• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/skiplist.h"
6 #include <set>
7 #include "leveldb/env.h"
8 #include "util/arena.h"
9 #include "util/hash.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12 
13 namespace leveldb {
14 
15 typedef uint64_t Key;
16 
17 struct Comparator {
operator ()leveldb::Comparator18   int operator()(const Key& a, const Key& b) const {
19     if (a < b) {
20       return -1;
21     } else if (a > b) {
22       return +1;
23     } else {
24       return 0;
25     }
26   }
27 };
28 
29 class SkipTest { };
30 
TEST(SkipTest,Empty)31 TEST(SkipTest, Empty) {
32   Arena arena;
33   Comparator cmp;
34   SkipList<Key, Comparator> list(cmp, &arena);
35   ASSERT_TRUE(!list.Contains(10));
36 
37   SkipList<Key, Comparator>::Iterator iter(&list);
38   ASSERT_TRUE(!iter.Valid());
39   iter.SeekToFirst();
40   ASSERT_TRUE(!iter.Valid());
41   iter.Seek(100);
42   ASSERT_TRUE(!iter.Valid());
43   iter.SeekToLast();
44   ASSERT_TRUE(!iter.Valid());
45 }
46 
TEST(SkipTest,InsertAndLookup)47 TEST(SkipTest, InsertAndLookup) {
48   const int N = 2000;
49   const int R = 5000;
50   Random rnd(1000);
51   std::set<Key> keys;
52   Arena arena;
53   Comparator cmp;
54   SkipList<Key, Comparator> list(cmp, &arena);
55   for (int i = 0; i < N; i++) {
56     Key key = rnd.Next() % R;
57     if (keys.insert(key).second) {
58       list.Insert(key);
59     }
60   }
61 
62   for (int i = 0; i < R; i++) {
63     if (list.Contains(i)) {
64       ASSERT_EQ(keys.count(i), 1);
65     } else {
66       ASSERT_EQ(keys.count(i), 0);
67     }
68   }
69 
70   // Simple iterator tests
71   {
72     SkipList<Key, Comparator>::Iterator iter(&list);
73     ASSERT_TRUE(!iter.Valid());
74 
75     iter.Seek(0);
76     ASSERT_TRUE(iter.Valid());
77     ASSERT_EQ(*(keys.begin()), iter.key());
78 
79     iter.SeekToFirst();
80     ASSERT_TRUE(iter.Valid());
81     ASSERT_EQ(*(keys.begin()), iter.key());
82 
83     iter.SeekToLast();
84     ASSERT_TRUE(iter.Valid());
85     ASSERT_EQ(*(keys.rbegin()), iter.key());
86   }
87 
88   // Forward iteration test
89   for (int i = 0; i < R; i++) {
90     SkipList<Key, Comparator>::Iterator iter(&list);
91     iter.Seek(i);
92 
93     // Compare against model iterator
94     std::set<Key>::iterator model_iter = keys.lower_bound(i);
95     for (int j = 0; j < 3; j++) {
96       if (model_iter == keys.end()) {
97         ASSERT_TRUE(!iter.Valid());
98         break;
99       } else {
100         ASSERT_TRUE(iter.Valid());
101         ASSERT_EQ(*model_iter, iter.key());
102         ++model_iter;
103         iter.Next();
104       }
105     }
106   }
107 
108   // Backward iteration test
109   {
110     SkipList<Key, Comparator>::Iterator iter(&list);
111     iter.SeekToLast();
112 
113     // Compare against model iterator
114     for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
115          model_iter != keys.rend();
116          ++model_iter) {
117       ASSERT_TRUE(iter.Valid());
118       ASSERT_EQ(*model_iter, iter.key());
119       iter.Prev();
120     }
121     ASSERT_TRUE(!iter.Valid());
122   }
123 }
124 
125 // We want to make sure that with a single writer and multiple
126 // concurrent readers (with no synchronization other than when a
127 // reader's iterator is created), the reader always observes all the
128 // data that was present in the skip list when the iterator was
129 // constructor.  Because insertions are happening concurrently, we may
130 // also observe new values that were inserted since the iterator was
131 // constructed, but we should never miss any values that were present
132 // at iterator construction time.
133 //
134 // We generate multi-part keys:
135 //     <key,gen,hash>
136 // where:
137 //     key is in range [0..K-1]
138 //     gen is a generation number for key
139 //     hash is hash(key,gen)
140 //
141 // The insertion code picks a random key, sets gen to be 1 + the last
142 // generation number inserted for that key, and sets hash to Hash(key,gen).
143 //
144 // At the beginning of a read, we snapshot the last inserted
145 // generation number for each key.  We then iterate, including random
146 // calls to Next() and Seek().  For every key we encounter, we
147 // check that it is either expected given the initial snapshot or has
148 // been concurrently added since the iterator started.
149 class ConcurrentTest {
150  private:
151   static const uint32_t K = 4;
152 
key(Key key)153   static uint64_t key(Key key) { return (key >> 40); }
gen(Key key)154   static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
hash(Key key)155   static uint64_t hash(Key key) { return key & 0xff; }
156 
HashNumbers(uint64_t k,uint64_t g)157   static uint64_t HashNumbers(uint64_t k, uint64_t g) {
158     uint64_t data[2] = { k, g };
159     return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
160   }
161 
MakeKey(uint64_t k,uint64_t g)162   static Key MakeKey(uint64_t k, uint64_t g) {
163     assert(sizeof(Key) == sizeof(uint64_t));
164     assert(k <= K);  // We sometimes pass K to seek to the end of the skiplist
165     assert(g <= 0xffffffffu);
166     return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
167   }
168 
IsValidKey(Key k)169   static bool IsValidKey(Key k) {
170     return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
171   }
172 
RandomTarget(Random * rnd)173   static Key RandomTarget(Random* rnd) {
174     switch (rnd->Next() % 10) {
175       case 0:
176         // Seek to beginning
177         return MakeKey(0, 0);
178       case 1:
179         // Seek to end
180         return MakeKey(K, 0);
181       default:
182         // Seek to middle
183         return MakeKey(rnd->Next() % K, 0);
184     }
185   }
186 
187   // Per-key generation
188   struct State {
189     port::AtomicPointer generation[K];
Setleveldb::ConcurrentTest::State190     void Set(int k, intptr_t v) {
191       generation[k].Release_Store(reinterpret_cast<void*>(v));
192     }
Getleveldb::ConcurrentTest::State193     intptr_t Get(int k) {
194       return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
195     }
196 
Stateleveldb::ConcurrentTest::State197     State() {
198       for (int k = 0; k < K; k++) {
199         Set(k, 0);
200       }
201     }
202   };
203 
204   // Current state of the test
205   State current_;
206 
207   Arena arena_;
208 
209   // SkipList is not protected by mu_.  We just use a single writer
210   // thread to modify it.
211   SkipList<Key, Comparator> list_;
212 
213  public:
ConcurrentTest()214   ConcurrentTest() : list_(Comparator(), &arena_) { }
215 
216   // REQUIRES: External synchronization
WriteStep(Random * rnd)217   void WriteStep(Random* rnd) {
218     const uint32_t k = rnd->Next() % K;
219     const intptr_t g = current_.Get(k) + 1;
220     const Key key = MakeKey(k, g);
221     list_.Insert(key);
222     current_.Set(k, g);
223   }
224 
ReadStep(Random * rnd)225   void ReadStep(Random* rnd) {
226     // Remember the initial committed state of the skiplist.
227     State initial_state;
228     for (int k = 0; k < K; k++) {
229       initial_state.Set(k, current_.Get(k));
230     }
231 
232     Key pos = RandomTarget(rnd);
233     SkipList<Key, Comparator>::Iterator iter(&list_);
234     iter.Seek(pos);
235     while (true) {
236       Key current;
237       if (!iter.Valid()) {
238         current = MakeKey(K, 0);
239       } else {
240         current = iter.key();
241         ASSERT_TRUE(IsValidKey(current)) << current;
242       }
243       ASSERT_LE(pos, current) << "should not go backwards";
244 
245       // Verify that everything in [pos,current) was not present in
246       // initial_state.
247       while (pos < current) {
248         ASSERT_LT(key(pos), K) << pos;
249 
250         // Note that generation 0 is never inserted, so it is ok if
251         // <*,0,*> is missing.
252         ASSERT_TRUE((gen(pos) == 0) ||
253                     (gen(pos) > initial_state.Get(key(pos)))
254                     ) << "key: " << key(pos)
255                       << "; gen: " << gen(pos)
256                       << "; initgen: "
257                       << initial_state.Get(key(pos));
258 
259         // Advance to next key in the valid key space
260         if (key(pos) < key(current)) {
261           pos = MakeKey(key(pos) + 1, 0);
262         } else {
263           pos = MakeKey(key(pos), gen(pos) + 1);
264         }
265       }
266 
267       if (!iter.Valid()) {
268         break;
269       }
270 
271       if (rnd->Next() % 2) {
272         iter.Next();
273         pos = MakeKey(key(pos), gen(pos) + 1);
274       } else {
275         Key new_target = RandomTarget(rnd);
276         if (new_target > pos) {
277           pos = new_target;
278           iter.Seek(new_target);
279         }
280       }
281     }
282   }
283 };
284 const uint32_t ConcurrentTest::K;
285 
286 // Simple test that does single-threaded testing of the ConcurrentTest
287 // scaffolding.
TEST(SkipTest,ConcurrentWithoutThreads)288 TEST(SkipTest, ConcurrentWithoutThreads) {
289   ConcurrentTest test;
290   Random rnd(test::RandomSeed());
291   for (int i = 0; i < 10000; i++) {
292     test.ReadStep(&rnd);
293     test.WriteStep(&rnd);
294   }
295 }
296 
297 class TestState {
298  public:
299   ConcurrentTest t_;
300   int seed_;
301   port::AtomicPointer quit_flag_;
302 
303   enum ReaderState {
304     STARTING,
305     RUNNING,
306     DONE
307   };
308 
TestState(int s)309   explicit TestState(int s)
310       : seed_(s),
311         quit_flag_(NULL),
312         state_(STARTING),
313         state_cv_(&mu_) {}
314 
Wait(ReaderState s)315   void Wait(ReaderState s) {
316     mu_.Lock();
317     while (state_ != s) {
318       state_cv_.Wait();
319     }
320     mu_.Unlock();
321   }
322 
Change(ReaderState s)323   void Change(ReaderState s) {
324     mu_.Lock();
325     state_ = s;
326     state_cv_.Signal();
327     mu_.Unlock();
328   }
329 
330  private:
331   port::Mutex mu_;
332   ReaderState state_;
333   port::CondVar state_cv_;
334 };
335 
ConcurrentReader(void * arg)336 static void ConcurrentReader(void* arg) {
337   TestState* state = reinterpret_cast<TestState*>(arg);
338   Random rnd(state->seed_);
339   int64_t reads = 0;
340   state->Change(TestState::RUNNING);
341   while (!state->quit_flag_.Acquire_Load()) {
342     state->t_.ReadStep(&rnd);
343     ++reads;
344   }
345   state->Change(TestState::DONE);
346 }
347 
RunConcurrent(int run)348 static void RunConcurrent(int run) {
349   const int seed = test::RandomSeed() + (run * 100);
350   Random rnd(seed);
351   const int N = 1000;
352   const int kSize = 1000;
353   for (int i = 0; i < N; i++) {
354     if ((i % 100) == 0) {
355       fprintf(stderr, "Run %d of %d\n", i, N);
356     }
357     TestState state(seed + 1);
358     Env::Default()->Schedule(ConcurrentReader, &state);
359     state.Wait(TestState::RUNNING);
360     for (int i = 0; i < kSize; i++) {
361       state.t_.WriteStep(&rnd);
362     }
363     state.quit_flag_.Release_Store(&state);  // Any non-NULL arg will do
364     state.Wait(TestState::DONE);
365   }
366 }
367 
TEST(SkipTest,Concurrent1)368 TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
TEST(SkipTest,Concurrent2)369 TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
TEST(SkipTest,Concurrent3)370 TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
TEST(SkipTest,Concurrent4)371 TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
TEST(SkipTest,Concurrent5)372 TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
373 
374 }  // namespace leveldb
375 
main(int argc,char ** argv)376 int main(int argc, char** argv) {
377   return leveldb::test::RunAllTests();
378 }
379