1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <ctime>
18 #include <cmath>
19 #include <random>
20 #include <chrono>
21 #include <thread>
22 #include <cstdio>
23 #include <string>
24
25 #include "distributeddb_data_generator.h"
26 #include "distributed_test_tools.h"
27 #include "kv_store_delegate.h"
28 #include "kv_store_delegate_manager.h"
29
30 using namespace std;
31 using namespace testing;
32 #if defined TESTCASES_USING_GTEST_EXT
33 using namespace testing::ext;
34 #endif
35 using namespace DistributedDB;
36 using namespace DistributedDBDataGenerator;
37
38 namespace DistributeddbKvConcurrencyCrud {
39 const bool IS_LOCAL = false;
40 #ifdef KV_CON_CRUD
41 const unsigned int READ_RECORDS_NUM_START = 1;
42 const unsigned int READ_RECORDS_NUM_END = 128;
43 const unsigned int WRITE_RECORDS_NUM_START = 1;
44 const unsigned int WRITE_RECORDS_NUM_END = 9999;
45 const unsigned long LONG_TIME_TEST_SECONDS = 10;
46 #endif
47 const unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
48
49 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
50 KvStoreDelegateManager *g_concurManager = nullptr;
51
52 enum ReadOrWriteTag {
53 READ = 0,
54 WRITE = 1,
55 DELETE = 2
56 };
57
58 struct ConcurParam {
59 unsigned int threadId_;
60 ReadOrWriteTag tag_;
61 DistributedDB::Entry* entryPtr_;
62 };
63
64 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)65 void ConcurOperThread(ConcurParam* args)
66 {
67 auto paramsPtr = static_cast<ConcurParam *>(args);
68 DBStatus status;
69 Value valueResult;
70 string strKey, strValue;
71
72 if (paramsPtr->tag_ == READ) {
73 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
74 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
75 if (valueResult.size() != 0) {
76 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
77 }
78 } else if (paramsPtr->tag_ == WRITE) {
79 status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
80 paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
81 ASSERT_EQ(status, DBStatus::OK);
82 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
83 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
84
85 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
86 if (valueResult.size() != 0) {
87 EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
88 }
89 } else {
90 valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
91 if (valueResult.size() != 0) {
92 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
93 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
94
95 Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
96 Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
97 } else {
98 status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
99 ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
100 }
101 }
102 }
103
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)104 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
105 {
106 SysTime end;
107 kvThreadParam->entryPtr_->key = entryCurrent.key;
108 kvThreadParam->entryPtr_->value = entryCurrent.value;
109 std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
110 thread.join();
111
112 std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
113 end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
114 dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
115 double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
116 / chrono::microseconds::period::den;
117 delete kvThreadParam->entryPtr_;
118 kvThreadParam->entryPtr_ = nullptr;
119 delete kvThreadParam;
120 kvThreadParam = nullptr;
121 return operInterval;
122 }
123
124 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
125 public:
126 static void SetUpTestCase(void);
127 static void TearDownTestCase(void);
128 void SetUp();
129 void TearDown();
130 private:
131 };
132
SetUpTestCase(void)133 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
134 {
135 }
136
TearDownTestCase(void)137 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
138 {
139 }
140
SetUp(void)141 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
142 {
143 MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
144 RemoveDir(DIRECTOR);
145
146 UnitTest *test = UnitTest::GetInstance();
147 ASSERT_NE(test, nullptr);
148 const TestInfo *testinfo = test->current_test_info();
149 ASSERT_NE(testinfo, nullptr);
150 string testCaseName = string(testinfo->name());
151 MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
152
153 g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
154 g_kvdbParameter1, g_kvOption);
155 ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
156 }
157
TearDown(void)158 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
159 {
160 MST_LOG("TearDownTest after all cases.");
161 EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
162 g_kvStoreConcurDelegate = nullptr;
163 DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
164 EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
165 delete g_concurManager;
166 g_concurManager = nullptr;
167 RemoveDir(DIRECTOR);
168 }
169
170 /*
171 * @tc.name: Read 002
172 * @tc.desc: Verify that support long-term multi-threaded concurrent reading.
173 * @tc.type: Long time
174 * @tc.require: SR000BUH3J
175 * @tc.author: luqianfu
176 */
177 #ifdef KV_CON_CRUD
178 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
179 {
180 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
181
182 vector<Entry> entriesBatch;
183 vector<Key> allKeys;
184 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
185
186 /**
187 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
188 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
189 */
190 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
191 EXPECT_TRUE(status == DBStatus::OK);
192 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
193
194 /**
195 * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
196 * @tc.expected: step2. read successfully and has no exception.
197 */
198 std::random_device randDevReadKeyNo;
199 std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
200 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
201 unsigned long randKeyNo;
202 unsigned int threadCurId = 0;
203
204 SysTime start;
205 SysDurTime dur;
206 double operInterval = 0.0;
207
208 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
209 Entry entryCurrent;
210 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
211 auto threadParam = new (std::nothrow) ConcurParam;
212 ASSERT_NE(threadParam, nullptr);
213 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
214 ASSERT_NE(threadParam->entryPtr_, nullptr);
215 threadParam->threadId_ = threadCurId++;
216 threadParam->tag_ = READ;
217 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
218
219 GenerateRecord(randKeyNo, entryCurrent);
220 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
221 }
222
223 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
224 }
225
StartRandThread()226 void StartRandThread()
227 {
228 std::vector<std::thread> threads;
229 std::random_device randDevReadKeyNo, randDevWriteKeyNo, randDevTag;
230 std::mt19937 genRandReadKeyNo(randDevReadKeyNo()), genRandWriteKeyNo(randDevWriteKeyNo()), genRandTag(randDevTag());
231 std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
232 std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
233 std::uniform_int_distribution<int> disRandTag(READ, WRITE);
234 unsigned long randKeyNo;
235 unsigned int threadCurId = 0;
236 int randTag;
237 SysTime start;
238 SysDurTime dur;
239 double operInterval = 0.0;
240 start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
241 while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
242 ConcurParam *threadParam = new (std::nothrow) ConcurParam;
243 ASSERT_NE(threadParam, nullptr);
244 threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
245 ASSERT_NE(threadParam->entryPtr_, nullptr);
246 threadParam->threadId_ = threadCurId++;
247 randTag = disRandTag(genRandTag);
248 threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
249 if (randTag == READ) {
250 randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
251 } else {
252 randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
253 }
254 Entry entryCurrent;
255 GenerateRecord(randKeyNo, entryCurrent);
256 operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
257 }
258 }
259 /*
260 * @tc.name: ReadWrite 002
261 * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
262 * @tc.type: Long time
263 * @tc.require: SR000BUH3J
264 * @tc.author: luqianfu
265 */
266 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
267 {
268 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
269
270 vector<Entry> entriesBatch;
271 vector<Key> allKeys;
272 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
273
274 /**
275 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
276 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
277 */
278 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
279 EXPECT_TRUE(status == DBStatus::OK);
280 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
281 MST_LOG("value size %zu", valueResult.size());
282
283 /**
284 * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
285 * @tc.expected: step2. read and write successfully and has no exception.
286 */
287 StartRandThread();
288
289 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
290 }
291
292 /*
293 * @tc.name: ReadWrite 004
294 * @tc.desc: Verify that support multi-threaded concurrent writing.
295 * @tc.type: Long time
296 * @tc.require: SR000BUH3J
297 * @tc.author: luqianfu
298 */
299 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
300 {
301 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
302
303 vector<Entry> entriesBatch;
304 vector<Key> allKeys;
305 GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
306
307 /**
308 * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
309 * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
310 */
311 DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
312 EXPECT_TRUE(status == DBStatus::OK);
313 vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
314 MST_LOG("value size %zu", valueResult.size());
315 /**
316 * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
317 * @tc.expected: step2. write and delete successfully and has no exception.
318 */
319 StartRandThread();
320
321 DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
322 }
323 #endif
324 }
325 #endif // OMIT_MULTI_VER
326