/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <gtest/gtest.h>

#include <chrono>
#include <cmath>
#include <cstdio>
#include <ctime>
#include <memory>
#include <new>
#include <random>
#include <string>
#include <thread>

#include "distributeddb_data_generator.h"
#include "distributed_test_tools.h"
#include "kv_store_delegate.h"
#include "kv_store_delegate_manager.h"

using namespace std;
using namespace testing;
#if defined TESTCASES_USING_GTEST_EXT
using namespace testing::ext;
#endif
using namespace DistributedDB;
using namespace DistributedDBDataGenerator;
36 
37 namespace DistributeddbKvConcurrencyCrud {
// Only echoed by the SetUp log message in this file.
const bool IS_LOCAL = false;
#ifdef KV_CON_CRUD
// Inclusive bounds of the record numbers drawn for read operations.
const unsigned int READ_RECORDS_NUM_START = 1;
const unsigned int READ_RECORDS_NUM_END = 128;
// Inclusive bounds of the record numbers drawn for write operations.
const unsigned int WRITE_RECORDS_NUM_START = 1;
const unsigned int WRITE_RECORDS_NUM_END = 9999;
// Elapsed-time budget, in seconds, after which the long-running loops stop issuing operations.
const unsigned long LONG_TIME_TEST_SECONDS = 10;
#endif
// Pause, in milliseconds, inserted after every single worker operation.
const unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
47 
48 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
49 KvStoreDelegateManager *g_concurManager = nullptr;
50 
// Selects which store operation a worker performs (see ConcurOperThread).
// Kept as a plain enum because the values are fed to an integer random distribution.
enum ReadOrWriteTag {
    READ = 0,   // get the key and compare with the expected value
    WRITE = 1,  // put the entry, then read it back
    DELETE = 2  // delete the key
};
56 
57 struct ConcurParam {
58     unsigned int threadId_;
59     ReadOrWriteTag tag_;
60     DistributedDB::Entry* entryPtr_;
61 };
62 
63 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)64 void ConcurOperThread(ConcurParam* args)
65 {
66     auto paramsPtr = static_cast<ConcurParam *>(args);
67     DBStatus status;
68     Value valueResult;
69     string strKey, strValue;
70 
71     if (paramsPtr->tag_ == READ) {
72         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
73         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
74         if (valueResult.size() != 0) {
75             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
76         }
77     } else if (paramsPtr->tag_ == WRITE) {
78         status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
79             paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
80         ASSERT_EQ(status, DBStatus::OK);
81         Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
82         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
83 
84         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
85         if (valueResult.size() != 0) {
86             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
87         }
88     } else {
89         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
90         if (valueResult.size() != 0) {
91             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
92             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
93 
94             Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
95             Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
96         } else {
97             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
98             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
99         }
100     }
101 }
102 
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)103 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
104 {
105     SysTime end;
106     kvThreadParam->entryPtr_->key = entryCurrent.key;
107     kvThreadParam->entryPtr_->value = entryCurrent.value;
108     std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
109     thread.join();
110 
111     std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
112     end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
113     dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
114     double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
115         / chrono::microseconds::period::den;
116     delete kvThreadParam->entryPtr_;
117     kvThreadParam->entryPtr_ = nullptr;
118     delete kvThreadParam;
119     kvThreadParam = nullptr;
120     return operInterval;
121 }
122 
123 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
124 public:
125     static void SetUpTestCase(void);
126     static void TearDownTestCase(void);
127     void SetUp();
128     void TearDown();
129 private:
130 };
131 
SetUpTestCase(void)132 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
133 {
134 }
135 
TearDownTestCase(void)136 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
137 {
138 }
139 
SetUp(void)140 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
141 {
142     MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
143     RemoveDir(DIRECTOR);
144 
145     UnitTest *test = UnitTest::GetInstance();
146     ASSERT_NE(test, nullptr);
147     const TestInfo *testinfo = test->current_test_info();
148     ASSERT_NE(testinfo, nullptr);
149     string testCaseName = string(testinfo->name());
150     MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
151 
152     g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
153         g_kvdbParameter1, g_kvOption);
154     ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
155 }
156 
TearDown(void)157 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
158 {
159     MST_LOG("TearDownTest after all cases.");
160     EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
161     g_kvStoreConcurDelegate = nullptr;
162     DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
163     EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
164     delete g_concurManager;
165     g_concurManager = nullptr;
166     RemoveDir(DIRECTOR);
167 }
168 
/*
 * @tc.name: Read 002
 * @tc.desc: Verify that long-term multi-threaded concurrent reading is supported.
 * @tc.type: Long time
 * @tc.require: SR000BUH3J
 * @tc.author: luqianfu
 */
