1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "db_common.h"
18 #include "db_dfx_adapter.h"
19 #include "log_print.h"
20 #include "multi_ver_sync_state_machine.h"
21 #include "multi_ver_sync_target.h"
22 #include "multi_ver_sync_task_context.h"
23
24 namespace DistributedDB {
DEFINE_OBJECT_TAG_FACILITIES(MultiVerSyncTaskContext)25 DEFINE_OBJECT_TAG_FACILITIES(MultiVerSyncTaskContext)
26
27 MultiVerSyncTaskContext::~MultiVerSyncTaskContext()
28 {
29 }
30
Initialize(const std::string & deviceId,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)31 int MultiVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
32 std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
33 {
34 if (deviceId.empty() || (syncInterface == nullptr) || (communicator == nullptr)) {
35 return -E_INVALID_ARGS;
36 }
37 syncInterface_ = syncInterface;
38 communicator_ = communicator;
39 deviceId_ = deviceId;
40 taskExecStatus_ = INIT;
41 isAutoSync_ = true;
42 timeHelper_ = std::make_unique<TimeHelper>();
43 int errCode = timeHelper_->Initialize(syncInterface, metadata);
44 if (errCode != E_OK) {
45 LOGE("[MultiVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
46 return errCode;
47 }
48
49 stateMachine_ = new (std::nothrow) MultiVerSyncStateMachine;
50 if (stateMachine_ == nullptr) {
51 return -E_OUT_OF_MEMORY;
52 }
53
54 errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
55 TimerAction timeOutCallback = std::bind(&SyncStateMachine::TimeoutCallback,
56 static_cast<MultiVerSyncStateMachine *>(stateMachine_),
57 std::placeholders::_1);
58 SetTimeoutCallback(timeOutCallback);
59 OnKill([this]() { this->KillWait(); });
60 {
61 std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
62 synTaskContextSet_.insert(this);
63 }
64 std::vector<uint8_t> label = syncInterface_->GetIdentifier();
65 label.resize(3); // only show 3 bytes
66 syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
67 DBCommon::VectorToHexString(label) + "_" + deviceId_.c_str();
68 return errCode;
69 }
70
AddSyncOperation(SyncOperation * operation)71 int MultiVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
72 {
73 if (operation == nullptr) {
74 return -E_INVALID_ARGS;
75 }
76
77 if (operation->IsAutoSync() && !IsTargetQueueEmpty()) {
78 LOGI("[MultiVerSyncTaskContext] Exist operation in queue, skip it!");
79 operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
80 return E_OK;
81 }
82
83 MultiVerSyncTarget *target = new (std::nothrow) MultiVerSyncTarget;
84 if (target == nullptr) {
85 return -E_OUT_OF_MEMORY;
86 }
87 target->SetSyncOperation(operation);
88 target->SetTaskType(ISyncTarget::REQUEST);
89 AddSyncTarget(target);
90 return E_OK;
91 }
92
GetCommitIndex() const93 int MultiVerSyncTaskContext::GetCommitIndex() const
94 {
95 return commitsIndex_;
96 }
97
SetCommitIndex(int index)98 void MultiVerSyncTaskContext::SetCommitIndex(int index)
99 {
100 commitsIndex_ = index;
101 }
102
GetEntriesIndex() const103 int MultiVerSyncTaskContext::GetEntriesIndex() const
104 {
105 return entriesIndex_;
106 }
107
SetEntriesIndex(int index)108 void MultiVerSyncTaskContext::SetEntriesIndex(int index)
109 {
110 entriesIndex_ = index;
111 }
112
GetValueSlicesIndex() const113 int MultiVerSyncTaskContext::GetValueSlicesIndex() const
114 {
115 return valueSlicesIndex_;
116 }
117
SetValueSlicesIndex(int index)118 void MultiVerSyncTaskContext::SetValueSlicesIndex(int index)
119 {
120 valueSlicesIndex_ = index;
121 }
122
GetCommits(std::vector<MultiVerCommitNode> & commits)123 void MultiVerSyncTaskContext::GetCommits(std::vector<MultiVerCommitNode> &commits)
124 {
125 commits = commits_;
126 }
127
SetCommits(const std::vector<MultiVerCommitNode> & commits)128 void MultiVerSyncTaskContext::SetCommits(const std::vector<MultiVerCommitNode> &commits)
129 {
130 commits_ = commits;
131 }
132
GetCommit(int index,MultiVerCommitNode & commit) const133 void MultiVerSyncTaskContext::GetCommit(int index, MultiVerCommitNode &commit) const
134 {
135 commit = commits_[index];
136 }
137
SetCommit(int index,const MultiVerCommitNode & commit)138 void MultiVerSyncTaskContext::SetCommit(int index, const MultiVerCommitNode &commit)
139 {
140 commits_[index] = commit;
141 }
142
SetEntries(const std::vector<MultiVerKvEntry * > & entries)143 void MultiVerSyncTaskContext::SetEntries(const std::vector<MultiVerKvEntry *> &entries)
144 {
145 entries_ = entries;
146 }
147
ReleaseEntries(void)148 void MultiVerSyncTaskContext::ReleaseEntries(void)
149 {
150 for (auto &item : entries_) {
151 if (syncInterface_ != nullptr) {
152 static_cast<MultiVerKvDBSyncInterface *>(syncInterface_)->ReleaseKvEntry(item);
153 }
154 item = nullptr;
155 }
156 entries_.clear();
157 entries_.shrink_to_fit();
158 }
159
GetEntries(std::vector<MultiVerKvEntry * > & entries) const160 void MultiVerSyncTaskContext::GetEntries(std::vector<MultiVerKvEntry *> &entries) const
161 {
162 entries = entries_;
163 }
164
GetEntry(int index,MultiVerKvEntry * & entry)165 void MultiVerSyncTaskContext::GetEntry(int index, MultiVerKvEntry *&entry)
166 {
167 entry = entries_[index];
168 }
169
SetCommitsSize(int commitsSize)170 void MultiVerSyncTaskContext::SetCommitsSize(int commitsSize)
171 {
172 commitsSize_ = commitsSize;
173 }
174
GetCommitsSize() const175 int MultiVerSyncTaskContext::GetCommitsSize() const
176 {
177 return commitsSize_;
178 }
179
SetEntriesSize(int entriesSize)180 void MultiVerSyncTaskContext::SetEntriesSize(int entriesSize)
181 {
182 entriesSize_ = entriesSize;
183 }
184
GetEntriesSize() const185 int MultiVerSyncTaskContext::GetEntriesSize() const
186 {
187 return entriesSize_;
188 }
189
SetValueSlicesSize(int valueSlicesSize)190 void MultiVerSyncTaskContext::SetValueSlicesSize(int valueSlicesSize)
191 {
192 valueSlicesSize_ = valueSlicesSize;
193 }
194
GetValueSlicesSize() const195 int MultiVerSyncTaskContext::GetValueSlicesSize() const
196 {
197 return valueSlicesSize_;
198 }
199
GetValueSliceHashNode(int index,ValueSliceHash & hashNode) const200 void MultiVerSyncTaskContext::GetValueSliceHashNode(int index, ValueSliceHash &hashNode) const
201 {
202 hashNode = valueSliceHashNodes_[index];
203 }
204
SetValueSliceHashNodes(const std::vector<ValueSliceHash> & valueSliceHashNodes)205 void MultiVerSyncTaskContext::SetValueSliceHashNodes(const std::vector<ValueSliceHash> &valueSliceHashNodes)
206 {
207 valueSliceHashNodes_ = valueSliceHashNodes;
208 }
209
GetValueSliceHashNodes(std::vector<ValueSliceHash> & valueSliceHashNodes) const210 void MultiVerSyncTaskContext::GetValueSliceHashNodes(std::vector<ValueSliceHash> &valueSliceHashNodes) const
211 {
212 valueSliceHashNodes = valueSliceHashNodes_;
213 }
214
Clear()215 void MultiVerSyncTaskContext::Clear()
216 {
217 commits_.clear();
218 commits_.shrink_to_fit();
219 ReleaseEntries();
220 valueSliceHashNodes_.clear();
221 valueSliceHashNodes_.shrink_to_fit();
222 commitsIndex_ = 0;
223 commitsSize_ = 0;
224 entriesIndex_ = 0;
225 entriesSize_ = 0;
226 valueSlicesIndex_ = 0;
227 valueSlicesSize_ = 0;
228 retryTime_ = 0;
229 isNeedRetry_ = NO_NEED_RETRY;
230 StopTimer();
231 sequenceId_ = 1; // minimum valid ID : 1
232 syncId_ = 0;
233 }
234
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)235 void MultiVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
236 {
237 SyncTaskContext::CopyTargetData(target, taskParam);
238 }
239 }
240 #endif