1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "table/merger.h"
6
7 #include "leveldb/comparator.h"
8 #include "leveldb/iterator.h"
9 #include "table/iterator_wrapper.h"
10
11 namespace leveldb {
12
13 namespace {
14 class MergingIterator : public Iterator {
15 public:
MergingIterator(const Comparator * comparator,Iterator ** children,int n)16 MergingIterator(const Comparator* comparator, Iterator** children, int n)
17 : comparator_(comparator),
18 children_(new IteratorWrapper[n]),
19 n_(n),
20 current_(NULL),
21 direction_(kForward) {
22 for (int i = 0; i < n; i++) {
23 children_[i].Set(children[i]);
24 }
25 }
26
~MergingIterator()27 virtual ~MergingIterator() {
28 delete[] children_;
29 }
30
Valid() const31 virtual bool Valid() const {
32 return (current_ != NULL);
33 }
34
SeekToFirst()35 virtual void SeekToFirst() {
36 for (int i = 0; i < n_; i++) {
37 children_[i].SeekToFirst();
38 }
39 FindSmallest();
40 direction_ = kForward;
41 }
42
SeekToLast()43 virtual void SeekToLast() {
44 for (int i = 0; i < n_; i++) {
45 children_[i].SeekToLast();
46 }
47 FindLargest();
48 direction_ = kReverse;
49 }
50
Seek(const Slice & target)51 virtual void Seek(const Slice& target) {
52 for (int i = 0; i < n_; i++) {
53 children_[i].Seek(target);
54 }
55 FindSmallest();
56 direction_ = kForward;
57 }
58
Next()59 virtual void Next() {
60 assert(Valid());
61
62 // Ensure that all children are positioned after key().
63 // If we are moving in the forward direction, it is already
64 // true for all of the non-current_ children since current_ is
65 // the smallest child and key() == current_->key(). Otherwise,
66 // we explicitly position the non-current_ children.
67 if (direction_ != kForward) {
68 for (int i = 0; i < n_; i++) {
69 IteratorWrapper* child = &children_[i];
70 if (child != current_) {
71 child->Seek(key());
72 if (child->Valid() &&
73 comparator_->Compare(key(), child->key()) == 0) {
74 child->Next();
75 }
76 }
77 }
78 direction_ = kForward;
79 }
80
81 current_->Next();
82 FindSmallest();
83 }
84
Prev()85 virtual void Prev() {
86 assert(Valid());
87
88 // Ensure that all children are positioned before key().
89 // If we are moving in the reverse direction, it is already
90 // true for all of the non-current_ children since current_ is
91 // the largest child and key() == current_->key(). Otherwise,
92 // we explicitly position the non-current_ children.
93 if (direction_ != kReverse) {
94 for (int i = 0; i < n_; i++) {
95 IteratorWrapper* child = &children_[i];
96 if (child != current_) {
97 child->Seek(key());
98 if (child->Valid()) {
99 // Child is at first entry >= key(). Step back one to be < key()
100 child->Prev();
101 } else {
102 // Child has no entries >= key(). Position at last entry.
103 child->SeekToLast();
104 }
105 }
106 }
107 direction_ = kReverse;
108 }
109
110 current_->Prev();
111 FindLargest();
112 }
113
key() const114 virtual Slice key() const {
115 assert(Valid());
116 return current_->key();
117 }
118
value() const119 virtual Slice value() const {
120 assert(Valid());
121 return current_->value();
122 }
123
status() const124 virtual Status status() const {
125 Status status;
126 for (int i = 0; i < n_; i++) {
127 status = children_[i].status();
128 if (!status.ok()) {
129 break;
130 }
131 }
132 return status;
133 }
134
135 private:
136 void FindSmallest();
137 void FindLargest();
138
139 // We might want to use a heap in case there are lots of children.
140 // For now we use a simple array since we expect a very small number
141 // of children in leveldb.
142 const Comparator* comparator_;
143 IteratorWrapper* children_;
144 int n_;
145 IteratorWrapper* current_;
146
147 // Which direction is the iterator moving?
148 enum Direction {
149 kForward,
150 kReverse
151 };
152 Direction direction_;
153 };
154
FindSmallest()155 void MergingIterator::FindSmallest() {
156 IteratorWrapper* smallest = NULL;
157 for (int i = 0; i < n_; i++) {
158 IteratorWrapper* child = &children_[i];
159 if (child->Valid()) {
160 if (smallest == NULL) {
161 smallest = child;
162 } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
163 smallest = child;
164 }
165 }
166 }
167 current_ = smallest;
168 }
169
FindLargest()170 void MergingIterator::FindLargest() {
171 IteratorWrapper* largest = NULL;
172 for (int i = n_-1; i >= 0; i--) {
173 IteratorWrapper* child = &children_[i];
174 if (child->Valid()) {
175 if (largest == NULL) {
176 largest = child;
177 } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
178 largest = child;
179 }
180 }
181 }
182 current_ = largest;
183 }
184 } // namespace
185
NewMergingIterator(const Comparator * cmp,Iterator ** list,int n)186 Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
187 assert(n >= 0);
188 if (n == 0) {
189 return NewEmptyIterator();
190 } else if (n == 1) {
191 return list[0];
192 } else {
193 return new MergingIterator(cmp, list, n);
194 }
195 }
196
197 } // namespace leveldb
198