1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "result_entries_window.h"
17
18 #include <limits>
19 #include "db_constant.h"
20 #include "db_errno.h"
21
22 using std::vector;
23 using std::move;
24
25 namespace DistributedDB {
ResultEntriesWindow()26 ResultEntriesWindow::ResultEntriesWindow()
27 : rawCursor_(nullptr),
28 windowSize_(0),
29 totalCount_(0),
30 begin_(0),
31 currentPosition_(0) {}
32
~ResultEntriesWindow()33 ResultEntriesWindow::~ResultEntriesWindow()
34 {
35 if (rawCursor_ != nullptr) {
36 (void)(rawCursor_->Close());
37 rawCursor_ = nullptr;
38 }
39 }
40
Init(IKvDBRawCursor * rawCursor,int64_t windowSize,double scale)41 int ResultEntriesWindow::Init(IKvDBRawCursor *rawCursor, int64_t windowSize, double scale)
42 {
43 if (rawCursor == nullptr ||
44 (windowSize <= 0 || windowSize > MAX_WINDOW_SIZE) ||
45 (scale <= std::numeric_limits<double>::epsilon() || scale > 1)) {
46 return -E_INVALID_ARGS;
47 }
48 int errCode = rawCursor->Open();
49 if (errCode != E_OK) {
50 return errCode;
51 }
52
53 rawCursor_ = rawCursor;
54 windowSize_ = static_cast<int64_t>(static_cast<double>(windowSize) * scale);
55 totalCount_ = rawCursor_->GetCount();
56 return E_OK;
57 }
58
GetTotalCount() const59 int ResultEntriesWindow::GetTotalCount() const
60 {
61 return totalCount_;
62 }
63
GetCurrentPosition() const64 int ResultEntriesWindow::GetCurrentPosition() const
65 {
66 return currentPosition_;
67 }
68
MoveToPosition(int position)69 bool ResultEntriesWindow::MoveToPosition(int position)
70 {
71 if ((rawCursor_ == nullptr && buffer_.size() == 0) || (position < 0 || position >= totalCount_)) {
72 return false;
73 }
74 if (buffer_.size() == 0) {
75 if (SetCursor(0, position) != E_OK) {
76 return false;
77 }
78 return true;
79 }
80 int last = begin_ + buffer_.size() - 1;
81 if (position > last) {
82 buffer_.clear();
83 buffer_.shrink_to_fit();
84 if (SetCursor(last + 1, position) != E_OK) {
85 return false;
86 }
87 return true;
88 } else if (position < begin_) {
89 if (rawCursor_ == nullptr) {
90 return false;
91 }
92 buffer_.clear();
93 buffer_.shrink_to_fit();
94 if (rawCursor_->Reload() != E_OK) {
95 ResetWindow();
96 return false;
97 }
98 if (SetCursor(0, position) != E_OK) {
99 return false;
100 }
101 return true;
102 } else {
103 currentPosition_ = position;
104 }
105 return true;
106 }
107
GetEntry(Entry & entry) const108 int ResultEntriesWindow::GetEntry(Entry &entry) const
109 {
110 if (rawCursor_ == nullptr && buffer_.size() == 0) {
111 return -E_NOT_INIT;
112 }
113 if (totalCount_ == 0) {
114 return -E_NOT_FOUND;
115 }
116 if (buffer_.size() == 0) {
117 int errCode = LoadData(0, currentPosition_);
118 if (errCode != E_OK) {
119 return errCode;
120 }
121 }
122 entry = buffer_[currentPosition_ - begin_];
123 return E_OK;
124 }
125
ResetWindow()126 void ResultEntriesWindow::ResetWindow()
127 {
128 buffer_.clear();
129 buffer_.shrink_to_fit();
130 if (rawCursor_ != nullptr) {
131 (void)(rawCursor_->Reload());
132 }
133 begin_ = 0;
134 currentPosition_ = 0;
135 return;
136 }
137
SetCursor(int begin,int target)138 int ResultEntriesWindow::SetCursor(int begin, int target)
139 {
140 int errCode = LoadData(begin, target);
141 if (errCode == E_OK) {
142 begin_ = target;
143 currentPosition_ = target;
144 } else {
145 ResetWindow();
146 }
147 return errCode;
148 }
149
LoadData(int begin,int target) const150 int ResultEntriesWindow::LoadData(int begin, int target) const
151 {
152 if (rawCursor_ == nullptr) {
153 return -E_NOT_INIT;
154 }
155 if (target < begin || target >= totalCount_) {
156 return -E_INVALID_ARGS;
157 }
158 int cursor = begin;
159 int errCode = E_OK;
160 for (; cursor < target; cursor++) {
161 Entry next;
162 errCode = rawCursor_->GetNext(next, false);
163 if (errCode != E_OK) {
164 return errCode;
165 }
166 }
167 int64_t bufferSize = 0;
168 for (; cursor < totalCount_ && bufferSize < windowSize_; cursor++) {
169 Entry next;
170 errCode = rawCursor_->GetNext(next, true);
171 if (errCode != E_OK) {
172 return errCode;
173 }
174 // filter the abnormal data.
175 if (next.key.size() > DBConstant::MAX_KEY_SIZE || next.value.size() > DBConstant::MAX_VALUE_SIZE) {
176 continue;
177 }
178 bufferSize += next.key.size() + next.value.size();
179 buffer_.push_back(move(next));
180 }
181 if (buffer_.size() == static_cast<size_t>(totalCount_)) {
182 (void)(rawCursor_->Close());
183 rawCursor_ = nullptr;
184 }
185 return E_OK;
186 }
187 }
188