1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <ctime>
18 #include <cmath>
19 #include <random>
20 #include <chrono>
21 #include <thread>
22 #include <cstdio>
23 #include <string>
24
25 #include "distributeddb_data_generator.h"
26 #include "distributed_test_tools.h"
27 #include "kv_store_delegate.h"
28 #include "kv_store_delegate_manager.h"
29
30 using namespace std;
31 using namespace testing;
32 #if defined TESTCASES_USING_GTEST_EXT
33 using namespace testing::ext;
34 #endif
35 using namespace DistributedDB;
36 using namespace DistributedDBDataGenerator;
37
38 namespace DistributeddbKvConcurrencyCrud {
39 const bool IS_LOCAL = false;
40 #ifdef KV_CON_CRUD
41 const unsigned int READ_RECORDS_NUM_START = 1;
42 const unsigned int READ_RECORDS_NUM_END = 128;
43 const unsigned int WRITE_RECORDS_NUM_START = 1;
44 const unsigned int WRITE_RECORDS_NUM_END = 9999;
45 const unsigned long LONG_TIME_TEST_SECONDS = 10;
46 #endif
47 const unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
48
49 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
50 KvStoreDelegateManager *g_concurManager = nullptr;
51
52 enum ReadOrWriteTag {
53 READ = 0,
54 WRITE = 1,
55 DELETE = 2
56 };
57
58 struct ConcurParam {
59 unsigned int threadId_;
60 ReadOrWriteTag tag_;
61 DistributedDB::Entry* entryPtr_;
62 };
63
64 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)65 void ConcurOperThread(ConcurParam* args)
66 {
67 auto paramsPtr = static_cast<ConcurParam *>(args);
68 DBStatus status;
69 Value valueResult;
70 string strKey = "";
71 string strValue = "";
72
73 if (paramsPtr->tag_ == READ) {
74 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
75 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
76 if (valueResult.size() != 0) {
77 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
78 }
79 } else if (paramsPtr->tag_ == WRITE) {
80 status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
81 paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
82 ASSERT_EQ(status, DBStatus::OK);
83 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
84 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
85
86 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
87 if (valueResult.size() != 0) {
88 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
89 }
90 } else {
91 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
92 if (valueResult.size() != 0) {
93 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
94 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
95
96 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
97 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
98 } else {
99 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
100 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
101 }
102 }
103 }
104
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)105 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
106 {
107 SysTime end;
108 kvThreadParam->entryPtr_->key = entryCurrent.key;
109 kvThreadParam->entryPtr_->value = entryCurrent.value;
110 std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
111 thread.join();
112
113 std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
114 end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
115 dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
116 double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
117 / chrono::microseconds::period::den;
118 delete kvThreadParam->entryPtr_;
119 kvThreadParam->entryPtr_ = nullptr;
120 delete kvThreadParam;
121 kvThreadParam = nullptr;
122 return operInterval;
123 }
124
125 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
126 public:
127 static void SetUpTestCase(void);
128 static void TearDownTestCase(void);
129 void SetUp();
130 void TearDown();
131 private:
132 };
133
SetUpTestCase(void)134 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
135 {
136 }
137
TearDownTestCase(void)138 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
139 {
140 }
141
SetUp(void)142 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
143 {
144 MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
145 RemoveDir(DIRECTOR);
146
147 UnitTest *test = UnitTest::GetInstance();
148 ASSERT_NE(test, nullptr);
149 const TestInfo *testinfo = test->current_test_info();
150 ASSERT_NE(testinfo, nullptr);
151 string testCaseName = string(testinfo->name());
152 MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
153
154 g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
155 g_kvdbParameter1, g_kvOption);
156 ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
157 }
158
TearDown(void)159 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
160 {
161 MST_LOG("TearDownTest after all cases.");
162 EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
163 g_kvStoreConcurDelegate = nullptr;
164 DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
165 EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
166 delete g_concurManager;
167 g_concurManager = nullptr;
168 RemoveDir(DIRECTOR);
169 }
170
171 /*
172 * @tc.name: Read 002
173 * @tc.desc: Verify that support long-term multi-threaded concurrent reading.
174 * @tc.type: Long time
175 * @tc.require: SR000BUH3J
176 * @tc.author: luqianfu
177 */
178 #ifdef KV_CON_CRUD
179 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
180 {
181 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
182
183 vector<Entry> entriesBatch;
184 vector<Key> allKeys;
185 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
186
187 /**
188 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
189 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
190 */
191 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
192 EXPECT_TRUE(status == DBStatus::OK);
193 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
194
195 /**
196 * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
197 * @tc.expected: step2. read successfully and has no exception.
198 */
199 std::random_device randDevReadKeyNo;
200 std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
201 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
202 unsigned long randKeyNo;
203 unsigned int threadCurId = 0;
204
205 SysTime start;
206 SysDurTime dur;
207 double operInterval = 0.0;
208
209 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
210 Entry entryCurrent;
211 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
212 auto threadParam = new (std::nothrow) ConcurParam;
213 ASSERT_NE(threadParam, nullptr);
214 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
215 ASSERT_NE(threadParam->entryPtr_, nullptr);
216 threadParam->threadId_ = threadCurId++;
217 threadParam->tag_ = READ;
218 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
219
220 GenerateRecord(randKeyNo, entryCurrent);
221 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
222 }
223
224 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
225 }
226
StartRandThread()227 void StartRandThread()
228 {
229 std::vector<std::thread> threads;
230 std::random_device randDevReadKeyNo;
231 std::random_device randDevWriteKeyNo;
232 std::random_device randDevTag;
233 std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
234 std::mt19937 genRandWriteKeyNo(randDevWriteKeyNo());
235 std::mt19937 genRandTag(randDevTag());
236 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
237 std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
238 std::uniform_int_distribution<int> disRandTag(READ, WRITE);
239 unsigned long randKeyNo;
240 unsigned int threadCurId = 0;
241 int randTag;
242 SysTime start;
243 SysDurTime dur;
244 double operInterval = 0.0;
245 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
246 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
247 ConcurParam *threadParam = new (std::nothrow) ConcurParam;
248 ASSERT_NE(threadParam, nullptr);
249 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
250 ASSERT_NE(threadParam->entryPtr_, nullptr);
251 threadParam->threadId_ = threadCurId++;
252 randTag = disRandTag(genRandTag);
253 threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
254 if (randTag == READ) {
255 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
256 } else {
257 randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
258 }
259 Entry entryCurrent;
260 GenerateRecord(randKeyNo, entryCurrent);
261 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
262 }
263 }
264 /*
265 * @tc.name: ReadWrite 002
266 * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
267 * @tc.type: Long time
268 * @tc.require: SR000BUH3J
269 * @tc.author: luqianfu
270 */
271 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
272 {
273 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
274
275 vector<Entry> entriesBatch;
276 vector<Key> allKeys;
277 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
278
279 /**
280 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
281 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
282 */
283 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
284 EXPECT_TRUE(status == DBStatus::OK);
285 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
286 MST_LOG("value size %zu", valueResult.size());
287
288 /**
289 * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
290 * @tc.expected: step2. read and write successfully and has no exception.
291 */
292 StartRandThread();
293
294 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
295 }
296
297 /*
298 * @tc.name: ReadWrite 004
299 * @tc.desc: Verify that support multi-threaded concurrent writing.
300 * @tc.type: Long time
301 * @tc.require: SR000BUH3J
302 * @tc.author: luqianfu
303 */
304 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
305 {
306 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
307
308 vector<Entry> entriesBatch;
309 vector<Key> allKeys;
310 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
311
312 /**
313 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
314 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
315 */
316 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
317 EXPECT_TRUE(status == DBStatus::OK);
318 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
319 MST_LOG("value size %zu", valueResult.size());
320 /**
321 * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
322 * @tc.expected: step2. write and delete successfully and has no exception.
323 */
324 StartRandThread();
325
326 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
327 }
328 #endif
329 }
330 #endif // OMIT_MULTI_VER
331