• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "table/merger.h"
6 
7 #include "leveldb/comparator.h"
8 #include "leveldb/iterator.h"
9 #include "table/iterator_wrapper.h"
10 
11 namespace leveldb {
12 
13 namespace {
14 class MergingIterator : public Iterator {
15  public:
MergingIterator(const Comparator * comparator,Iterator ** children,int n)16   MergingIterator(const Comparator* comparator, Iterator** children, int n)
17       : comparator_(comparator),
18         children_(new IteratorWrapper[n]),
19         n_(n),
20         current_(NULL),
21         direction_(kForward) {
22     for (int i = 0; i < n; i++) {
23       children_[i].Set(children[i]);
24     }
25   }
26 
~MergingIterator()27   virtual ~MergingIterator() {
28     delete[] children_;
29   }
30 
Valid() const31   virtual bool Valid() const {
32     return (current_ != NULL);
33   }
34 
SeekToFirst()35   virtual void SeekToFirst() {
36     for (int i = 0; i < n_; i++) {
37       children_[i].SeekToFirst();
38     }
39     FindSmallest();
40     direction_ = kForward;
41   }
42 
SeekToLast()43   virtual void SeekToLast() {
44     for (int i = 0; i < n_; i++) {
45       children_[i].SeekToLast();
46     }
47     FindLargest();
48     direction_ = kReverse;
49   }
50 
Seek(const Slice & target)51   virtual void Seek(const Slice& target) {
52     for (int i = 0; i < n_; i++) {
53       children_[i].Seek(target);
54     }
55     FindSmallest();
56     direction_ = kForward;
57   }
58 
Next()59   virtual void Next() {
60     assert(Valid());
61 
62     // Ensure that all children are positioned after key().
63     // If we are moving in the forward direction, it is already
64     // true for all of the non-current_ children since current_ is
65     // the smallest child and key() == current_->key().  Otherwise,
66     // we explicitly position the non-current_ children.
67     if (direction_ != kForward) {
68       for (int i = 0; i < n_; i++) {
69         IteratorWrapper* child = &children_[i];
70         if (child != current_) {
71           child->Seek(key());
72           if (child->Valid() &&
73               comparator_->Compare(key(), child->key()) == 0) {
74             child->Next();
75           }
76         }
77       }
78       direction_ = kForward;
79     }
80 
81     current_->Next();
82     FindSmallest();
83   }
84 
Prev()85   virtual void Prev() {
86     assert(Valid());
87 
88     // Ensure that all children are positioned before key().
89     // If we are moving in the reverse direction, it is already
90     // true for all of the non-current_ children since current_ is
91     // the largest child and key() == current_->key().  Otherwise,
92     // we explicitly position the non-current_ children.
93     if (direction_ != kReverse) {
94       for (int i = 0; i < n_; i++) {
95         IteratorWrapper* child = &children_[i];
96         if (child != current_) {
97           child->Seek(key());
98           if (child->Valid()) {
99             // Child is at first entry >= key().  Step back one to be < key()
100             child->Prev();
101           } else {
102             // Child has no entries >= key().  Position at last entry.
103             child->SeekToLast();
104           }
105         }
106       }
107       direction_ = kReverse;
108     }
109 
110     current_->Prev();
111     FindLargest();
112   }
113 
key() const114   virtual Slice key() const {
115     assert(Valid());
116     return current_->key();
117   }
118 
value() const119   virtual Slice value() const {
120     assert(Valid());
121     return current_->value();
122   }
123 
status() const124   virtual Status status() const {
125     Status status;
126     for (int i = 0; i < n_; i++) {
127       status = children_[i].status();
128       if (!status.ok()) {
129         break;
130       }
131     }
132     return status;
133   }
134 
135  private:
136   void FindSmallest();
137   void FindLargest();
138 
139   // We might want to use a heap in case there are lots of children.
140   // For now we use a simple array since we expect a very small number
141   // of children in leveldb.
142   const Comparator* comparator_;
143   IteratorWrapper* children_;
144   int n_;
145   IteratorWrapper* current_;
146 
147   // Which direction is the iterator moving?
148   enum Direction {
149     kForward,
150     kReverse
151   };
152   Direction direction_;
153 };
154 
FindSmallest()155 void MergingIterator::FindSmallest() {
156   IteratorWrapper* smallest = NULL;
157   for (int i = 0; i < n_; i++) {
158     IteratorWrapper* child = &children_[i];
159     if (child->Valid()) {
160       if (smallest == NULL) {
161         smallest = child;
162       } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
163         smallest = child;
164       }
165     }
166   }
167   current_ = smallest;
168 }
169 
FindLargest()170 void MergingIterator::FindLargest() {
171   IteratorWrapper* largest = NULL;
172   for (int i = n_-1; i >= 0; i--) {
173     IteratorWrapper* child = &children_[i];
174     if (child->Valid()) {
175       if (largest == NULL) {
176         largest = child;
177       } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
178         largest = child;
179       }
180     }
181   }
182   current_ = largest;
183 }
184 }  // namespace
185 
NewMergingIterator(const Comparator * cmp,Iterator ** list,int n)186 Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
187   assert(n >= 0);
188   if (n == 0) {
189     return NewEmptyIterator();
190   } else if (n == 1) {
191     return list[0];
192   } else {
193     return new MergingIterator(cmp, list, n);
194   }
195 }
196 
197 }  // namespace leveldb
198