• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "db_common.h"
18 #include "db_dfx_adapter.h"
19 #include "log_print.h"
20 #include "multi_ver_sync_state_machine.h"
21 #include "multi_ver_sync_target.h"
22 #include "multi_ver_sync_task_context.h"
23 
24 namespace DistributedDB {
DEFINE_OBJECT_TAG_FACILITIES(MultiVerSyncTaskContext)25 DEFINE_OBJECT_TAG_FACILITIES(MultiVerSyncTaskContext)
26 
27 MultiVerSyncTaskContext::~MultiVerSyncTaskContext()
28 {
29 }
30 
Initialize(const std::string & deviceId,ISyncInterface * syncInterface,std::shared_ptr<Metadata> & metadata,ICommunicator * communicator)31 int MultiVerSyncTaskContext::Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
32     std::shared_ptr<Metadata> &metadata, ICommunicator *communicator)
33 {
34     if (deviceId.empty() || (syncInterface == nullptr) || (communicator == nullptr)) {
35         return -E_INVALID_ARGS;
36     }
37     syncInterface_ = syncInterface;
38     communicator_ = communicator;
39     deviceId_ = deviceId;
40     taskExecStatus_ = INIT;
41     isAutoSync_ = true;
42     timeHelper_ = std::make_unique<TimeHelper>();
43     int errCode = timeHelper_->Initialize(syncInterface, metadata);
44     if (errCode != E_OK) {
45         LOGE("[MultiVerSyncTaskContext] timeHelper Initialize failed, err %d.", errCode);
46         return errCode;
47     }
48 
49     stateMachine_ = new (std::nothrow) MultiVerSyncStateMachine;
50     if (stateMachine_ == nullptr) {
51         return -E_OUT_OF_MEMORY;
52     }
53 
54     errCode = stateMachine_->Initialize(this, syncInterface, metadata, communicator);
55     TimerAction timeOutCallback = std::bind(&SyncStateMachine::TimeoutCallback,
56         static_cast<MultiVerSyncStateMachine *>(stateMachine_),
57         std::placeholders::_1);
58     SetTimeoutCallback(timeOutCallback);
59     OnKill([this]() { this->KillWait(); });
60     {
61         std::lock_guard<std::mutex> lock(synTaskContextSetLock_);
62         synTaskContextSet_.insert(this);
63     }
64     std::vector<uint8_t> label = syncInterface_->GetIdentifier();
65     label.resize(3); // only show 3 bytes
66     syncActionName_ = DBDfxAdapter::SYNC_ACTION + "_" +
67         DBCommon::VectorToHexString(label) + "_" + deviceId_.c_str();
68     return errCode;
69 }
70 
AddSyncOperation(SyncOperation * operation)71 int MultiVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
72 {
73     if (operation == nullptr) {
74         return -E_INVALID_ARGS;
75     }
76 
77     if (operation->IsAutoSync() && !IsTargetQueueEmpty()) {
78         LOGI("[MultiVerSyncTaskContext] Exist operation in queue, skip it!");
79         operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
80         return E_OK;
81     }
82 
83     MultiVerSyncTarget *target = new (std::nothrow) MultiVerSyncTarget;
84     if (target == nullptr) {
85         return -E_OUT_OF_MEMORY;
86     }
87     target->SetSyncOperation(operation);
88     target->SetTaskType(ISyncTarget::REQUEST);
89     AddSyncTarget(target);
90     return E_OK;
91 }
92 
GetCommitIndex() const93 int MultiVerSyncTaskContext::GetCommitIndex() const
94 {
95     return commitsIndex_;
96 }
97 
SetCommitIndex(int index)98 void MultiVerSyncTaskContext::SetCommitIndex(int index)
99 {
100     commitsIndex_ = index;
101 }
102 
GetEntriesIndex() const103 int MultiVerSyncTaskContext::GetEntriesIndex() const
104 {
105     return entriesIndex_;
106 }
107 
SetEntriesIndex(int index)108 void MultiVerSyncTaskContext::SetEntriesIndex(int index)
109 {
110     entriesIndex_ = index;
111 }
112 
GetValueSlicesIndex() const113 int  MultiVerSyncTaskContext::GetValueSlicesIndex() const
114 {
115     return valueSlicesIndex_;
116 }
117 
SetValueSlicesIndex(int index)118 void MultiVerSyncTaskContext::SetValueSlicesIndex(int index)
119 {
120     valueSlicesIndex_ = index;
121 }
122 
GetCommits(std::vector<MultiVerCommitNode> & commits)123 void MultiVerSyncTaskContext::GetCommits(std::vector<MultiVerCommitNode> &commits)
124 {
125     commits = commits_;
126 }
127 
SetCommits(const std::vector<MultiVerCommitNode> & commits)128 void MultiVerSyncTaskContext::SetCommits(const std::vector<MultiVerCommitNode> &commits)
129 {
130     commits_ = commits;
131 }
132 
GetCommit(int index,MultiVerCommitNode & commit) const133 void MultiVerSyncTaskContext::GetCommit(int index, MultiVerCommitNode &commit) const
134 {
135     commit = commits_[index];
136 }
137 
SetCommit(int index,const MultiVerCommitNode & commit)138 void MultiVerSyncTaskContext::SetCommit(int index, const MultiVerCommitNode &commit)
139 {
140     commits_[index] = commit;
141 }
142 
SetEntries(const std::vector<MultiVerKvEntry * > & entries)143 void MultiVerSyncTaskContext::SetEntries(const std::vector<MultiVerKvEntry *> &entries)
144 {
145     entries_ = entries;
146 }
147 
ReleaseEntries(void)148 void MultiVerSyncTaskContext::ReleaseEntries(void)
149 {
150     for (auto &item : entries_) {
151         if (syncInterface_ != nullptr) {
152             static_cast<MultiVerKvDBSyncInterface *>(syncInterface_)->ReleaseKvEntry(item);
153         }
154         item = nullptr;
155     }
156     entries_.clear();
157     entries_.shrink_to_fit();
158 }
159 
GetEntries(std::vector<MultiVerKvEntry * > & entries) const160 void MultiVerSyncTaskContext::GetEntries(std::vector<MultiVerKvEntry *> &entries) const
161 {
162     entries = entries_;
163 }
164 
GetEntry(int index,MultiVerKvEntry * & entry)165 void MultiVerSyncTaskContext::GetEntry(int index, MultiVerKvEntry *&entry)
166 {
167     entry = entries_[index];
168 }
169 
SetCommitsSize(int commitsSize)170 void MultiVerSyncTaskContext::SetCommitsSize(int commitsSize)
171 {
172     commitsSize_ = commitsSize;
173 }
174 
GetCommitsSize() const175 int MultiVerSyncTaskContext::GetCommitsSize() const
176 {
177     return commitsSize_;
178 }
179 
SetEntriesSize(int entriesSize)180 void MultiVerSyncTaskContext::SetEntriesSize(int entriesSize)
181 {
182     entriesSize_ = entriesSize;
183 }
184 
GetEntriesSize() const185 int MultiVerSyncTaskContext::GetEntriesSize() const
186 {
187     return entriesSize_;
188 }
189 
SetValueSlicesSize(int valueSlicesSize)190 void MultiVerSyncTaskContext::SetValueSlicesSize(int valueSlicesSize)
191 {
192     valueSlicesSize_ = valueSlicesSize;
193 }
194 
GetValueSlicesSize() const195 int MultiVerSyncTaskContext::GetValueSlicesSize() const
196 {
197     return valueSlicesSize_;
198 }
199 
GetValueSliceHashNode(int index,ValueSliceHash & hashNode) const200 void MultiVerSyncTaskContext::GetValueSliceHashNode(int index, ValueSliceHash &hashNode) const
201 {
202     hashNode = valueSliceHashNodes_[index];
203 }
204 
SetValueSliceHashNodes(const std::vector<ValueSliceHash> & valueSliceHashNodes)205 void MultiVerSyncTaskContext::SetValueSliceHashNodes(const std::vector<ValueSliceHash> &valueSliceHashNodes)
206 {
207     valueSliceHashNodes_ = valueSliceHashNodes;
208 }
209 
GetValueSliceHashNodes(std::vector<ValueSliceHash> & valueSliceHashNodes) const210 void MultiVerSyncTaskContext::GetValueSliceHashNodes(std::vector<ValueSliceHash> &valueSliceHashNodes) const
211 {
212     valueSliceHashNodes = valueSliceHashNodes_;
213 }
214 
Clear()215 void MultiVerSyncTaskContext::Clear()
216 {
217     commits_.clear();
218     commits_.shrink_to_fit();
219     ReleaseEntries();
220     valueSliceHashNodes_.clear();
221     valueSliceHashNodes_.shrink_to_fit();
222     commitsIndex_ = 0;
223     commitsSize_ = 0;
224     entriesIndex_ = 0;
225     entriesSize_ = 0;
226     valueSlicesIndex_ = 0;
227     valueSlicesSize_ = 0;
228     retryTime_ = 0;
229     isNeedRetry_ = NO_NEED_RETRY;
230     StopTimer();
231     sequenceId_ = 1; // minimum valid ID : 1
232     syncId_ = 0;
233 }
234 
CopyTargetData(const ISyncTarget * target,const TaskParam & taskParam)235 void MultiVerSyncTaskContext::CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam)
236 {
237     SyncTaskContext::CopyTargetData(target, taskParam);
238 }
239 }
240 #endif