1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_data_sync.h"
17
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31
32 namespace DistributedDB {
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)33 int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
34 {
35 std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
36 if (subManager == nullptr) {
37 return -E_INVALID_ARGS;
38 }
39 int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
40 if (errCode != E_OK) {
41 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
42 return errCode;
43 }
44 const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
45 if (packet == nullptr) {
46 return -E_INVALID_ARGS;
47 }
48 int32_t recvCode = packet->GetRecvCode();
49 uint32_t cmdType = packet->GetcontrolCmdType();
50 if (recvCode != E_OK) {
51 LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
52 STR_MASK(GetDeviceId()), cmdType);
53 // for unsubscribe no need to do something
54 SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
55 return recvCode;
56 }
57 if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
58 errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
59 } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
60 subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
61 }
62 if (errCode != E_OK) {
63 LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
64 return errCode;
65 }
66 return -E_NO_DATA_SEND; // means control msg send finished
67 }
68
ControlCmdStartCheck(SingleVerSyncTaskContext * context)69 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
70 {
71 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
72 (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
73 LOGE("[ControlCmdStartCheck] not support controlCmd");
74 return -E_INVALID_ARGS;
75 }
76 if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
77 context->GetQuery().HasInKeys() &&
78 context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
79 return -E_NOT_SUPPORT;
80 }
81 if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
82 return E_OK;
83 }
84 bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
85 if (permitReceive) {
86 context->SetReceivcPermitCheck(true);
87 } else {
88 return -E_SECURITY_OPTION_CHECK_ERROR;
89 }
90 return E_OK;
91 }
92
SendControlPacket(SubscribeRequest * packet,SingleVerSyncTaskContext * context)93 int SingleVerDataSync::SendControlPacket(SubscribeRequest *packet, SingleVerSyncTaskContext *context)
94 {
95 Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
96 if (message == nullptr) {
97 LOGE("[DataSync][SendControlPacket] new message error");
98 delete packet;
99 packet = nullptr;
100 return -E_OUT_OF_MEMORY;
101 }
102 uint32_t packetLen = packet->CalculateLen();
103 int errCode = message->SetExternalObject(packet);
104 if (errCode != E_OK) {
105 delete message;
106 message = nullptr;
107 delete packet;
108 packet = nullptr;
109 LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
110 return errCode;
111 }
112 SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
113 context->GetSequenceId(), context->GetRequestSessionId());
114 CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
115 SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
116 };
117 errCode = Send(context, message, handler, packetLen);
118 if (errCode != E_OK) {
119 delete message;
120 message = nullptr;
121 }
122 return errCode;
123 }
124
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)125 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
126 uint32_t controlCmdType, const CommErrHandler &handler)
127 {
128 Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
129 if (ackMessage == nullptr) {
130 LOGE("[DataSync][SendControlAck] new message error");
131 return -E_OUT_OF_MEMORY;
132 }
133 uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
134 ControlAckPacket ack;
135 ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
136 int errCode = ackMessage->SetCopiedObject(ack);
137 if (errCode != E_OK) {
138 delete ackMessage;
139 ackMessage = nullptr;
140 LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
141 return errCode;
142 }
143 SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
144 message->GetSequenceId(), message->GetSessionId());
145 errCode = Send(context, ackMessage, handler, 0);
146 if (errCode != E_OK) {
147 delete ackMessage;
148 ackMessage = nullptr;
149 }
150 return errCode;
151 }
152
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)153 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
154 {
155 if (context == nullptr || message == nullptr) {
156 return -E_INVALID_ARGS;
157 }
158 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
159 if (packet == nullptr) {
160 return -E_INVALID_ARGS;
161 }
162 uint32_t controlCmdType = packet->GetcontrolCmdType();
163 if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
164 return DoAbilitySyncIfNeed(context, message, true);
165 }
166 if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
167 SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
168 return -E_WAIT_NEXT_MESSAGE;
169 }
170 return E_OK;
171 }
172
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)173 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
174 const Message *message)
175 {
176 uint32_t controlCmdType = packet->GetcontrolCmdType();
177 if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
178 return E_OK;
179 }
180 QuerySyncObject syncQuery = packet->GetQuery();
181 int errCode;
182 if (!packet->IsAutoSubscribe()) {
183 errCode = storage_->CheckAndInitQueryCondition(syncQuery);
184 if (errCode != E_OK) {
185 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
186 SendControlAck(context, message, errCode, controlCmdType);
187 return -E_WAIT_NEXT_MESSAGE;
188 }
189 }
190 int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
191 static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
192 if (mode >= SyncModeType::INVALID_MODE) {
193 LOGE("[SingleVerDataSync] invalid mode");
194 SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
195 return -E_WAIT_NEXT_MESSAGE;
196 }
197 errCode = CheckPermitSendData(mode, context);
198 if (errCode != E_OK) {
199 LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
200 SendControlAck(context, message, errCode, controlCmdType);
201 }
202 return errCode;
203 }
204
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)205 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
206 {
207 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
208 if (packet == nullptr) {
209 return -E_INVALID_ARGS;
210 }
211 int errCode = SubscribeRequestRecvPre(context, packet, message);
212 if (errCode != E_OK) {
213 return errCode;
214 }
215 uint32_t controlCmdType = packet->GetcontrolCmdType();
216 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
217 if (subscribeManager == nullptr) {
218 LOGE("[SingleVerDataSync] subscribeManager check failed");
219 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
220 return -E_INVALID_ARGS;
221 }
222 errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
223 if (errCode != E_OK) {
224 LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
225 STR_MASK(GetDeviceId()));
226 SendControlAck(context, message, errCode, controlCmdType);
227 return errCode;
228 }
229 errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
230 if (errCode != E_OK) {
231 LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
232 STR_MASK(GetDeviceId()));
233 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
234 SendControlAck(context, message, errCode, controlCmdType);
235 return errCode;
236 }
237 errCode = SendControlAck(context, message, E_OK, controlCmdType);
238 if (errCode != E_OK) {
239 subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
240 RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
241 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
242 STR_MASK(GetDeviceId()));
243 return errCode;
244 }
245 subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
246 DBInfo dbInfo;
247 storage_->GetDBInfo(dbInfo);
248 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
249 return errCode;
250 }
251
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)252 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
253 {
254 const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
255 if (packet == nullptr) {
256 return -E_INVALID_ARGS;
257 }
258 uint32_t controlCmdType = packet->GetcontrolCmdType();
259 std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
260 if (subscribeManager == nullptr) {
261 LOGE("[SingleVerDataSync] subscribeManager check failed");
262 SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
263 return -E_INVALID_ARGS;
264 }
265 int errCode;
266 std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
267 if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
268 errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
269 if (errCode != E_OK) {
270 LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
271 STR_MASK(GetDeviceId()));
272 SendControlAck(context, message, errCode, controlCmdType);
273 return errCode;
274 }
275 }
276 errCode = SendControlAck(context, message, E_OK, controlCmdType);
277 if (errCode != E_OK) {
278 LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
279 STR_MASK(GetDeviceId()));
280 return errCode;
281 }
282 subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
283 DBInfo dbInfo;
284 storage_->GetDBInfo(dbInfo);
285 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
286 metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
287 return errCode;
288 }
289
PutDataMsg(Message * message)290 void SingleVerDataSync::PutDataMsg(Message *message)
291 {
292 return msgSchedule_.PutMsg(message);
293 }
294
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)295 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
296 bool &isNeedContinue)
297 {
298 return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
299 }
300
IsNeedReloadQueue()301 bool SingleVerDataSync::IsNeedReloadQueue()
302 {
303 return msgSchedule_.IsNeedReloadQueue();
304 }
305
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)306 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
307 {
308 msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
309 }
310
ClearDataMsg()311 void SingleVerDataSync::ClearDataMsg()
312 {
313 msgSchedule_.ClearMsg();
314 }
315
QuerySyncCheck(SingleVerSyncTaskContext * context)316 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
317 {
318 if (context == nullptr) {
319 return -E_INVALID_ARGS;
320 }
321 bool isCheckStatus = false;
322 int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
323 if (errCode != E_OK) {
324 return errCode;
325 }
326 if (!isCheckStatus) {
327 context->SetTaskErrCode(-E_NOT_SUPPORT);
328 return -E_NOT_SUPPORT;
329 }
330 return E_OK;
331 }
332
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)333 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
334 const std::shared_ptr<SubscribeManager> &subscribeManager)
335 {
336 if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
337 storage_->RemoveSubscribe(queryId);
338 }
339 }
340 } // namespace DistributedDB
341