• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_data_sync.h"
17 
18 #include "db_common.h"
19 #include "db_types.h"
20 #include "generic_single_ver_kv_entry.h"
21 #include "intercepted_data_impl.h"
22 #include "log_print.h"
23 #include "message_transform.h"
24 #include "performance_analysis.h"
25 #include "single_ver_data_sync_utils.h"
26 #include "single_ver_sync_state_machine.h"
27 #include "subscribe_manager.h"
28 #ifdef RELATIONAL_STORE
29 #include "relational_db_sync_interface.h"
30 #endif
31 
32 namespace DistributedDB {
ControlCmdAckRecv(SingleVerSyncTaskContext * context,const Message * message)33     int SingleVerDataSync::ControlCmdAckRecv(SingleVerSyncTaskContext *context, const Message *message)
34 {
35     std::shared_ptr<SubscribeManager> subManager = context->GetSubscribeManager();
36     if (subManager == nullptr) {
37         return -E_INVALID_ARGS;
38     }
39     int errCode = SingleVerDataSyncUtils::AckMsgErrnoCheck(context, message);
40     if (errCode != E_OK) {
41         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
42         return errCode;
43     }
44     const ControlAckPacket *packet = message->GetObject<ControlAckPacket>();
45     if (packet == nullptr) {
46         return -E_INVALID_ARGS;
47     }
48     int32_t recvCode = packet->GetRecvCode();
49     uint32_t cmdType = packet->GetcontrolCmdType();
50     if (recvCode != E_OK) {
51         LOGE("[DataSync][AckRecv] control sync abort,recvCode=%d,label=%s,dev=%s,type=%u", recvCode, label_.c_str(),
52             STR_MASK(GetDeviceId()), cmdType);
53         // for unsubscribe no need to do something
54         SingleVerDataSyncUtils::ControlAckErrorHandle(context, subManager);
55         return recvCode;
56     }
57     if (cmdType == ControlCmdType::SUBSCRIBE_QUERY_CMD) {
58         errCode = subManager->ActiveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
59     } else if (cmdType == ControlCmdType::UNSUBSCRIBE_QUERY_CMD) {
60         subManager->RemoveLocalSubscribeQuery(context->GetDeviceId(), context->GetQuery());
61     }
62     if (errCode != E_OK) {
63         LOGE("[DataSync] ack handle failed,label =%s,dev=%s,type=%u", label_.c_str(), STR_MASK(GetDeviceId()), cmdType);
64         return errCode;
65     }
66     return -E_NO_DATA_SEND; // means control msg send finished
67 }
68 
ControlCmdStartCheck(SingleVerSyncTaskContext * context)69 int SingleVerDataSync::ControlCmdStartCheck(SingleVerSyncTaskContext *context)
70 {
71     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) &&
72         (context->GetMode() != SyncModeType::UNSUBSCRIBE_QUERY)) {
73         LOGE("[ControlCmdStartCheck] not support controlCmd");
74         return -E_INVALID_ARGS;
75     }
76     if (context->GetMode() == SyncModeType::SUBSCRIBE_QUERY &&
77         context->GetQuery().HasInKeys() &&
78         context->IsNotSupportAbility(SyncConfig::INKEYS_QUERY)) {
79         return -E_NOT_SUPPORT;
80     }
81     if ((context->GetMode() != SyncModeType::SUBSCRIBE_QUERY) || context->GetReceivcPermitCheck()) {
82         return E_OK;
83     }
84     bool permitReceive = SingleVerDataSyncUtils::CheckPermitReceiveData(context, communicateHandle_, storage_);
85     if (permitReceive) {
86         context->SetReceivcPermitCheck(true);
87     } else {
88         return -E_SECURITY_OPTION_CHECK_ERROR;
89     }
90     return E_OK;
91 }
92 
SendControlPacket(SubscribeRequest * packet,SingleVerSyncTaskContext * context)93 int SingleVerDataSync::SendControlPacket(SubscribeRequest *packet, SingleVerSyncTaskContext *context)
94 {
95     Message *message = new (std::nothrow) Message(CONTROL_SYNC_MESSAGE);
96     if (message == nullptr) {
97         LOGE("[DataSync][SendControlPacket] new message error");
98         delete packet;
99         packet = nullptr;
100         return -E_OUT_OF_MEMORY;
101     }
102     uint32_t packetLen = packet->CalculateLen();
103     int errCode = message->SetExternalObject(packet);
104     if (errCode != E_OK) {
105         delete message;
106         message = nullptr;
107         delete packet;
108         packet = nullptr;
109         LOGE("[DataSync][SendControlPacket] set external object failed errCode=%d", errCode);
110         return errCode;
111     }
112     SingleVerDataSyncUtils::SetMessageHeadInfo(*message, TYPE_REQUEST, context->GetDeviceId(),
113         context->GetSequenceId(), context->GetRequestSessionId());
114     CommErrHandler handler = [this, context, sessionId = message->GetSessionId()](int ret, bool isDirectEnd) {
115         SyncTaskContext::CommErrHandlerFunc(ret, context, sessionId, isDirectEnd);
116     };
117     errCode = Send(context, message, handler, packetLen);
118     if (errCode != E_OK) {
119         delete message;
120         message = nullptr;
121     }
122     return errCode;
123 }
124 
SendControlAck(SingleVerSyncTaskContext * context,const Message * message,int32_t recvCode,uint32_t controlCmdType,const CommErrHandler & handler)125 int SingleVerDataSync::SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode,
126     uint32_t controlCmdType, const CommErrHandler &handler)
127 {
128     Message *ackMessage = new (std::nothrow) Message(message->GetMessageId());
129     if (ackMessage == nullptr) {
130         LOGE("[DataSync][SendControlAck] new message error");
131         return -E_OUT_OF_MEMORY;
132     }
133     uint32_t version = std::min(context->GetRemoteSoftwareVersion(), SOFTWARE_VERSION_CURRENT);
134     ControlAckPacket ack;
135     ack.SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), 0);
136     int errCode = ackMessage->SetCopiedObject(ack);
137     if (errCode != E_OK) {
138         delete ackMessage;
139         ackMessage = nullptr;
140         LOGE("[DataSync][SendControlAck] set copied object failed, errcode=%d", errCode);
141         return errCode;
142     }
143     SingleVerDataSyncUtils::SetMessageHeadInfo(*ackMessage, TYPE_RESPONSE, context->GetDeviceId(),
144         message->GetSequenceId(), message->GetSessionId());
145     errCode = Send(context, ackMessage, handler, 0);
146     if (errCode != E_OK) {
147         delete ackMessage;
148         ackMessage = nullptr;
149     }
150     return errCode;
151 }
152 
ControlCmdRequestRecvPre(SingleVerSyncTaskContext * context,const Message * message)153 int SingleVerDataSync::ControlCmdRequestRecvPre(SingleVerSyncTaskContext *context, const Message *message)
154 {
155     if (context == nullptr || message == nullptr) {
156         return -E_INVALID_ARGS;
157     }
158     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
159     if (packet == nullptr) {
160         return -E_INVALID_ARGS;
161     }
162     uint32_t controlCmdType = packet->GetcontrolCmdType();
163     if (context->GetRemoteSoftwareVersion() <= SOFTWARE_VERSION_BASE) {
164         return DoAbilitySyncIfNeed(context, message, true);
165     }
166     if (controlCmdType >= ControlCmdType::INVALID_CONTROL_CMD) {
167         SendControlAck(context, message, -E_NOT_SUPPORT, controlCmdType);
168         return -E_WAIT_NEXT_MESSAGE;
169     }
170     return E_OK;
171 }
172 
SubscribeRequestRecvPre(SingleVerSyncTaskContext * context,const SubscribeRequest * packet,const Message * message)173 int SingleVerDataSync::SubscribeRequestRecvPre(SingleVerSyncTaskContext *context, const SubscribeRequest *packet,
174     const Message *message)
175 {
176     uint32_t controlCmdType = packet->GetcontrolCmdType();
177     if (controlCmdType != ControlCmdType::SUBSCRIBE_QUERY_CMD) {
178         return E_OK;
179     }
180     QuerySyncObject syncQuery = packet->GetQuery();
181     int errCode;
182     if (!packet->IsAutoSubscribe()) {
183         errCode = storage_->CheckAndInitQueryCondition(syncQuery);
184         if (errCode != E_OK) {
185             LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
186             SendControlAck(context, message, errCode, controlCmdType);
187             return -E_WAIT_NEXT_MESSAGE;
188         }
189     }
190     int mode = SingleVerDataSyncUtils::GetModeByControlCmdType(
191         static_cast<ControlCmdType>(packet->GetcontrolCmdType()));
192     if (mode >= SyncModeType::INVALID_MODE) {
193         LOGE("[SingleVerDataSync] invalid mode");
194         SendControlAck(context, message, -E_INVALID_ARGS, controlCmdType);
195         return -E_WAIT_NEXT_MESSAGE;
196     }
197     errCode = CheckPermitSendData(mode, context);
198     if (errCode != E_OK) {
199         LOGE("[SingleVerDataSync] check sync query failed,errCode=%d", errCode);
200         SendControlAck(context, message, errCode, controlCmdType);
201     }
202     return errCode;
203 }
204 
SubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)205 int SingleVerDataSync::SubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
206 {
207     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
208     if (packet == nullptr) {
209         return -E_INVALID_ARGS;
210     }
211     int errCode = SubscribeRequestRecvPre(context, packet, message);
212     if (errCode != E_OK) {
213         return errCode;
214     }
215     uint32_t controlCmdType = packet->GetcontrolCmdType();
216     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
217     if (subscribeManager == nullptr) {
218         LOGE("[SingleVerDataSync] subscribeManager check failed");
219         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
220         return -E_INVALID_ARGS;
221     }
222     errCode = storage_->AddSubscribe(packet->GetQuery().GetIdentify(), packet->GetQuery(), packet->IsAutoSubscribe());
223     if (errCode != E_OK) {
224         LOGE("[SingleVerDataSync] add trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
225             STR_MASK(GetDeviceId()));
226         SendControlAck(context, message, errCode, controlCmdType);
227         return errCode;
228     }
229     errCode = subscribeManager->ReserveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
230     if (errCode != E_OK) {
231         LOGE("[SingleVerDataSync] add remote subscribe query failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
232             STR_MASK(GetDeviceId()));
233         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
234         SendControlAck(context, message, errCode, controlCmdType);
235         return errCode;
236     }
237     errCode = SendControlAck(context, message, E_OK, controlCmdType);
238     if (errCode != E_OK) {
239         subscribeManager->DeleteRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
240         RemoveSubscribeIfNeed(packet->GetQuery().GetIdentify(), subscribeManager);
241         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
242             STR_MASK(GetDeviceId()));
243         return errCode;
244     }
245     subscribeManager->ActiveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
246     DBInfo dbInfo;
247     storage_->GetDBInfo(dbInfo);
248     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
249     return errCode;
250 }
251 
UnsubscribeRequestRecv(SingleVerSyncTaskContext * context,const Message * message)252 int SingleVerDataSync::UnsubscribeRequestRecv(SingleVerSyncTaskContext *context, const Message *message)
253 {
254     const SubscribeRequest *packet = message->GetObject<SubscribeRequest>();
255     if (packet == nullptr) {
256         return -E_INVALID_ARGS;
257     }
258     uint32_t controlCmdType = packet->GetcontrolCmdType();
259     std::shared_ptr<SubscribeManager> subscribeManager = context->GetSubscribeManager();
260     if (subscribeManager == nullptr) {
261         LOGE("[SingleVerDataSync] subscribeManager check failed");
262         SendControlAck(context, message, -E_NOT_REGISTER, controlCmdType);
263         return -E_INVALID_ARGS;
264     }
265     int errCode;
266     std::lock_guard<std::mutex> autoLock(unsubscribeLock_);
267     if (subscribeManager->IsLastRemoteContainSubscribe(context->GetDeviceId(), packet->GetQuery().GetIdentify())) {
268         errCode = storage_->RemoveSubscribe(packet->GetQuery().GetIdentify());
269         if (errCode != E_OK) {
270             LOGE("[SingleVerDataSync] remove trigger failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
271                 STR_MASK(GetDeviceId()));
272             SendControlAck(context, message, errCode, controlCmdType);
273             return errCode;
274         }
275     }
276     errCode = SendControlAck(context, message, E_OK, controlCmdType);
277     if (errCode != E_OK) {
278         LOGE("[SingleVerDataSync] send control msg failed,err=%d,label=%s,dev=%s", errCode, label_.c_str(),
279             STR_MASK(GetDeviceId()));
280         return errCode;
281     }
282     subscribeManager->RemoveRemoteSubscribeQuery(context->GetDeviceId(), packet->GetQuery());
283     DBInfo dbInfo;
284     storage_->GetDBInfo(dbInfo);
285     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, context->GetDeviceId(), packet->GetQuery());
286     metadata_->RemoveQueryFromRecordSet(context->GetDeviceId(), packet->GetQuery().GetIdentify());
287     return errCode;
288 }
289 
PutDataMsg(Message * message)290 void SingleVerDataSync::PutDataMsg(Message *message)
291 {
292     return msgSchedule_.PutMsg(message);
293 }
294 
MoveNextDataMsg(SingleVerSyncTaskContext * context,bool & isNeedHandle,bool & isNeedContinue)295 Message *SingleVerDataSync::MoveNextDataMsg(SingleVerSyncTaskContext *context, bool &isNeedHandle,
296     bool &isNeedContinue)
297 {
298     return msgSchedule_.MoveNextMsg(context, isNeedHandle, isNeedContinue);
299 }
300 
IsNeedReloadQueue()301 bool SingleVerDataSync::IsNeedReloadQueue()
302 {
303     return msgSchedule_.IsNeedReloadQueue();
304 }
305 
ScheduleInfoHandle(bool isNeedHandleStatus,bool isNeedClearMap,const Message * message)306 void SingleVerDataSync::ScheduleInfoHandle(bool isNeedHandleStatus, bool isNeedClearMap, const Message *message)
307 {
308     msgSchedule_.ScheduleInfoHandle(isNeedHandleStatus, isNeedClearMap, message);
309 }
310 
ClearDataMsg()311 void SingleVerDataSync::ClearDataMsg()
312 {
313     msgSchedule_.ClearMsg();
314 }
315 
QuerySyncCheck(SingleVerSyncTaskContext * context)316 int SingleVerDataSync::QuerySyncCheck(SingleVerSyncTaskContext *context)
317 {
318     if (context == nullptr) {
319         return -E_INVALID_ARGS;
320     }
321     bool isCheckStatus = false;
322     int errCode = SingleVerDataSyncUtils::QuerySyncCheck(context, isCheckStatus);
323     if (errCode != E_OK) {
324         return errCode;
325     }
326     if (!isCheckStatus) {
327         context->SetTaskErrCode(-E_NOT_SUPPORT);
328         return -E_NOT_SUPPORT;
329     }
330     return E_OK;
331 }
332 
RemoveSubscribeIfNeed(const std::string & queryId,const std::shared_ptr<SubscribeManager> & subscribeManager)333 void SingleVerDataSync::RemoveSubscribeIfNeed(const std::string &queryId,
334     const std::shared_ptr<SubscribeManager> &subscribeManager)
335 {
336     if (!subscribeManager->IsQueryExistSubscribe(queryId)) {
337         storage_->RemoveSubscribe(queryId);
338     }
339 }
340 } // namespace DistributedDB
341