• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef OMIT_MULTI_VER
16 #include <gtest/gtest.h>
17 #include <ctime>
18 #include <cmath>
19 #include <random>
20 #include <chrono>
21 #include <thread>
22 #include <cstdio>
23 #include <string>
24 
25 #include "distributeddb_data_generator.h"
26 #include "distributed_test_tools.h"
27 #include "kv_store_delegate.h"
28 #include "kv_store_delegate_manager.h"
29 
30 using namespace std;
31 using namespace testing;
32 #if defined TESTCASES_USING_GTEST_EXT
33 using namespace testing::ext;
34 #endif
35 using namespace DistributedDB;
36 using namespace DistributedDBDataGenerator;
37 
38 namespace DistributeddbKvConcurrencyCrud {
39 const bool IS_LOCAL = false;
40 #ifdef KV_CON_CRUD
41 const unsigned int READ_RECORDS_NUM_START = 1;
42 const unsigned int READ_RECORDS_NUM_END = 128;
43 const unsigned int WRITE_RECORDS_NUM_START = 1;
44 const unsigned int WRITE_RECORDS_NUM_END = 9999;
45 const unsigned long LONG_TIME_TEST_SECONDS = 10;
46 #endif
47 const unsigned long LONG_TIME_INTERVAL_MILLSECONDS = 5;
48 
49 KvStoreDelegate *g_kvStoreConcurDelegate = nullptr; // the delegate used in this suit.
50 KvStoreDelegateManager *g_concurManager = nullptr;
51 
52 enum ReadOrWriteTag {
53     READ = 0,
54     WRITE = 1,
55     DELETE = 2
56 };
57 
58 struct ConcurParam {
59     unsigned int threadId_;
60     ReadOrWriteTag tag_;
61     DistributedDB::Entry* entryPtr_;
62 };
63 
64 // the thread runnnig methods
ConcurOperThread(ConcurParam * args)65 void ConcurOperThread(ConcurParam* args)
66 {
67     auto paramsPtr = static_cast<ConcurParam *>(args);
68     DBStatus status;
69     Value valueResult;
70     string strKey, strValue;
71 
72     if (paramsPtr->tag_ == READ) {
73         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
74         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
75         if (valueResult.size() != 0) {
76             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
77         }
78     } else if (paramsPtr->tag_ == WRITE) {
79         status = DistributedTestTools::Put(*g_kvStoreConcurDelegate,
80             paramsPtr->entryPtr_->key, paramsPtr->entryPtr_->value);
81         ASSERT_EQ(status, DBStatus::OK);
82         Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
83         Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
84 
85         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
86         if (valueResult.size() != 0) {
87             EXPECT_TRUE(DistributedTestTools::IsValueEquals(valueResult, paramsPtr->entryPtr_->value));
88         }
89     } else {
90         valueResult = DistributedTestTools::Get(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
91         if (valueResult.size() != 0) {
92             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
93             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
94 
95             Uint8VecToString(paramsPtr->entryPtr_->key, strKey);
96             Uint8VecToString(paramsPtr->entryPtr_->value, strValue);
97         } else {
98             status = DistributedTestTools::Delete(*g_kvStoreConcurDelegate, paramsPtr->entryPtr_->key);
99             ASSERT_TRUE(status == DBStatus::NOT_FOUND || status == OK);
100         }
101     }
102 }
103 
KvCalculateTime(ConcurParam * & kvThreadParam,const Entry entryCurrent,const SysTime & start,SysDurTime dur)104 double KvCalculateTime(ConcurParam *&kvThreadParam, const Entry entryCurrent, const SysTime &start, SysDurTime dur)
105 {
106     SysTime end;
107     kvThreadParam->entryPtr_->key = entryCurrent.key;
108     kvThreadParam->entryPtr_->value = entryCurrent.value;
109     std::thread thread = std::thread(ConcurOperThread, reinterpret_cast<ConcurParam *>(kvThreadParam));
110     thread.join();
111 
112     std::this_thread::sleep_for(std::chrono::duration<float, std::milli>(LONG_TIME_INTERVAL_MILLSECONDS));
113     end = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
114     dur = std::chrono::duration_cast<chrono::microseconds>(end - start);
115     double operInterval = static_cast<double>(dur.count()) * chrono::microseconds::period::num
116         / chrono::microseconds::period::den;
117     delete kvThreadParam->entryPtr_;
118     kvThreadParam->entryPtr_ = nullptr;
119     delete kvThreadParam;
120     kvThreadParam = nullptr;
121     return operInterval;
122 }
123 
124 class DistributeddbKvConcurrencyCrudTest : public testing::Test {
125 public:
126     static void SetUpTestCase(void);
127     static void TearDownTestCase(void);
128     void SetUp();
129     void TearDown();
130 private:
131 };
132 
SetUpTestCase(void)133 void DistributeddbKvConcurrencyCrudTest::SetUpTestCase(void)
134 {
135 }
136 
TearDownTestCase(void)137 void DistributeddbKvConcurrencyCrudTest::TearDownTestCase(void)
138 {
139 }
140 
SetUp(void)141 void DistributeddbKvConcurrencyCrudTest::SetUp(void)
142 {
143     MST_LOG("SetUpTest before all cases local[%d].", IS_LOCAL);
144     RemoveDir(DIRECTOR);
145 
146     UnitTest *test = UnitTest::GetInstance();
147     ASSERT_NE(test, nullptr);
148     const TestInfo *testinfo = test->current_test_info();
149     ASSERT_NE(testinfo, nullptr);
150     string testCaseName = string(testinfo->name());
151     MST_LOG("[SetUp] test case %s is start to run", testCaseName.c_str());
152 
153     g_kvStoreConcurDelegate = DistributedTestTools::GetDelegateSuccess(g_concurManager,
154         g_kvdbParameter1, g_kvOption);
155     ASSERT_TRUE(g_concurManager != nullptr && g_kvStoreConcurDelegate != nullptr);
156 }
157 
TearDown(void)158 void DistributeddbKvConcurrencyCrudTest::TearDown(void)
159 {
160     MST_LOG("TearDownTest after all cases.");
161     EXPECT_TRUE(g_concurManager->CloseKvStore(g_kvStoreConcurDelegate) == OK);
162     g_kvStoreConcurDelegate = nullptr;
163     DBStatus status = g_concurManager->DeleteKvStore(STORE_ID_1);
164     EXPECT_TRUE(status == DistributedDB::DBStatus::OK) << "fail to delete exist kvdb";
165     delete g_concurManager;
166     g_concurManager = nullptr;
167     RemoveDir(DIRECTOR);
168 }
169 
170 /*
171  * @tc.name: Read 002
172  * @tc.desc: Verify that support long-term multi-threaded concurrent reading.
173  * @tc.type: Long time
174  * @tc.require: SR000BUH3J
175  * @tc.author: luqianfu
176  */
177 #ifdef KV_CON_CRUD
178 HWTEST_F(DistributeddbKvConcurrencyCrudTest, Read002, TestSize.Level3)
179 {
180     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
181 
182     vector<Entry> entriesBatch;
183     vector<Key> allKeys;
184     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
185 
186     /**
187      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
188      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
189      */
190     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
191     EXPECT_TRUE(status == DBStatus::OK);
192     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
193 
194     /**
195      * @tc.steps: step2. create 6 threads and read data from db concurrently for 10s.
196      * @tc.expected: step2. read successfully and has no exception.
197      */
198     std::random_device randDevReadKeyNo;
199     std::mt19937 genRandReadKeyNo(randDevReadKeyNo());
200     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
201     unsigned long randKeyNo;
202     unsigned int threadCurId = 0;
203 
204     SysTime start;
205     SysDurTime dur;
206     double operInterval = 0.0;
207 
208     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
209     Entry entryCurrent;
210     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
211         auto threadParam = new (std::nothrow) ConcurParam;
212         ASSERT_NE(threadParam, nullptr);
213         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
214         ASSERT_NE(threadParam->entryPtr_, nullptr);
215         threadParam->threadId_ = threadCurId++;
216         threadParam->tag_ = READ;
217         randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
218 
219         GenerateRecord(randKeyNo, entryCurrent);
220         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
221     }
222 
223     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
224 }
225 
StartRandThread()226 void StartRandThread()
227 {
228     std::vector<std::thread> threads;
229     std::random_device randDevReadKeyNo, randDevWriteKeyNo, randDevTag;
230     std::mt19937 genRandReadKeyNo(randDevReadKeyNo()), genRandWriteKeyNo(randDevWriteKeyNo()), genRandTag(randDevTag());
231     std::uniform_int_distribution<unsigned long> disRandReadKeyNo(READ_RECORDS_NUM_START, READ_RECORDS_NUM_END);
232     std::uniform_int_distribution<unsigned long> disRandWriteKeyNo(WRITE_RECORDS_NUM_START, WRITE_RECORDS_NUM_END);
233     std::uniform_int_distribution<int> disRandTag(READ, WRITE);
234     unsigned long randKeyNo;
235     unsigned int threadCurId = 0;
236     int randTag;
237     SysTime start;
238     SysDurTime dur;
239     double operInterval = 0.0;
240     start = std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::steady_clock::now());
241     while (operInterval < static_cast<double>(LONG_TIME_TEST_SECONDS)) {
242         ConcurParam *threadParam = new (std::nothrow) ConcurParam;
243         ASSERT_NE(threadParam, nullptr);
244         threadParam->entryPtr_ = new (std::nothrow) DistributedDB::Entry;
245         ASSERT_NE(threadParam->entryPtr_, nullptr);
246         threadParam->threadId_ = threadCurId++;
247         randTag = disRandTag(genRandTag);
248         threadParam->tag_ = static_cast<ReadOrWriteTag>(randTag);
249         if (randTag == READ) {
250             randKeyNo = disRandReadKeyNo(genRandReadKeyNo);
251         } else {
252             randKeyNo = disRandWriteKeyNo(genRandWriteKeyNo);
253         }
254         Entry entryCurrent;
255         GenerateRecord(randKeyNo, entryCurrent);
256         operInterval = KvCalculateTime(threadParam, entryCurrent, start, dur);
257     }
258 }
259 /*
260  * @tc.name: ReadWrite 002
261  * @tc.desc: Verify that support long-term multi-threaded concurrent reading and writing.
262  * @tc.type: Long time
263  * @tc.require: SR000BUH3J
264  * @tc.author: luqianfu
265  */
266 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite002, TestSize.Level3)
267 {
268     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
269 
270     vector<Entry> entriesBatch;
271     vector<Key> allKeys;
272     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
273 
274     /**
275      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
276      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
277      */
278     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
279     EXPECT_TRUE(status == DBStatus::OK);
280     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
281     MST_LOG("value size %zu", valueResult.size());
282 
283     /**
284      * @tc.steps: step2. create 6 threads, read and write data concurrently for 10s.
285      * @tc.expected: step2. read and write successfully and has no exception.
286      */
287     StartRandThread();
288 
289     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
290 }
291 
292 /*
293  * @tc.name: ReadWrite 004
294  * @tc.desc: Verify that support multi-threaded concurrent writing.
295  * @tc.type: Long time
296  * @tc.require: SR000BUH3J
297  * @tc.author: luqianfu
298  */
299 HWTEST_F(DistributeddbKvConcurrencyCrudTest, ReadWrite004, TestSize.Level3)
300 {
301     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
302 
303     vector<Entry> entriesBatch;
304     vector<Key> allKeys;
305     GenerateRecords(BATCH_RECORDS, DEFAULT_START, allKeys, entriesBatch);
306 
307     /**
308      * @tc.steps: step1. putBatch 128 items of (keys,values) then getEntries with keyprefix='k'.
309      * @tc.expected: step1. putBatch successfully and the size of GetEntries is 128.
310      */
311     DBStatus status = DistributedTestTools::PutBatch(*g_kvStoreConcurDelegate, entriesBatch);
312     EXPECT_TRUE(status == DBStatus::OK);
313     vector<Entry> valueResult = DistributedTestTools::GetEntries(*g_kvStoreConcurDelegate, KEY_SEARCH_4);
314     MST_LOG("value size %zu", valueResult.size());
315     /**
316      * @tc.steps: step2. create 6 threads to write and create 4 threads to delete data concurrently for 10s.
317      * @tc.expected: step2. write and delete successfully and has no exception.
318      */
319     StartRandThread();
320 
321     DistributedTestTools::Clear(*g_kvStoreConcurDelegate);
322 }
323 #endif
324 }
325 #endif // OMIT_MULTI_VER
326