1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <gtest/gtest.h>
16 #include <ctime>
17 #include <cmath>
18 #include <random>
19 #include <chrono>
20 #include <thread>
21 #include <cstdio>
22 #include <string>
23
24 #include "distributeddb_data_generator.h"
25 #include "distributed_test_tools.h"
26 #include "kv_store_delegate.h"
27 #include "kv_store_delegate_manager.h"
28
29 using namespace std;
30 using namespace testing;
31 #if defined TESTCASES_USING_GTEST_EXT
32 using namespace testing::ext;
33 #endif
34 using namespace DistributedDB;
35 using namespace DistributedDBDataGenerator;
36
37 namespace DistributeddbKvConcurrencyCrud {
38 const bool IS_LOCAL = false;
39 #ifdef KV_CON_CRUD
40 const unsigned int READ_RECORDS_NUM_START = 1;
41 const unsigned int READ_RECORDS_NUM_END = 128;
42 const unsigned int WRITE_RECORDS_NUM_START = 1;
43 const unsigned int WRITE_RECORDS_NUM_END = 9999;
44 const unsigned long LONG_TIME_TEST_SECONDS = 10;
45 #endif
46 const unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
47
48 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
49 KvStoreDelegateManager *g_concurManager = nullptr;
50
51 enum ReadOrWriteTag {
52 READ = 0,
53 WRITE = 1,
54 DELETE = 2
55 };
56
57 struct ConcurParam {
58 unsigned int threadId_;
59 ReadOrWriteTag tag_;
60 DistributedDB::Entry* entryPtr_;
61 };
62
63 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)64 void ConcurOperThread(ConcurParam* args)
65 {
66 auto paramsPtr = static_cast<ConcurParam *>(args);
67 DBStatus status;
68 Value valueResult;
69 string strKey, strValue;
70
71 if (paramsPtr->tag_ == READ) {
72 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
73 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
74 if (valueResult.size() != 0) {
75 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
76 }
77 } else if (paramsPtr->tag_ == WRITE) {
78 status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
79 paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
80 ASSERT_EQ(status, DBStatus::OK);
81 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
82 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
83
84 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
85 if (valueResult.size() != 0) {
86 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
87 }
88 } else {
89 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
90 if (valueResult.size() != 0) {
91 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
92 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
93
94 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
95 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
96 } else {
97 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
98 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
99 }
100 }
101 }
102
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)103 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
104 {
105 SysTime end;
106 kvThreadParam->entryPtr_->key = entryCurrent.key;
107 kvThreadParam->entryPtr_->value = entryCurrent.value;
108 std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
109 thread.join();
110
111 std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
112 end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
113 dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
114 double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
115 / chrono::microseconds::period::den;
116 delete kvThreadParam->entryPtr_;
117 kvThreadParam->entryPtr_ = nullptr;
118 delete kvThreadParam;
119 kvThreadParam = nullptr;
120 return operInterval;
121 }
122
123 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
124 public:
125 static void SetUpTestCase(void);
126 static void TearDownTestCase(void);
127 void SetUp();
128 void TearDown();
129 private:
130 };
131
SetUpTestCase(void)132 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
133 {
134 }
135
TearDownTestCase(void)136 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
137 {
138 }
139
SetUp(void)140 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
141 {
142 MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
143 RemoveDir(DIRECTOR);
144
145 UnitTest *test = UnitTest::GetInstance();
146 ASSERT_NE(test, nullptr);
147 const TestInfo *testinfo = test->current_test_info();
148 ASSERT_NE(testinfo, nullptr);
149 string testCaseName = string(testinfo->name());
150 MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
151
152 g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
153 g_kvdbParameter1, g_kvOption);
154 ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
155 }
156
TearDown(void)157 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
158 {
159 MST_LOG("TearDownTest after all cases.");
160 EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
161 g_kvStoreConcurDelegate = nullptr;
162 DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
163 EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
164 delete g_concurManager;
165 g_concurManager = nullptr;
166 RemoveDir(DIRECTOR);
167 }
168
169 /*
170 * @tc.name: Read 002
171 * @tc.desc: Verify that support long-term multi-threaded concurrent reading.
172 * @tc.type: Long time
173 * @tc.require: SR000BUH3J
174 * @tc.author: luqianfu
175 */
176 #ifdef KV_CON_CRUD
177 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
178 {
179 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
180
181 vector<Entry> entriesBatch;
182 vector<Key> allKeys;
183 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
184
185 /**
186 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
187 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
188 */
189 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
190 EXPECT_TRUE(status == DBStatus::OK);
191 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
192
193 /**
194 * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
195 * @tc.expected: step2. read successfully and has no exception.
196 */
197 std::random_device randDevReadKeyNo;
198 std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
199 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
200 unsigned long randKeyNo;
201 unsigned int threadCurId = 0;
202
203 SysTime start;
204 SysDurTime dur;
205 double operInterval = 0.0;
206
207 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
208 Entry entryCurrent;
209 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
210 auto threadParam = new (std::nothrow) ConcurParam;
211 ASSERT_NE(threadParam, nullptr);
212 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
213 ASSERT_NE(threadParam->entryPtr_, nullptr);
214 threadParam->threadId_ = threadCurId++;
215 threadParam->tag_ = READ;
216 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
217
218 GenerateRecord(randKeyNo, entryCurrent);
219 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
220 }
221
222 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
223 }
224
StartRandThread()225 void StartRandThread()
226 {
227 std::vector<std::thread> threads;
228 std::random_device randDevReadKeyNo, randDevWriteKeyNo, randDevTag;
229 std::mt19937 genRandReadKeyNo(randDevReadKeyNo()), genRandWriteKeyNo(randDevWriteKeyNo()), genRandTag(randDevTag());
230 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
231 std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
232 std::uniform_int_distribution<int> disRandTag(READ, WRITE);
233 unsigned long randKeyNo;
234 unsigned int threadCurId = 0;
235 int randTag;
236 SysTime start;
237 SysDurTime dur;
238 double operInterval = 0.0;
239 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
240 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
241 ConcurParam *threadParam = new (std::nothrow) ConcurParam;
242 ASSERT_NE(threadParam, nullptr);
243 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
244 ASSERT_NE(threadParam->entryPtr_, nullptr);
245 threadParam->threadId_ = threadCurId++;
246 randTag = disRandTag(genRandTag);
247 threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
248 if (randTag == READ) {
249 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
250 } else {
251 randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
252 }
253 Entry entryCurrent;
254 GenerateRecord(randKeyNo, entryCurrent);
255 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
256 }
257 }
258 /*
259 * @tc.name: ReadWrite 002
260 * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
261 * @tc.type: Long time
262 * @tc.require: SR000BUH3J
263 * @tc.author: luqianfu
264 */
265 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
266 {
267 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
268
269 vector<Entry> entriesBatch;
270 vector<Key> allKeys;
271 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
272
273 /**
274 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
275 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
276 */
277 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
278 EXPECT_TRUE(status == DBStatus::OK);
279 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
280 MST_LOG("value size %zu", valueResult.size());
281
282 /**
283 * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
284 * @tc.expected: step2. read and write successfully and has no exception.
285 */
286 StartRandThread();
287
288 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
289 }
290
291 /*
292 * @tc.name: ReadWrite 004
293 * @tc.desc: Verify that support multi-threaded concurrent writing.
294 * @tc.type: Long time
295 * @tc.require: SR000BUH3J
296 * @tc.author: luqianfu
297 */
298 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
299 {
300 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
301
302 vector<Entry> entriesBatch;
303 vector<Key> allKeys;
304 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
305
306 /**
307 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
308 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
309 */
310 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
311 EXPECT_TRUE(status == DBStatus::OK);
312 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
313 MST_LOG("value size %zu", valueResult.size());
314 /**
315 * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
316 * @tc.expected: step2. write and delete successfully and has no exception.
317 */
318 StartRandThread();
319
320 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
321 }
322 #endif
323 }
324