1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24 int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25 if (errCode != E_OK) {
26 return errCode;
27 }
28 auto callback = std::bind(&SingleVerRelationalSyncer::SchemaChangeCallback, this);
29 return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30 RegisterSchemaChangedCallback(callback);
31 }
32
Sync(const SyncParma & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParma ¶m, uint64_t connectionId)
34 {
35 if (param.mode == SYNC_MODE_PUSH_PULL) {
36 return -E_NOT_SUPPORT;
37 }
38 if (param.syncQuery.GetRelationTableName().empty()) {
39 return -E_NOT_SUPPORT;
40 }
41 return GenericSyncer::Sync(param, connectionId);
42 }
43
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)44 int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId)
45 {
46 const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
47 std::vector<QuerySyncObject> tablesQuery;
48 if (param.isQuerySync) {
49 tablesQuery.push_back(param.syncQuery);
50 } else {
51 tablesQuery = syncInterface->GetTablesQuery();
52 }
53 std::set<uint32_t> subSyncIdSet;
54 int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
55 if (errCode != E_OK) {
56 DoRollBack(subSyncIdSet);
57 return errCode;
58 }
59 if (param.wait) {
60 DoOnComplete(param, syncId);
61 }
62 return E_OK;
63 }
64
GenerateEachSyncTask(const SyncParma & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)65 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId,
66 const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
67 {
68 SyncParma subParam = param;
69 subParam.isQuerySync = true;
70 int errCode = E_OK;
71 for (const QuerySyncObject &table : tablesQuery) {
72 uint32_t subSyncId = GenerateSyncId();
73 std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
74 LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
75 subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
76 subParam.syncQuery = table;
77 subParam.onComplete = std::bind(&SingleVerRelationalSyncer::DoOnSubSyncComplete, this, subSyncId,
78 syncId, param, std::placeholders::_1);
79 {
80 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
81 fullSyncIdMap_[syncId].insert(subSyncId);
82 }
83 errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
84 if (errCode != E_OK) {
85 LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
86 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
87 fullSyncIdMap_[syncId].erase(subSyncId);
88 break;
89 }
90 subSyncIdSet.insert(subSyncId);
91 }
92 return errCode;
93 }
94
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParma & param,const std::map<std::string,int> & devicesMap)95 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
96 const SyncParma ¶m, const std::map<std::string, int> &devicesMap)
97 {
98 bool allFinish = true;
99 {
100 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
101 fullSyncIdMap_[syncId].erase(subSyncId);
102 allFinish = fullSyncIdMap_[syncId].empty();
103 for (const auto &item : devicesMap) {
104 resMap_[syncId][item.first][param.syncQuery.GetRelationTableName()] = static_cast<int>(item.second);
105 }
106 }
107 // block sync do callback in sync function
108 if (allFinish && !param.wait) {
109 DoOnComplete(param, syncId);
110 }
111 }
112
DoRollBack(std::set<uint32_t> & subSyncIdSet)113 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
114 {
115 for (const auto &removeId : subSyncIdSet) {
116 int retCode = RemoveSyncOperation(static_cast<int>(removeId));
117 if (retCode != E_OK) {
118 LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
119 }
120 }
121 }
122
DoOnComplete(const SyncParma & param,uint32_t syncId)123 void SingleVerRelationalSyncer::DoOnComplete(const SyncParma ¶m, uint32_t syncId)
124 {
125 if (!param.relationOnComplete) {
126 return;
127 }
128 std::map<std::string, std::vector<TableStatus>> syncRes;
129 std::map<std::string, std::map<std::string, int>> tmpMap;
130 {
131 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
132 tmpMap = resMap_[syncId];
133 }
134 for (const auto &devicesRes : tmpMap) {
135 for (const auto &tableRes : devicesRes.second) {
136 syncRes[devicesRes.first].push_back(
137 {tableRes.first, static_cast<DBStatus>(tableRes.second)});
138 }
139 }
140 param.relationOnComplete(syncRes);
141 {
142 std::lock_guard<std::mutex> lockGuard(syncMapLock_);
143 resMap_.erase(syncId);
144 fullSyncIdMap_.erase(syncId);
145 }
146 }
147
EnableAutoSync(bool enable)148 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
149 {
150 (void)enable;
151 }
152
LocalDataChanged(int notifyEvent)153 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
154 {
155 (void)notifyEvent;
156 }
157
SchemaChangeCallback()158 void SingleVerRelationalSyncer::SchemaChangeCallback()
159 {
160 if (syncEngine_ == nullptr) {
161 return;
162 }
163 RefObject::IncObjRef(syncEngine_);
164 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] {
165 syncEngine_->SchemaChange();
166 RefObject::DecObjRef(syncEngine_);
167 });
168 if (errCode != E_OK) {
169 LOGE("[SchemaChangeCallback] SchemaChangeCallback retCode:%d", errCode);
170 RefObject::DecObjRef(syncEngine_);
171 }
172 }
173
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const174 int SingleVerRelationalSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
175 const std::vector<std::string> &devices) const
176 {
177 if (!isQuerySync) {
178 return E_OK;
179 }
180 int errCode = static_cast<RelationalDBSyncInterface *>(syncInterface_)->CheckAndInitQueryCondition(query);
181 if (errCode != E_OK) {
182 LOGE("[SingleVerRelationalSyncer] QuerySyncObject check failed");
183 return errCode;
184 }
185 if (mode == SUBSCRIBE_QUERY) {
186 return -E_NOT_SUPPORT;
187 }
188 return E_OK;
189 }
190 }
191 #endif