• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <ctime>
18 #include <cmath>
19 #include <random>
20 #include <chrono>
21 #include <thread>
22 #include <cstdio>
23 #include <string>
24 
25 #include "distributeddb_data_generator.h"
26 #include "distributed_test_tools.h"
27 #include "kv_store_delegate.h"
28 #include "kv_store_delegate_manager.h"
29 
30 using namespace std;
31 using namespace testing;
32 #if defined TESTCASES_USING_GTEST_EXT
33 using namespace testing::ext;
34 #endif
35 using namespace DistributedDB;
36 using namespace DistributedDBDataGenerator;
37 
38 namespace DistributeddbKvConcurrencyCrud {
// Flag printed by SetUp() in its start-up log line.
constexpr bool IS_LOCAL = false;
#ifdef KV_CON_CRUD
// Inclusive range of record numbers drawn for read operations.
constexpr unsigned int READ_RECORDS_NUM_START = 1;
constexpr unsigned int READ_RECORDS_NUM_END = 128;
// Inclusive range of record numbers drawn for write operations.
constexpr unsigned int WRITE_RECORDS_NUM_START = 1;
constexpr unsigned int WRITE_RECORDS_NUM_END = 9999;
// Duration, in seconds, for which a long-running case keeps issuing operations.
constexpr unsigned long LONG_TIME_TEST_SECONDS = 10;
#endif
// Pause, in milliseconds, taken by KvCalculateTime() after every operation.
constexpr unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
48 
49 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
50 KvStoreDelegateManager *g_concurManager = nullptr;
51 
// Selects which store operation ConcurOperThread() performs.
// The enumerators keep their values 0, 1 and 2; the random tag generator casts ints to this type.
enum ReadOrWriteTag {
    READ,    // 0: look the key up
    WRITE,   // 1: put the entry, then look it up again
    DELETE   // 2: look the key up, then delete it
};
57 
58 struct ConcurParam {
59     unsigned int threadId_;
60     ReadOrWriteTag tag_;
61     DistributedDB::Entry* entryPtr_;
62 };
63 
64 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)65 void ConcurOperThread(ConcurParam* args)
66 {
67     auto paramsPtr = static_cast<ConcurParam *>(args);
68     DBStatus status;
69     Value valueResult;
70     string strKey = "";
71     string strValue = "";
72 
73     if (paramsPtr->tag_ == READ) {
74         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
75         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
76         if (valueResult.size() != 0) {
77             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
78         }
79     } else if (paramsPtr->tag_ == WRITE) {
80         status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
81             paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
82         ASSERT_EQ(status, DBStatus::OK);
83         Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
84         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
85 
86         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
87         if (valueResult.size() != 0) {
88             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
89         }
90     } else {
91         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
92         if (valueResult.size() != 0) {
93             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
94             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
95 
96             Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
97             Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
98         } else {
99             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
100             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
101         }
102     }
103 }
104 
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)105 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
106 {
107     SysTime end;
108     kvThreadParam->entryPtr_->key = entryCurrent.key;
109     kvThreadParam->entryPtr_->value = entryCurrent.value;
110     std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
111     thread.join();
112 
113     std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
114     end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
115     dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
116     double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
117         / chrono::microseconds::period::den;
118     delete kvThreadParam->entryPtr_;
119     kvThreadParam->entryPtr_ = nullptr;
120     delete kvThreadParam;
121     kvThreadParam = nullptr;
122     return operInterval;
123 }
124 
125 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
126 public:
127     static void SetUpTestCase(void);
128     static void TearDownTestCase(void);
129     void SetUp();
130     void TearDown();
131 private:
132 };
133 
SetUpTestCase(void)134 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
135 {
136 }
137 
TearDownTestCase(void)138 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
139 {
140 }
141 
SetUp(void)142 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
143 {
144     MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
145     RemoveDir(DIRECTOR);
146 
147     UnitTest *test = UnitTest::GetInstance();
148     ASSERT_NE(test, nullptr);
149     const TestInfo *testinfo = test->current_test_info();
150     ASSERT_NE(testinfo, nullptr);
151     string testCaseName = string(testinfo->name());
152     MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
153 
154     g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
155         g_kvdbParameter1, g_kvOption);
156     ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
157 }
158 
TearDown(void)159 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
160 {
161     MST_LOG("TearDownTest after all cases.");
162     EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
163     g_kvStoreConcurDelegate = nullptr;
164     DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
165     EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
166     delete g_concurManager;
167     g_concurManager = nullptr;
168     RemoveDir(DIRECTOR);
169 }
170 
171 /*
172  * @tc.name: Read 002
173  * @tc.desc: Verify that support long-term multi-threaded concurrent reading.
174  * @tc.type: Long time
175  * @tc.require: SR000BUH3J
176  * @tc.author: luqianfu
177  */
178 #ifdef KV_CON_CRUD
179 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
180 {
181     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
182 
183     vector<Entry> entriesBatch;
184     vector<Key> allKeys;
185     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
186 
187     /**
188      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
189      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
190      */
191     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
192     EXPECT_TRUE(status == DBStatus::OK);
193     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
194 
195     /**
196      * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
197      * @tc.expected: step2. read successfully and has no exception.
198      */
199     std::random_device randDevReadKeyNo;
200     std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
201     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
202     unsigned long randKeyNo;
203     unsigned int threadCurId = 0;
204 
205     SysTime start;
206     SysDurTime dur;
207     double operInterval = 0.0;
208 
209     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
210     Entry entryCurrent;
211     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
212         auto threadParam = new (std::nothrow) ConcurParam;
213         ASSERT_NE(threadParam, nullptr);
214         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
215         ASSERT_NE(threadParam->entryPtr_, nullptr);
216         threadParam->threadId_ = threadCurId++;
217         threadParam->tag_ = READ;
218         randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
219 
220         GenerateRecord(randKeyNo, entryCurrent);
221         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
222     }
223 
224     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
225 }
226 
StartRandThread()227 void StartRandThread()
228 {
229     std::vector<std::thread> threads;
230     std::random_device randDevReadKeyNo;
231     std::random_device randDevWriteKeyNo;
232     std::random_device randDevTag;
233     std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
234     std::mt19937 genRandWriteKeyNo(randDevWriteKeyNo());
235     std::mt19937 genRandTag(randDevTag());
236     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
237     std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
238     std::uniform_int_distribution<int> disRandTag(READ, WRITE);
239     unsigned long randKeyNo;
240     unsigned int threadCurId = 0;
241     int randTag;
242     SysTime start;
243     SysDurTime dur;
244     double operInterval = 0.0;
245     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
246     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
247         ConcurParam *threadParam = new (std::nothrow) ConcurParam;
248         ASSERT_NE(threadParam, nullptr);
249         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
250         ASSERT_NE(threadParam->entryPtr_, nullptr);
251         threadParam->threadId_ = threadCurId++;
252         randTag = disRandTag(genRandTag);
253         threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
254         if (randTag == READ) {
255             randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
256         } else {
257             randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
258         }
259         Entry entryCurrent;
260         GenerateRecord(randKeyNo, entryCurrent);
261         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
262     }
263 }
264 /*
265  * @tc.name: ReadWrite 002
266  * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
267  * @tc.type: Long time
268  * @tc.require: SR000BUH3J
269  * @tc.author: luqianfu
270  */
271 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
272 {
273     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
274 
275     vector<Entry> entriesBatch;
276     vector<Key> allKeys;
277     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
278 
279     /**
280      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
281      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
282      */
283     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
284     EXPECT_TRUE(status == DBStatus::OK);
285     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
286     MST_LOG("value size %zu", valueResult.size());
287 
288     /**
289      * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
290      * @tc.expected: step2. read and write successfully and has no exception.
291      */
292     StartRandThread();
293 
294     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
295 }
296 
297 /*
298  * @tc.name: ReadWrite 004
299  * @tc.desc: Verify that support multi-threaded concurrent writing.
300  * @tc.type: Long time
301  * @tc.require: SR000BUH3J
302  * @tc.author: luqianfu
303  */
304 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
305 {
306     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
307 
308     vector<Entry> entriesBatch;
309     vector<Key> allKeys;
310     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
311 
312     /**
313      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
314      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
315      */
316     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
317     EXPECT_TRUE(status == DBStatus::OK);
318     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
319     MST_LOG("value size %zu", valueResult.size());
320     /**
321      * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
322      * @tc.expected: step2. write and delete successfully and has no exception.
323      */
324     StartRandThread();
325 
326     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
327 }
328 #endif
329 }
330 #endif // OMIT_MULTI_VER
331