• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "single_ver_relational_syncer.h"
16 #ifdef RELATIONAL_STORE
17 #include "db_common.h"
18 #include "relational_db_sync_interface.h"
19 #include "single_ver_sync_engine.h"
20 
21 namespace DistributedDB {
Initialize(ISyncInterface * syncInterface,bool isNeedActive)22 int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool isNeedActive)
23 {
24     int errCode = SingleVerSyncer::Initialize(syncInterface, isNeedActive);
25     if (errCode != E_OK) {
26         return errCode;
27     }
28     auto callback = std::bind(&SingleVerRelationalSyncer::SchemaChangeCallback, this);
29     return static_cast<RelationalDBSyncInterface *>(syncInterface)->
30         RegisterSchemaChangedCallback(callback);
31 }
32 
Sync(const SyncParma & param,uint64_t connectionId)33 int SingleVerRelationalSyncer::Sync(const SyncParma &param, uint64_t connectionId)
34 {
35     if (param.mode == SYNC_MODE_PUSH_PULL) {
36         return -E_NOT_SUPPORT;
37     }
38     if (param.syncQuery.GetRelationTableName().empty()) {
39         return -E_NOT_SUPPORT;
40     }
41     return GenericSyncer::Sync(param, connectionId);
42 }
43 
PrepareSync(const SyncParma & param,uint32_t syncId,uint64_t connectionId)44 int SingleVerRelationalSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
45 {
46     const auto &syncInterface = static_cast<RelationalDBSyncInterface *>(syncInterface_);
47     std::vector<QuerySyncObject> tablesQuery;
48     if (param.isQuerySync) {
49         tablesQuery.push_back(param.syncQuery);
50     } else {
51         tablesQuery = syncInterface->GetTablesQuery();
52     }
53     std::set<uint32_t> subSyncIdSet;
54     int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet);
55     if (errCode != E_OK) {
56         DoRollBack(subSyncIdSet);
57         return errCode;
58     }
59     if (param.wait) {
60         DoOnComplete(param, syncId);
61     }
62     return E_OK;
63 }
64 
GenerateEachSyncTask(const SyncParma & param,uint32_t syncId,const std::vector<QuerySyncObject> & tablesQuery,uint64_t connectionId,std::set<uint32_t> & subSyncIdSet)65 int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma &param, uint32_t syncId,
66     const std::vector<QuerySyncObject> &tablesQuery, uint64_t connectionId, std::set<uint32_t> &subSyncIdSet)
67 {
68     SyncParma subParam = param;
69     subParam.isQuerySync = true;
70     int errCode = E_OK;
71     for (const QuerySyncObject &table : tablesQuery) {
72         uint32_t subSyncId = GenerateSyncId();
73         std::string hashTableName = DBCommon::TransferHashString(table.GetRelationTableName());
74         LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
75             subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
76         subParam.syncQuery = table;
77         subParam.onComplete = std::bind(&SingleVerRelationalSyncer::DoOnSubSyncComplete, this, subSyncId,
78             syncId, param, std::placeholders::_1);
79         {
80             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
81             fullSyncIdMap_[syncId].insert(subSyncId);
82         }
83         errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId);
84         if (errCode != E_OK) {
85             LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode);
86             std::lock_guard<std::mutex> lockGuard(syncMapLock_);
87             fullSyncIdMap_[syncId].erase(subSyncId);
88             break;
89         }
90         subSyncIdSet.insert(subSyncId);
91     }
92     return errCode;
93 }
94 
DoOnSubSyncComplete(const uint32_t subSyncId,const uint32_t syncId,const SyncParma & param,const std::map<std::string,int> & devicesMap)95 void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
96     const SyncParma &param, const std::map<std::string, int> &devicesMap)
97 {
98     bool allFinish = true;
99     {
100         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
101         fullSyncIdMap_[syncId].erase(subSyncId);
102         allFinish = fullSyncIdMap_[syncId].empty();
103         for (const auto &item : devicesMap) {
104             resMap_[syncId][item.first][param.syncQuery.GetRelationTableName()] = static_cast<int>(item.second);
105         }
106     }
107     // block sync do callback in sync function
108     if (allFinish && !param.wait) {
109         DoOnComplete(param, syncId);
110     }
111 }
112 
DoRollBack(std::set<uint32_t> & subSyncIdSet)113 void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
114 {
115     for (const auto &removeId : subSyncIdSet) {
116         int retCode = RemoveSyncOperation(static_cast<int>(removeId));
117         if (retCode != E_OK) {
118             LOGW("[SingleVerRelationalSyncer] RemoveSyncOperation failed errCode:%d, syncId:%d", retCode, removeId);
119         }
120     }
121 }
122 
DoOnComplete(const SyncParma & param,uint32_t syncId)123 void SingleVerRelationalSyncer::DoOnComplete(const SyncParma &param, uint32_t syncId)
124 {
125     if (!param.relationOnComplete) {
126         return;
127     }
128     std::map<std::string, std::vector<TableStatus>> syncRes;
129     std::map<std::string, std::map<std::string, int>> tmpMap;
130     {
131         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
132         tmpMap = resMap_[syncId];
133     }
134     for (const auto &devicesRes : tmpMap) {
135         for (const auto &tableRes : devicesRes.second) {
136             syncRes[devicesRes.first].push_back(
137                 {tableRes.first, static_cast<DBStatus>(tableRes.second)});
138         }
139     }
140     param.relationOnComplete(syncRes);
141     {
142         std::lock_guard<std::mutex> lockGuard(syncMapLock_);
143         resMap_.erase(syncId);
144         fullSyncIdMap_.erase(syncId);
145     }
146 }
147 
EnableAutoSync(bool enable)148 void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
149 {
150     (void)enable;
151 }
152 
LocalDataChanged(int notifyEvent)153 void SingleVerRelationalSyncer::LocalDataChanged(int notifyEvent)
154 {
155     (void)notifyEvent;
156 }
157 
SchemaChangeCallback()158 void SingleVerRelationalSyncer::SchemaChangeCallback()
159 {
160     if (syncEngine_ == nullptr) {
161         return;
162     }
163     RefObject::IncObjRef(syncEngine_);
164     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this] {
165         syncEngine_->SchemaChange();
166         RefObject::DecObjRef(syncEngine_);
167     });
168     if (errCode != E_OK) {
169         LOGE("[SchemaChangeCallback] SchemaChangeCallback retCode:%d", errCode);
170         RefObject::DecObjRef(syncEngine_);
171     }
172 }
173 
SyncConditionCheck(QuerySyncObject & query,int mode,bool isQuerySync,const std::vector<std::string> & devices) const174 int SingleVerRelationalSyncer::SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync,
175     const std::vector<std::string> &devices) const
176 {
177     if (!isQuerySync) {
178         return E_OK;
179     }
180     int errCode = static_cast<RelationalDBSyncInterface *>(syncInterface_)->CheckAndInitQueryCondition(query);
181     if (errCode != E_OK) {
182         LOGE("[SingleVerRelationalSyncer] QuerySyncObject check failed");
183         return errCode;
184     }
185     if (mode == SUBSCRIBE_QUERY) {
186         return -E_NOT_SUPPORT;
187     }
188     return E_OK;
189 }
190 }
191 #endif