1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24 int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25 if (errCode != E_OK) {
26 return errCode;
27 }
28 auto callback = [this] { SchemaChangeCallback(); };
29 return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30 RegisterSchemaChangedCallback(callback);
31 }
32
Sync(const SyncParam & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParam ¶m, uint64_t connectionId)
34 {
35 int errCode = QuerySyncPreCheck(param);
36 if (errCode != E_OK) {
37 return errCode;
38 }
39 return GenericSyncer::Sync(param, connectionId);
40 }
41
PrepareSync(const SyncParam & param,uint32_t syncId,uint64_t connectionId)42 int SingleVerRelationalSyncer::PrepareSync(const SyncParam ¶m, uint32_t syncId, uint64_t connectionId)
43 {
44 if (syncInterface_ == nullptr) {
45 LOGE("[SingleVerRelationalSyncer] [PrepareSync] syncInterface_ is nullptr.");
46 return -E_INTERNAL_ERROR;
47 }
48 const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
49 std::vector<QuerySyncObject> tablesQuery;
50 if (param.isQuerySync) {
51 tablesQuery = GetQuerySyncObject(param);
52 } else {
53 tablesQuery = syncInterface->GetTablesQuery();
54 }
55 std::set<uint32_t> subSyncIdSet;
56 int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
57 if (errCode != E_OK) {
58 DoRollBack(subSyncIdSet);
59 return errCode;
60 }
61 if (param.wait) {
62 bool connectionClose = false;
63 {
64 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
65 connectionClose = connectionIdMap_.find(connectionId) == connectionIdMap_.end();
66 }
67 if (!connectionClose) {
68 DoOnComplete(param, syncId);
69 }
70 }
71 return E_OK;
72 }
73
GenerateEachSyncTask(const SyncParam & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)74 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParam ¶m, uint32_t syncId,
75 const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
76 {
77 SyncParam subParam = param;
78 subParam.isQuerySync = true;
79 int errCode = E_OK;
80 for (const QuerySyncObject &table : tablesQuery) {
81 uint32_t subSyncId = GenerateSyncId();
82 std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
83 LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
84 subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
85 subParam.syncQuery = table;
86 subParam.onComplete = [this, subSyncId, syncId, subParam](const std::map<std::string, int> &devicesMap) {
87 DoOnSubSyncComplete(subSyncId, syncId, subParam, devicesMap);
88 };
89 {
90 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
91 fullSyncIdMap_[syncId].insert(subSyncId);
92 }
93 errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
94 if (errCode != E_OK) {
95 LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
96 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
97 fullSyncIdMap_[syncId].erase(subSyncId);
98 break;
99 }
100 subSyncIdSet.insert(subSyncId);
101 }
102 return errCode;
103 }
104
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParam & param,const std::map<std::string,int> & devicesMap)105 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
106 const SyncParam ¶m, const std::map<std::string, int> &devicesMap)
107 {
108 bool allFinish = true;
109 {
110 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
111 fullSyncIdMap_[syncId].erase(subSyncId);
112 allFinish = fullSyncIdMap_[syncId].empty();
113 TableStatus tableStatus;
114 tableStatus.tableName = param.syncQuery.GetRelationTableName();
115 for (const auto &item : devicesMap) {
116 tableStatus.status = static_cast<DBStatus>(item.second);
117 resMap_[syncId][item.first].push_back(tableStatus);
118 }
119 }
120 // block sync do callback in sync function
121 if (allFinish && !param.wait) {
122 DoOnComplete(param, syncId);
123 }
124 }
125
DoRollBack(std::set<uint32_t> & subSyncIdSet)126 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
127 {
128 for (const auto &removeId : subSyncIdSet) {
129 int retCode = RemoveSyncOperation(static_cast<int>(removeId));
130 if (retCode != E_OK) {
131 LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
132 }
133 }
134 }
135
DoOnComplete(const SyncParam & param,uint32_t syncId)136 void SingleVerRelationalSyncer::DoOnComplete(const SyncParam ¶m, uint32_t syncId)
137 {
138 if (!param.relationOnComplete) {
139 return;
140 }
141 std::map<std::string, std::vector<TableStatus>> syncRes;
142 std::map<std::string, std::vector<TableStatus>> tmpMap;
143 {
144 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
145 tmpMap = resMap_[syncId];
146 }
147 for (const auto &devicesRes : tmpMap) {
148 for (const auto &tableRes : devicesRes.second) {
149 syncRes[devicesRes.first].push_back(
150 {tableRes.tableName, static_cast<DBStatus>(tableRes.status)});
151 }
152 }
153 param.relationOnComplete(syncRes);
154 {
155 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
156 resMap_.erase(syncId);
157 fullSyncIdMap_.erase(syncId);
158 }
159 }
160
EnableAutoSync(bool enable)161 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
162 {
163 (void)enable;
164 }
165
LocalDataChanged(int notifyEvent)166 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
167 {
168 (void)notifyEvent;
169 }
170
SchemaChangeCallback()171 void SingleVerRelationalSyncer::SchemaChangeCallback()
172 {
173 if (syncEngine_ == nullptr) {
174 return;
175 }
176 syncEngine_->SchemaChange();
177 int errCode = UpgradeSchemaVerInMeta();
178 if (errCode != E_OK) {
179 LOGE("[SingleVerRelationalSyncer] upgrade schema version in meta failed:%d", errCode);
180 }
181 }
182
SyncConditionCheck(const SyncParam & param,const ISyncEngine * engine,ISyncInterface * storage) const183 int SingleVerRelationalSyncer::SyncConditionCheck(const SyncParam ¶m, const ISyncEngine *engine,
184 ISyncInterface *storage) const
185 {
186 if (!param.isQuerySync) {
187 return E_OK;
188 }
189 auto queryList = GetQuerySyncObject(param);
190 const RelationalSchemaObject schemaObj = static_cast<RelationalDBSyncInterface *>(storage)->GetSchemaInfo();
191 const std::vector<DistributedTable> &sTable = schemaObj.GetDistributedSchema().tables;
192 if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION && sTable.empty()) {
193 LOGE("[SingleVerRelationalSyncer] Distributed schema not set in COLLABORATION mode");
194 return -E_SCHEMA_MISMATCH;
195 }
196 for (auto &item : queryList) {
197 int errCode = static_cast<RelationalDBSyncInterface *>(storage)->CheckAndInitQueryCondition(item);
198 if (errCode != E_OK) {
199 LOGE("[SingleVerRelationalSyncer] table %s[length: %zu] query check failed %d",
200 DBCommon::StringMiddleMasking(item.GetTableName()).c_str(), item.GetTableName().size(), errCode);
201 return errCode;
202 }
203 if (schemaObj.GetTableMode() == DistributedTableMode::COLLABORATION) {
204 auto iter = std::find_if(sTable.begin(), sTable.end(), [&item](const DistributedTable &table) {
205 return DBCommon::ToLowerCase(table.tableName) == DBCommon::ToLowerCase(item.GetTableName());
206 });
207 if (iter == sTable.end()) {
208 LOGE("[SingleVerRelationalSyncer] table %s[length: %zu] mismatch distributed schema",
209 DBCommon::StringMiddleMasking(item.GetTableName()).c_str(), item.GetTableName().size());
210 return -E_SCHEMA_MISMATCH;
211 }
212 }
213 }
214 if (param.mode == SUBSCRIBE_QUERY) {
215 return -E_NOT_SUPPORT;
216 }
217 return E_OK;
218 }
219
QuerySyncPreCheck(const SyncParam & param) const220 int SingleVerRelationalSyncer::QuerySyncPreCheck(const SyncParam ¶m) const
221 {
222 if (!param.isQuerySync) {
223 return E_OK;
224 }
225 if (param.mode == SYNC_MODE_PUSH_PULL) {
226 LOGE("[SingleVerRelationalSyncer] sync with not support push_pull mode");
227 return -E_NOT_SUPPORT;
228 }
229 if (param.syncQuery.IsUseFromTables() && param.syncQuery.GetRelationTableNames().empty()) {
230 LOGE("[SingleVerRelationalSyncer] sync with from table but no table found");
231 return -E_INVALID_ARGS;
232 }
233 if (!param.syncQuery.IsUseFromTables() && param.syncQuery.GetRelationTableName().empty()) {
234 LOGE("[SingleVerRelationalSyncer] sync with empty table");
235 return -E_NOT_SUPPORT;
236 }
237 return E_OK;
238 }
239
GetQuerySyncObject(const SyncParam & param)240 std::vector<QuerySyncObject> SingleVerRelationalSyncer::GetQuerySyncObject(const SyncParam ¶m)
241 {
242 std::vector<QuerySyncObject> res;
243 auto tables = param.syncQuery.GetRelationTableNames();
244 if (!tables.empty()) {
245 for (const auto &it : tables) {
246 res.emplace_back(Query::Select(it));
247 }
248 } else {
249 res.push_back(param.syncQuery);
250 }
251 return res;
252 }
253
GetTaskCount()254 int32_t SingleVerRelationalSyncer::GetTaskCount()
255 {
256 int32_t count = GenericSyncer::GetTaskCount();
257 ISyncEngine *syncEngine = nullptr;
258 {
259 std::lock_guard<std::mutex> lock(syncerLock_);
260 if (syncEngine_ == nullptr) {
261 return count;
262 }
263 syncEngine = syncEngine_;
264 RefObject::IncObjRef(syncEngine);
265 }
266 count += syncEngine->GetRemoteQueryTaskCount();
267 RefObject::DecObjRef(syncEngine);
268 return count;
269 }
270 }
271 #endif