1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_result_set.h"
17 #include <algorithm>
18 #include "log_print.h"
19 #include "db_errno.h"
20 #include "sqlite_single_ver_forward_cursor.h"
21 #include "sqlite_single_ver_natural_store.h"
22 #include "sqlite_single_ver_storage_executor.h"
23
24 namespace DistributedDB {
25 namespace {
26 const int64_t MEM_WINDOW_SIZE = 0xFFFFFFFF; // 4G for max
27 const double MEM_WINDOW_SCALE = 0.5; // set default window size to 2G
28 const double DEFAULT_WINDOW_SCALE = 1; // For non-mem db
29 const int64_t WINDOW_SIZE_MB_UNIT = 1024 * 1024; // 1024 is scale
30 }
31
SQLiteSingleVerResultSet(SQLiteSingleVerNaturalStore * kvDB,const Key & keyPrefix,const Option & option)32 SQLiteSingleVerResultSet::SQLiteSingleVerResultSet(SQLiteSingleVerNaturalStore *kvDB, const Key &keyPrefix,
33 const Option& option) : option_(option), type_(ResultSetType::KEYPREFIX), keyPrefix_(keyPrefix), kvDB_(kvDB) {}
34
SQLiteSingleVerResultSet(SQLiteSingleVerNaturalStore * kvDB,const QueryObject & queryObj,const Option & option)35 SQLiteSingleVerResultSet::SQLiteSingleVerResultSet(SQLiteSingleVerNaturalStore *kvDB, const QueryObject &queryObj,
36 const Option& option) : option_(option), type_(ResultSetType::QUERY), queryObj_(queryObj), kvDB_(kvDB) {}
37
~SQLiteSingleVerResultSet()38 SQLiteSingleVerResultSet::~SQLiteSingleVerResultSet()
39 {
40 isOpen_ = false;
41 count_ = 0;
42 position_ = INIT_POSTION;
43 kvDB_ = nullptr;
44 window_ = nullptr;
45 rawCursor_ = nullptr;
46 handle_ = nullptr;
47 cacheStartPosition_ = INIT_POSTION;
48 }
49
50 // The user get KvStoreResultSet after Open function called, so no need mutex during open procedure
Open(bool isMemDb)51 int SQLiteSingleVerResultSet::Open(bool isMemDb)
52 {
53 if (isOpen_) {
54 return E_OK;
55 }
56 if (kvDB_ == nullptr) { // Unlikely
57 return -E_INVALID_ARGS;
58 }
59 if (option_.cacheMode == ResultSetCacheMode::CACHE_FULL_ENTRY) {
60 return OpenForCacheFullEntryMode(isMemDb);
61 } else {
62 return OpenForCacheEntryIdMode();
63 }
64 }
65
OpenForCacheFullEntryMode(bool isMemDb)66 int SQLiteSingleVerResultSet::OpenForCacheFullEntryMode(bool isMemDb)
67 {
68 if (type_ == ResultSetType::KEYPREFIX) {
69 rawCursor_ = new (std::nothrow) SQLiteSingleVerForwardCursor(kvDB_, keyPrefix_);
70 } else {
71 rawCursor_ = new (std::nothrow) SQLiteSingleVerForwardCursor(kvDB_, queryObj_);
72 }
73 if (rawCursor_ == nullptr) {
74 LOGE("[SqlSinResSet][OpenForEntry] OOM When Create ForwardCursor.");
75 return E_OUT_OF_MEMORY;
76 }
77 window_ = new (std::nothrow) ResultEntriesWindow();
78 if (window_ == nullptr) {
79 LOGE("[SqlSinResSet][OpenForEntry] OOM When Create EntryWindow.");
80 delete rawCursor_;
81 rawCursor_ = nullptr;
82 return -E_OUT_OF_MEMORY;
83 }
84 // cacheMaxSize is within [1,16]
85 int64_t windowSize = isMemDb ? MEM_WINDOW_SIZE : (option_.cacheMaxSize * WINDOW_SIZE_MB_UNIT);
86 double scale = isMemDb ? MEM_WINDOW_SCALE : DEFAULT_WINDOW_SCALE;
87 int errCode = window_->Init(rawCursor_, windowSize, scale);
88 if (errCode != E_OK) {
89 LOGE("[SqlSinResSet][OpenForEntry] EntryWindow Init Fail, ErrCode=%d.", errCode);
90 delete window_;
91 window_ = nullptr;
92 delete rawCursor_;
93 rawCursor_ = nullptr;
94 return errCode;
95 }
96 count_ = window_->GetTotalCount();
97 isOpen_ = true;
98 LOGD("[SqlSinResSet][OpenForEntry] Type=%d, CacheMaxSize=%d(MB), Count=%d, IsMem=%d.", static_cast<int>(type_),
99 option_.cacheMaxSize, count_, isMemDb);
100 return E_OK;
101 }
102
OpenForCacheEntryIdMode()103 int SQLiteSingleVerResultSet::OpenForCacheEntryIdMode()
104 {
105 int errCode = E_OK;
106 handle_ = kvDB_->GetHandle(false, errCode);
107 if (handle_ == nullptr) {
108 LOGE("[SqlSinResSet][OpenForRowId] Get handle fail, errCode=%d.", errCode);
109 return errCode;
110 }
111 // cacheMaxSize is within [1,16], rowId is of type int64_t
112 uint32_t cacheLimit = option_.cacheMaxSize * (WINDOW_SIZE_MB_UNIT / sizeof(int64_t));
113 if (type_ == ResultSetType::KEYPREFIX) {
114 errCode = handle_->OpenResultSetForCacheRowIdMode(keyPrefix_, cachedRowIds_, cacheLimit, count_);
115 } else {
116 errCode = handle_->OpenResultSetForCacheRowIdMode(queryObj_, cachedRowIds_, cacheLimit, count_);
117 }
118 if (errCode != E_OK) {
119 LOGE("[SqlSinResSet][OpenForRowId] Open ResultSet fail, errCode=%d.", errCode);
120 kvDB_->ReleaseHandle(handle_);
121 cachedRowIds_.clear();
122 return errCode;
123 }
124 // If no result, then nothing is cached, so the cacheStartPosition_ is still INIT_POSTION
125 if (count_ != 0) {
126 cacheStartPosition_ = 0;
127 }
128 isOpen_ = true;
129 LOGD("[SqlSinResSet][OpenForRowId] Type=%d, CacheMaxSize=%d(MB), Count=%d, Cached=%zu.", static_cast<int>(type_),
130 option_.cacheMaxSize, count_, cachedRowIds_.size());
131 return E_OK;
132 }
133
GetCount() const134 int SQLiteSingleVerResultSet::GetCount() const
135 {
136 // count_ never changed after ResultSet opened
137 return count_;
138 }
139
GetPosition() const140 int SQLiteSingleVerResultSet::GetPosition() const
141 {
142 std::lock_guard<std::mutex> lockGuard(mutex_);
143 return position_;
144 }
145
MoveTo(int position) const146 int SQLiteSingleVerResultSet::MoveTo(int position) const
147 {
148 std::lock_guard<std::mutex> lockGuard(mutex_);
149 if (!isOpen_) {
150 return -E_RESULT_SET_STATUS_INVALID;
151 }
152 if (count_ == 0) {
153 position_ = (position >= 0) ? 0 : INIT_POSTION;
154 LOGW("[SqlSinResSet][MoveTo] Empty ResultSet.");
155 return -E_RESULT_SET_EMPTY;
156 }
157 if (position < 0) {
158 position_ = INIT_POSTION;
159 LOGW("[SqlSinResSet][MoveTo] Target Position=%d invalid.", position);
160 return -E_INVALID_ARGS;
161 }
162 if (position >= count_) {
163 position_ = count_;
164 LOGW("[SqlSinResSet][MoveTo] Target Position=%d Exceed Count=%d.", position, count_);
165 return -E_INVALID_ARGS;
166 }
167 if (position_ == position) {
168 return E_OK;
169 }
170 if (option_.cacheMode == ResultSetCacheMode::CACHE_FULL_ENTRY) {
171 return MoveToForCacheFullEntryMode(position);
172 } else {
173 return MoveToForCacheEntryIdMode(position);
174 }
175 }
176
MoveToForCacheFullEntryMode(int position) const177 int SQLiteSingleVerResultSet::MoveToForCacheFullEntryMode(int position) const
178 {
179 if (window_->MoveToPosition(position)) {
180 position_ = position;
181 return E_OK;
182 }
183 position_ = INIT_POSTION;
184 LOGE("[SqlSinResSet][MoveForEntry] Move to position=%d fail.", position);
185 return -E_UNEXPECTED_DATA;
186 }
187
MoveToForCacheEntryIdMode(int position) const188 int SQLiteSingleVerResultSet::MoveToForCacheEntryIdMode(int position) const
189 {
190 // The parameter position now is in [0, count_) with this resultSet not empty
191 // cacheEndPosition is just after cachedRowIds_, the cached range is [cacheStartPosition_, cacheEndPosition)
192 int cacheEndPosition = cacheStartPosition_ + cachedRowIds_.size();
193 if (position >= cacheStartPosition_ && position < cacheEndPosition) {
194 // Already in the cachedRowId range, Just move position
195 position_ = position;
196 return E_OK;
197 }
198 // Not in the cachedRowId range, but valid position, we should reload the cachedRowIds to contain this position
199 int newCacheStartPos = position;
200 // cacheMaxSize is within [1,16], rowId is of type int64_t
201 uint32_t cacheLimit = option_.cacheMaxSize * (WINDOW_SIZE_MB_UNIT / sizeof(int64_t));
202 if (position > cacheStartPosition_) {
203 // Move Forward
204 int newCacheEndPos = newCacheStartPos + cacheLimit;
205 if (newCacheEndPos > count_) {
206 // Since startPos in [0, count_), So the right in (0, cacheLimit), So position still in range
207 newCacheStartPos -= (newCacheEndPos - count_);
208 }
209 } else {
210 // Move Backward
211 newCacheStartPos -= (cacheLimit - 1); // Attention, subtract by 1 to ensure position still in range
212 }
213 newCacheStartPos = std::max(newCacheStartPos, 0); // Adjust to at least 0 if less then 0
214 // Clear rowId cache to accept new rowIds
215 cachedRowIds_.clear();
216 int errCode;
217 if (type_ == ResultSetType::KEYPREFIX) {
218 errCode = handle_->ReloadResultSetForCacheRowIdMode(keyPrefix_, cachedRowIds_, cacheLimit, newCacheStartPos);
219 } else {
220 errCode = handle_->ReloadResultSetForCacheRowIdMode(queryObj_, cachedRowIds_, cacheLimit, newCacheStartPos);
221 }
222 if (errCode != E_OK) {
223 LOGE("[SqlSinResSet][MoveForRowid] Move to position=%d, Reload fail, errCode=%d.", position, errCode);
224 // What else shall we do if error happened ?
225 cachedRowIds_.clear();
226 cacheStartPosition_ = INIT_POSTION;
227 position_ = INIT_POSTION; // Reset Position As MoveForEntry Do
228 return -E_UNEXPECTED_DATA;
229 }
230 LOGD("[SqlSinResSet][MoveForRowid] Reload: position=%d, cacheStartPos=%d, cached=%zu, count=%d.",
231 position, newCacheStartPos, cachedRowIds_.size(), count_);
232 // Everything OK
233 position_ = position;
234 cacheStartPosition_ = newCacheStartPos;
235 return E_OK;
236 }
237
GetEntry(Entry & entry) const238 int SQLiteSingleVerResultSet::GetEntry(Entry &entry) const
239 {
240 std::lock_guard<std::mutex> lockGuard(mutex_);
241 if (!isOpen_ || count_ == 0) {
242 return -E_NO_SUCH_ENTRY;
243 }
244 if (position_ > INIT_POSTION && position_ < count_) {
245 // If position_ in the valid range, it can be guaranteed that everything is ok without errors
246 if (option_.cacheMode == ResultSetCacheMode::CACHE_FULL_ENTRY) {
247 return window_->GetEntry(entry);
248 } else {
249 // It can be guaranteed position_ in the range [cacheStartPosition_, cacheEndPosition)
250 // For CodeDex false alarm, we still do the check which is not necessary
251 int cacheIndex = position_ - cacheStartPosition_;
252 if (cacheIndex < 0 || cacheIndex >= static_cast<int>(cachedRowIds_.size())) { // Not Possible
253 LOGE("[SqlSinResSet][GetEntry] Internal Error: Position=%d, CacheStartPos=%d, cached=%zu.", position_,
254 cacheStartPosition_, cachedRowIds_.size());
255 return -E_INTERNAL_ERROR;
256 }
257 int errCode = handle_->GetEntryByRowId(cachedRowIds_[cacheIndex], entry);
258 if (errCode != E_OK) {
259 LOGE("[SqlSinResSet][GetEntry] GetEntryByRowId fail, errCode=%d.", errCode);
260 return errCode;
261 }
262 return E_OK;
263 }
264 }
265 return -E_NO_SUCH_ENTRY;
266 }
267
Close()268 void SQLiteSingleVerResultSet::Close()
269 {
270 std::lock_guard<std::mutex> lockGuard(mutex_);
271 if (!isOpen_) {
272 return;
273 }
274 if (option_.cacheMode == ResultSetCacheMode::CACHE_FULL_ENTRY) {
275 CloseForCacheFullEntryMode();
276 } else {
277 CloseForCacheEntryIdMode();
278 }
279 isOpen_ = false;
280 count_ = 0;
281 position_ = INIT_POSTION;
282 LOGD("[SqlSinResSet][Close] Done, Type=%d, Mode=%d.", static_cast<int>(type_), static_cast<int>(option_.cacheMode));
283 }
284
CloseForCacheFullEntryMode()285 void SQLiteSingleVerResultSet::CloseForCacheFullEntryMode()
286 {
287 // Attention! Must Delete EntryWindow First(will call ForwardCursor::Close), then delete ForwardCursor.
288 // ForwardCursor::Close will call Executor::CloseResultSet(Reset the statement and rollback transaction)
289 delete window_; // It is defined behavior to delete even a nullptr
290 window_ = nullptr;
291 // Attention! Delete ForwardCursor Later.
292 delete rawCursor_; // It is defined behavior to delete even a nullptr
293 rawCursor_ = nullptr;
294 }
295
CloseForCacheEntryIdMode()296 void SQLiteSingleVerResultSet::CloseForCacheEntryIdMode()
297 {
298 cacheStartPosition_ = INIT_POSTION;
299 cachedRowIds_.clear();
300 // In Fact : handle_ and kvDB_ is guaranteed to be not nullptr
301 if (handle_ != nullptr) {
302 handle_->CloseResultSet();
303 kvDB_->ReleaseHandle(handle_);
304 }
305 }
306 } // namespace DistributedDB
307