1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24 int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25 if (errCode != E_OK) {
26 return errCode;
27 }
28 auto callback = std::bind(&SingleVerRelationalSyncer::SchemaChangeCallback, this);
29 return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30 RegisterSchemaChangedCallback(callback);
31 }
32
Sync(const SyncParma & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
34 {
35 if (QuerySyncPreCheck(param) != E_OK) {
36 return -E_NOT_SUPPORT;
37 }
38 return GenericSyncer::Sync(param, connectionId);
39 }
40
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)41 int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
42 {
43 const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
44 std::vector<QuerySyncObject> tablesQuery;
45 if (param.isQuerySync) {
46 tablesQuery.push_back(param.syncQuery);
47 } else {
48 tablesQuery = syncInterface->GetTablesQuery();
49 }
50 std::set<uint32_t> subSyncIdSet;
51 int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
52 if (errCode != E_OK) {
53 DoRollBack(subSyncIdSet);
54 return errCode;
55 }
56 if (param.wait) {
57 bool connectionClose = false;
58 {
59 std::lock_guard<std::mutex> lockGuard(syncIdLock_);
60 connectionClose = connectionIdMap_.find(connectionId) == connectionIdMap_.end();
61 }
62 if (!connectionClose) {
63 DoOnComplete(param, syncId);
64 }
65 }
66 return E_OK;
67 }
68
GenerateEachSyncTask(const SyncParma & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)69 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId,
70 const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
71 {
72 SyncParma subParam = param;
73 subParam.isQuerySync = true;
74 int errCode = E_OK;
75 for (const QuerySyncObject &table : tablesQuery) {
76 uint32_t subSyncId = GenerateSyncId();
77 std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
78 LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
79 subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
80 subParam.syncQuery = table;
81 subParam.onComplete = std::bind(&SingleVerRelationalSyncer::DoOnSubSyncComplete, this, subSyncId,
82 syncId, param, std::placeholders::_1);
83 {
84 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
85 fullSyncIdMap_[syncId].insert(subSyncId);
86 }
87 errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
88 if (errCode != E_OK) {
89 LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
90 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
91 fullSyncIdMap_[syncId].erase(subSyncId);
92 break;
93 }
94 subSyncIdSet.insert(subSyncId);
95 }
96 return errCode;
97 }
98
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParma & param,const std::map<std::string,int> & devicesMap)99 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
100 const SyncParma ¶m, const std::map<std::string, int> &devicesMap)
101 {
102 bool allFinish = true;
103 {
104 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
105 fullSyncIdMap_[syncId].erase(subSyncId);
106 allFinish = fullSyncIdMap_[syncId].empty();
107 for (const auto &item : devicesMap) {
108 resMap_[syncId][item.first][param.syncQuery.GetRelationTableName()] = static_cast<int>(item.second);
109 }
110 }
111 // block sync do callback in sync function
112 if (allFinish && !param.wait) {
113 DoOnComplete(param, syncId);
114 }
115 }
116
DoRollBack(std::set<uint32_t> & subSyncIdSet)117 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
118 {
119 for (const auto &removeId : subSyncIdSet) {
120 int retCode = RemoveSyncOperation(static_cast<int>(removeId));
121 if (retCode != E_OK) {
122 LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
123 }
124 }
125 }
126
DoOnComplete(const SyncParma & param,uint32_t syncId)127 void SingleVerRelationalSyncer::DoOnComplete(const SyncParma ¶m, uint32_t syncId)
128 {
129 if (!param.relationOnComplete) {
130 return;
131 }
132 std::map<std::string, std::vector<TableStatus>> syncRes;
133 std::map<std::string, std::map<std::string, int>> tmpMap;
134 {
135 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
136 tmpMap = resMap_[syncId];
137 }
138 for (const auto &devicesRes : tmpMap) {
139 for (const auto &tableRes : devicesRes.second) {
140 syncRes[devicesRes.first].push_back(
141 {tableRes.first, static_cast<DBStatus>(tableRes.second)});
142 }
143 }
144 param.relationOnComplete(syncRes);
145 {
146 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
147 resMap_.erase(syncId);
148 fullSyncIdMap_.erase(syncId);
149 }
150 }
151
EnableAutoSync(bool enable)152 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
153 {
154 (void)enable;
155 }
156
LocalDataChanged(int notifyEvent)157 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
158 {
159 (void)notifyEvent;
160 }
161
SchemaChangeCallback()162 void SingleVerRelationalSyncer::SchemaChangeCallback()
163 {
164 if (syncEngine_ == nullptr) {
165 return;
166 }
167 syncEngine_->SchemaChange();
168 }
169
SyncConditionCheck(const SyncParma & param,const ISyncEngine * engine,ISyncInterface * storage) const170 int SingleVerRelationalSyncer::SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine,
171 ISyncInterface *storage) const
172 {
173 if (!param.isQuerySync) {
174 return E_OK;
175 }
176 QuerySyncObject query = param.syncQuery;
177 int errCode = static_cast<RelationalDBSyncInterface *>(storage)->CheckAndInitQueryCondition(query);
178 if (errCode != E_OK) {
179 LOGE("[SingleVerRelationalSyncer] QuerySyncObject check failed");
180 return errCode;
181 }
182 if (param.mode == SUBSCRIBE_QUERY) {
183 return -E_NOT_SUPPORT;
184 }
185 return E_OK;
186 }
187
QuerySyncPreCheck(const SyncParma & param) const188 int SingleVerRelationalSyncer::QuerySyncPreCheck(const SyncParma ¶m) const
189 {
190 if (!param.syncQuery.GetRelationTableNames().empty()) {
191 LOGE("[SingleVerRelationalSyncer] sync with not support query");
192 return -E_NOT_SUPPORT;
193 }
194 if (!param.isQuerySync) {
195 return E_OK;
196 }
197 if (param.mode == SYNC_MODE_PUSH_PULL) {
198 LOGE("[SingleVerRelationalSyncer] sync with not support push_pull mode");
199 return -E_NOT_SUPPORT;
200 }
201 if (param.syncQuery.GetRelationTableName().empty()) {
202 LOGE("[SingleVerRelationalSyncer] sync with empty table");
203 return -E_NOT_SUPPORT;
204 }
205 return E_OK;
206 }
207 }
208 #endif