176 #ifdef KV_CON_CRUD
177 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
178 {
179     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
180 
181     vector<Entry> entriesBatch;
182     vector<Key> allKeys;
183     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
184 
185     /**
186      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
187      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
188      */
189     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
190     EXPECT_TRUE(status == DBStatus::OK);
191     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
192 
193     /**
194      * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
195      * @tc.expected: step2. read successfully and has no exception.
196      */
197     std::random_device randDevReadKeyNo;
198     std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
199     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
200     unsigned long randKeyNo;
201     unsigned int threadCurId = 0;
202 
203     SysTime start;
204     SysDurTime dur;
205     double operInterval = 0.0;
206 
207     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
208     Entry entryCurrent;
209     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
210         auto threadParam = new (std::nothrow) ConcurParam;
211         ASSERT_NE(threadParam, nullptr);
212         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
213         ASSERT_NE(threadParam->entryPtr_, nullptr);
214         threadParam->threadId_ = threadCurId++;
215         threadParam->tag_ = READ;
216         randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
217 
218         GenerateRecord(randKeyNo, entryCurrent);
219         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
220     }
221 
222     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
223 }
224 
StartRandThread()225 void StartRandThread()
226 {
227     std::vector<std::thread> threads;
228     std::random_device randDevReadKeyNo, randDevWriteKeyNo, randDevTag;
229     std::mt19937 genRandReadKeyNo(randDevReadKeyNo()), genRandWriteKeyNo(randDevWriteKeyNo()), genRandTag(randDevTag());
230     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
231     std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
232     std::uniform_int_distribution<int> disRandTag(READ, WRITE);
233     unsigned long randKeyNo;
234     unsigned int threadCurId = 0;
235     int randTag;
236     SysTime start;
237     SysDurTime dur;
238     double operInterval = 0.0;
239     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
240     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
241         ConcurParam *threadParam = new (std::nothrow) ConcurParam;
242         ASSERT_NE(threadParam, nullptr);
243         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
244         ASSERT_NE(threadParam->entryPtr_, nullptr);
245         threadParam->threadId_ = threadCurId++;
246         randTag = disRandTag(genRandTag);
247         threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
248         if (randTag == READ) {
249             randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
250         } else {
251             randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
252         }
253         Entry entryCurrent;
254         GenerateRecord(randKeyNo, entryCurrent);
255         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
256     }
257 }
258 /*
259  * @tc.name: ReadWrite 002
260  * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
261  * @tc.type: Long time
262  * @tc.require: SR000BUH3J
263  * @tc.author: luqianfu
264  */
265 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
266 {
267     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
268 
269     vector<Entry> entriesBatch;
270     vector<Key> allKeys;
271     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
272 
273     /**
274      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
275      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
276      */
277     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
278     EXPECT_TRUE(status == DBStatus::OK);
279     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
280     MST_LOG("value size %zu", valueResult.size());
281 
282     /**
283      * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
284      * @tc.expected: step2. read and write successfully and has no exception.
285      */
286     StartRandThread();
287 
288     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
289 }
290 
291 /*
292  * @tc.name: ReadWrite 004
293  * @tc.desc: Verify that support multi-threaded concurrent writing.
294  * @tc.type: Long time
295  * @tc.require: SR000BUH3J
296  * @tc.author: luqianfu
297  */
298 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
299 {
300     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
301 
302     vector<Entry> entriesBatch;
303     vector<Key> allKeys;
304     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
305 
306     /**
307      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
308      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
309      */
310     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
311     EXPECT_TRUE(status == DBStatus::OK);
312     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
313     MST_LOG("value size %zu", valueResult.size());
314     /**
315      * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
316      * @tc.expected: step2. write and delete successfully and has no exception.
317      */
318     StartRandThread();
319 
320     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
321 }
322 #endif
323 }
324