• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20 
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24     int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25     if (errCode != E_OK) {
26         return errCode;
27     }
28     auto callback = [this] { SchemaChangeCallback(); };
29     return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30         RegisterSchemaChangedCallback(callback);
31 }
32 
Sync(const SyncParam & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParam &param, uint64_t connectionId)
34 {
35     int errCode = QuerySyncPreCheck(param);
36     if (errCode != E_OK) {
37         return errCode;
38     }
39     return GenericSyncer::Sync(param, connectionId);
40 }
41 
PrepareSync(const SyncParam & param,uint32_t syncId,uint64_t connectionId)42 int SingleVerRelationalSyncer::PrepareSync(const SyncParam &param, uint32_t syncId, uint64_t connectionId)
43 {
44     if (syncInterface_ == nullptr) {
45         LOGE("[SingleVerRelationalSyncer] [PrepareSync] syncInterface_ is nullptr.");
46         return -E_INTERNAL_ERROR;
47     }
48     const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
49     std::vector<QuerySyncObject> tablesQuery;
50     if (param.isQuerySync) {
51         tablesQuery = GetQuerySyncObject(param);
52     } else {
53         tablesQuery = syncInterface->GetTablesQuery();
54     }
55     std::set<uint32_t> subSyncIdSet;
56     int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
57     if (errCode != E_OK) {
58         DoRollBack(subSyncIdSet);
59         return errCode;
60     }
61     if (param.wait) {
62         bool connectionClose = false;
63         {
64             std::lock_guard<std::mutex> lockGuard(syncIdLock_);
65             connectionClose = connectionIdMap_.find(connectionId) == connectionIdMap_.end();
66         }
67         if (!connectionClose) {
68             DoOnComplete(param, syncId);
69         }
70     }
71     return E_OK;
72 }
73 
GenerateEachSyncTask(const SyncParam & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)74 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParam &param, uint32_t syncId,
75     const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
76 {
77     SyncParam subParam = param;
78     subParam.isQuerySync = true;
79     int errCode = E_OK;
80     for (const QuerySyncObject &table : tablesQuery) {
81         uint32_t subSyncId = GenerateSyncId();
82         std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
83         LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
84             subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
85         subParam.syncQuery = table;
86         subParam.onComplete = [this, subSyncId, syncId, subParam](const std::map<std::string, int> &devicesMap) {
87             DoOnSubSyncComplete(subSyncId, syncId, subParam, devicesMap);
88         };
89         {
90             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
91             fullSyncIdMap_[syncId].insert(subSyncId);
92         }
93         errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
94         if (errCode != E_OK) {
95             LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
96             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
97             fullSyncIdMap_[syncId].erase(subSyncId);
98             break;
99         }
100         subSyncIdSet.insert(subSyncId);
101     }
102     return errCode;
103 }
104 
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParam & param,const std::map<std::string,int> & devicesMap)105 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
106     const SyncParam &param, const std::map<std::string, int> &devicesMap)
107 {
108     bool allFinish = true;
109     {
110         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
111         fullSyncIdMap_[syncId].erase(subSyncId);
112         allFinish = fullSyncIdMap_[syncId].empty();
113         TableStatus tableStatus;
114         tableStatus.tableName = param.syncQuery.GetRelationTableName();
115         for (const auto &item : devicesMap) {
116             tableStatus.status = static_cast<DBStatus>(item.second);
117             resMap_[syncId][item.first].push_back(tableStatus);
118         }
119     }
120     // block sync do callback in sync function
121     if (allFinish && !param.wait) {
122         DoOnComplete(param, syncId);
123     }
124 }
125 
DoRollBack(std::set<uint32_t> & subSyncIdSet)126 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
127 {
128     for (const auto &removeId : subSyncIdSet) {
129         int retCode = RemoveSyncOperation(static_cast<int>(removeId));
130         if (retCode != E_OK) {
131             LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
132         }
133     }
134 }
135 
DoOnComplete(const SyncParam & param,uint32_t syncId)136 void SingleVerRelationalSyncer::DoOnComplete(const SyncParam &param, uint32_t syncId)
137 {
138     if (!param.relationOnComplete) {
139         return;
140     }
141     std::map<std::string, std::vector<TableStatus>> syncRes;
142     std::map<std::string, std::vector<TableStatus>> tmpMap;
143     {
144         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
145         tmpMap = resMap_[syncId];
146     }
147     for (const auto &devicesRes : tmpMap) {
148         for (const auto &tableRes : devicesRes.second) {
149             syncRes[devicesRes.first].push_back(
150                 {tableRes.tableName, static_cast<DBStatus>(tableRes.status)});
151         }
152     }
153     param.relationOnComplete(syncRes);
154     {
155         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
156         resMap_.erase(syncId);
157         fullSyncIdMap_.erase(syncId);
158     }
159 }
160 
EnableAutoSync(bool enable)161 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
162 {
163     (void)enable;
164 }
165 
LocalDataChanged(int notifyEvent)166 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
167 {
168     (void)notifyEvent;
169 }
170 
SchemaChangeCallback()171 void SingleVerRelationalSyncer::SchemaChangeCallback()
172 {
173     if (syncEngine_ == nullptr) {
174         return;
175     }
176     syncEngine_->SchemaChange();
177     int errCode = UpgradeSchemaVerInMeta();
178     if (errCode != E_OK) {
179         LOGE("[SingleVerRelationalSyncer] upgrade schema version in meta failed:%d", errCode);
180     }
181 }
182 
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const183 int SingleVerRelationalSyncer::SyncConditionCheck(const SyncParam &param, const ISyncEngine *engine,
184     ISyncInterface *storage) const
185 {
186     if (!param.isQuerySync) {
187         return E_OK;
188     }
189     auto queryList = GetQuerySyncObject(param);
190     const RelationalSchemaObject schemaObj = static_cast<RelationalDBSyncInterface *>(storage)->GetSchemaInfo();
191     const std::vector<DistributedTable> &sTable = schemaObj.GetDistributedSchema().tables;
192     if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION && sTable.empty()) {
193         LOGE("[SingleVerRelationalSyncer] Distributed schema not set in COLLABORATION mode");
194         return -E_SCHEMA_MISMATCH;
195     }
196     for (auto &item : queryList) {
197         int errCode = static_cast<RelationalDBSyncInterface *>(storage)->CheckAndInitQueryCondition(item);
198         if (errCode != E_OK) {
199             LOGE("[SingleVerRelationalSyncer] table %s[length: %zu] query check failed %d",
200                 DBCommon::StringMiddleMasking(item.GetTableName()).c_str(), item.GetTableName().size(), errCode);
201             return errCode;
202         }
203         if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION) {
204             auto iter = std::find_if(sTable.begin(), sTable.end(), [&item](const DistributedTable &table) {
205                 return DBCommon::ToLowerCase(table.tableName) == DBCommon::ToLowerCase(item.GetTableName());
206             });
207             if (iter == sTable.end()) {
208                 LOGE("[SingleVerRelationalSyncer] table %s[length: %zu] mismatch distributed schema",
209                     DBCommon::StringMiddleMasking(item.GetTableName()).c_str(), item.GetTableName().size());
210                 return -E_SCHEMA_MISMATCH;
211             }
212         }
213     }
214     if (param.mode == SUBSCRIBE_QUERY) {
215         return -E_NOT_SUPPORT;
216     }
217     return E_OK;
218 }
219 
QuerySyncPreCheck(const SyncParam & param) const220 int SingleVerRelationalSyncer::QuerySyncPreCheck(const SyncParam &param) const
221 {
222     if (!param.isQuerySync) {
223         return E_OK;
224     }
225     if (param.mode == SYNC_MODE_PUSH_PULL) {
226         LOGE("[SingleVerRelationalSyncer] sync with not support push_pull mode");
227         return -E_NOT_SUPPORT;
228     }
229     if (param.syncQuery.IsUseFromTables() && param.syncQuery.GetRelationTableNames().empty()) {
230         LOGE("[SingleVerRelationalSyncer] sync with from table but no table found");
231         return -E_INVALID_ARGS;
232     }
233     if (!param.syncQuery.IsUseFromTables() && param.syncQuery.GetRelationTableName().empty()) {
234         LOGE("[SingleVerRelationalSyncer] sync with empty table");
235         return -E_NOT_SUPPORT;
236     }
237     return E_OK;
238 }
239 
GetQuerySyncObject(const SyncParam & param)240 std::vector<QuerySyncObject> SingleVerRelationalSyncer::GetQuerySyncObject(const SyncParam &param)
241 {
242     std::vector<QuerySyncObject> res;
243     auto tables = param.syncQuery.GetRelationTableNames();
244     if (!tables.empty()) {
245         for (const auto &it : tables) {
246             res.emplace_back(Query::Select(it));
247         }
248     } else {
249         res.push_back(param.syncQuery);
250     }
251     return res;
252 }
253 
GetTaskCount()254 int32_t SingleVerRelationalSyncer::GetTaskCount()
255 {
256     int32_t count = GenericSyncer::GetTaskCount();
257     ISyncEngine *syncEngine = nullptr;
258     {
259         std::lock_guard<std::mutex> lock(syncerLock_);
260         if (syncEngine_ == nullptr) {
261             return count;
262         }
263         syncEngine = syncEngine_;
264         RefObject::IncObjRef(syncEngine);
265     }
266     count += syncEngine->GetRemoteQueryTaskCount();
267     RefObject::DecObjRef(syncEngine);
268     return count;
269 }
270 }
271 #endif