• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef QUERY_SYNC_WATER_MARK_HELPER_H
17 #define QUERY_SYNC_WATER_MARK_HELPER_H
18 
19 #include <deque>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 #include "db_types.h"
25 #include "ikvdb_sync_interface.h"
26 #include "lru_map.h"
27 
28 namespace DistributedDB {
29 struct QueryWaterMark {
30     uint32_t version = 0; // start with 103
31     WaterMark sendWaterMark = 0;
32     WaterMark recvWaterMark = 0;
33     Timestamp lastUsedTime = 0; // use for delete data
34     std::string sql; // for analyze sql from logs
35     Timestamp lastQueryTime = 0; // use for miss query scene add in 106
36 };
37 
38 struct DeleteWaterMark {
39     uint32_t version = 0;
40     WaterMark sendWaterMark = 0;
41     WaterMark recvWaterMark = 0;
42 };
43 
44 class QuerySyncWaterMarkHelper {
45 public:
46     QuerySyncWaterMarkHelper();
47     ~QuerySyncWaterMarkHelper();
48 
49     DISABLE_COPY_ASSIGN_MOVE(QuerySyncWaterMarkHelper);
50 
51     int Initialize(ISyncInterface *storage);
52 
53     int GetQueryWaterMark(const std::string &queryIdentify, const std::string &deviceId, const std::string &userId,
54         QueryWaterMark &queryWaterMark);
55 
56     int SetSendQueryWaterMark(const std::string &queryIdentify,
57         const std::string &deviceId, const std::string &userId, const WaterMark &waterMark);
58 
59     int SetRecvQueryWaterMark(const std::string &queryIdentify,
60         const std::string &deviceId, const std::string &userId, const WaterMark &waterMark);
61 
62     int SetLastQueryTime(const std::string &queryIdentify,
63         const std::string &deviceId, const std::string &userId, const Timestamp &timestamp);
64 
65     int GetDeleteSyncWaterMark(const std::string &deviceId, const std::string &userId,
66         DeleteWaterMark &deleteWaterMark);
67 
68     int SetSendDeleteSyncWaterMark(const std::string &deviceId, const std::string &userId, const WaterMark &waterMark);
69 
70     int SetRecvDeleteSyncWaterMark(const std::string &deviceId, const std::string &userId, const WaterMark &waterMark,
71         bool isNeedHash);
72 
73     // this function will read deleteWaterMark from db by it's deleteWaterMarkKey
74     // and then serialize it and put to cache
75     int LoadDeleteSyncDataToCache(const Key &deleteWaterMarkKey);
76 
77     // this function will remove data in db
78     int RemoveLeastUsedQuerySyncItems(const std::vector<Key> &querySyncIds);
79 
80     // reset the waterMark to zero
81     int ResetRecvQueryWaterMark(const DeviceID &deviceId, const DeviceID &userId, const std::string &tableName,
82         bool isNeedHash);
83 
84     static std::string GetQuerySyncPrefixKey();
85 
86     static std::string GetDeleteSyncPrefixKey();
87 
88 private:
89 
90     int GetMetadataFromDb(const std::vector<uint8_t> &key, std::vector<uint8_t> &outValue);
91 
92     int GetMetadataByPrefixKeyFromDb(const Key &prefixKey, std::map<Key, Value> &data);
93 
94     int SetMetadataToDb(const std::vector<uint8_t> &key, const std::vector<uint8_t> &inValue);
95 
96     int DeleteMetaDataFromDB(const std::vector<Key> &keys) const;
97 
98     int SaveQueryWaterMarkToDB(const DeviceID &dbKeyString, const QueryWaterMark &queryWaterMark);
99 
100     int GetQueryWaterMarkFromDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark);
101 
102     int SetRecvQueryWaterMarkWithoutLock(const std::string &cacheKey,
103         const WaterMark &waterMark);
104 
105     // search the queryWaterMark from db or cache_
106     // and ensure it exit in cache_
107     int GetQueryWaterMarkInCacheAndDb(const std::string &cacheKey, QueryWaterMark &queryWaterMark);
108 
109     // only first create queryWaterMark will call this function
110     // it will create a queryWaterMark and save to db
111     int PutQueryWaterMarkToDB(const DeviceID &dbKeyString, QueryWaterMark &queryWaterMark);
112 
113     // get the querySync hashId in cache_ or generate one and then put it in to cache_
114     // the hashId is made up of "QUERY_SYNC_PREFIX_KEY" + hash(deviceId) + queryId
115     DeviceID GetHashQuerySyncDeviceId(const DeviceID &deviceId, const DeviceID &userId, const DeviceID &queryId);
116 
117     // put queryWaterMark to lru cache_ and then save to db
118     int UpdateCacheAndSave(const std::string &cacheKey, QueryWaterMark &queryWaterMark);
119 
120     // search the deleteWaterMark from db or cache_
121     // and ensure it exit in cache_
122     int GetDeleteWaterMarkFromCache(const DeviceID &hashDeviceId, DeleteWaterMark &deleteWaterMark);
123 
124     // get the deleteSync hashId in cache_ or generate one and then put it in to cache_
125     // the hashId is made up of "DELETE_SYNC_PREFIX_KEY" + hash(deviceId)
126     DeviceID GetHashDeleteSyncDeviceId(const DeviceID &deviceId, const DeviceID &userId, bool isNeedHash = true);
127 
128     int SaveDeleteWaterMarkToDB(const DeviceID &hashDeviceId, const DeleteWaterMark &deleteWaterMark);
129 
130     int GetDeleteWaterMarkFromDB(const DeviceID &hashDeviceId, DeleteWaterMark &deleteWaterMark);
131 
132     int GetDeleteWatersMarkFromDB(const DeviceID &hashId, std::map<Key, DeleteWaterMark> &deleteWaterMarks);
133 
134     // put queryWaterMark to lru cache_ and then save to db
135     int UpdateDeleteSyncCacheAndSave(const std::string &dbKey, const DeleteWaterMark &deleteWaterMark);
136 
137     static int SerializeQueryWaterMark(const QueryWaterMark &queryWaterMark, std::vector<uint8_t> &outValue);
138 
139     static int DeSerializeQueryWaterMark(const std::vector<uint8_t> &dbQueryWaterMark, QueryWaterMark &queryWaterMark);
140 
141     static uint64_t CalculateQueryWaterMarkSize(const QueryWaterMark &queryWaterMark);
142 
143     static int SerializeDeleteWaterMark(const DeleteWaterMark &deleteWaterMark, std::vector<uint8_t> &outValue);
144 
145     static int DeSerializeDeleteWaterMark(const std::vector<uint8_t> &inValue, DeleteWaterMark &deleteWaterMark);
146 
147     static uint64_t CalculateDeleteWaterMarkSize();
148 
149     // store or visit queryWaterMark should add a lock
150     // because it will change the eliminationChain
151     // and the queryWaterMark use a LRU Map to store in ram
152     std::mutex queryWaterMarkLock_;
153     std::map<DeviceSyncTarget, std::map<std::string, std::string>> deviceIdToHashQuerySyncIdMap_;
154 
155     // also store deleteKeyWaterMark should add a lock
156     std::mutex deleteSyncLock_;
157     std::map<DeviceSyncTarget, std::string> deviceIdToHashDeleteSyncIdMap_;
158 
159     ISyncInterface *storage_;
160 };
161 } // namespace DistributedDB
162 #endif
163