1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26
27 namespace DistributedDB {
28 namespace {
29 constexpr int MAX_SEND_RETRY = 2;
30 constexpr int RETRY_TIME_SPLIT = 4;
GetThreadId()31 inline std::string GetThreadId()
32 {
33 std::stringstream stream;
34 stream << std::this_thread::get_id();
35 return stream.str();
36 }
37 }
38
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42 : shutdown_(false),
43 incFrameId_(0),
44 localSourceId_(0)
45 {
46 }
47
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51 adapterHandle_ = nullptr;
52 commLinker_ = nullptr;
53 }
54
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57 if (inAdapter == nullptr) {
58 return -E_INVALID_ARGS;
59 }
60 adapterHandle_ = inAdapter;
61
62 combiner_.Initialize();
63 retainer_.Initialize();
64 scheduler_.Initialize();
65
66 int errCode;
67 commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68 if (commLinker_ == nullptr) {
69 errCode = -E_OUT_OF_MEMORY;
70 goto ROLL_BACK;
71 }
72 commLinker_->Initialize();
73
74 errCode = RegCallbackToAdapter();
75 if (errCode != E_OK) {
76 goto ROLL_BACK;
77 }
78
79 errCode = adapterHandle_->StartAdapter();
80 if (errCode != E_OK) {
81 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82 goto ROLL_BACK;
83 }
84 GenerateLocalSourceId();
85
86 shutdown_ = false;
87 InitSendThread();
88 dbStatusAdapter_ = statusAdapter;
89 RegDBChangeCallback();
90 return E_OK;
91 ROLL_BACK:
92 UnRegCallbackFromAdapter();
93 if (commLinker_ != nullptr) {
94 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95 commLinker_ = nullptr;
96 }
97 // Scheduler do not need to do finalize in this roll_back
98 retainer_.Finalize();
99 combiner_.Finalize();
100 return errCode;
101 }
102
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105 shutdown_ = true;
106 retryCv_.notify_all();
107 {
108 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109 wakingSignal_ = true;
110 wakingCv_.notify_one();
111 }
112 if (useExclusiveThread_) {
113 exclusiveThread_.join(); // Waiting thread to thoroughly quit
114 LOGI("[CommAggr][Final] Sub Thread Exit.");
115 } else {
116 LOGI("[CommAggr][Final] Begin wait send task exit.");
117 std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118 finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119 return !sendTaskStart_;
120 });
121 LOGI("[CommAggr][Final] End wait send task exit.");
122 }
123 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124
125 adapterHandle_->StopAdapter();
126 UnRegCallbackFromAdapter();
127 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128
129 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131 commLinker_ = nullptr;
132 retainer_.Finalize();
133 combiner_.Finalize();
134 dbStatusAdapter_ = nullptr;
135 }
136
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId)
138 {
139 uint64_t netOrderLabel = HostToNet(commLabel);
140 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143 realLabel[i] = eachByte[i];
144 }
145 return AllocCommunicator(realLabel, outErrorNo, userId);
146 }
147
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo,const std::string & userId)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo,
149 const std::string &userId)
150 {
151 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
152 LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
153 if (commLabel.size() != COMM_LABEL_LENGTH) {
154 outErrorNo = -E_INVALID_ARGS;
155 return nullptr;
156 }
157
158 if (commMap_.count(userId) != 0 && commMap_[userId].count(commLabel) != 0) {
159 outErrorNo = -E_ALREADY_ALLOC;
160 return nullptr;
161 }
162
163 Communicator *commPtr = new(std::nothrow) Communicator(this, commLabel);
164 if (commPtr == nullptr) {
165 LOGE("[CommAggr][Alloc] Communicator create failed, may be no available memory.");
166 outErrorNo = -E_OUT_OF_MEMORY;
167 return nullptr;
168 }
169 commMap_[userId][commLabel] = {commPtr, false}; // Communicator is not activated when allocated
170 return commPtr;
171 }
172
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)173 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
174 {
175 if (inCommunicator == nullptr) {
176 return;
177 }
178 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
179 LabelType commLabel = commPtr->GetCommunicatorLabel();
180 LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
181
182 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
183 if (commMap_.count(userId) == 0 || commMap_[userId].count(commLabel) == 0) {
184 LOGE("[CommAggr][Release] Not Found.");
185 return;
186 }
187 commMap_[userId].erase(commLabel);
188 if (commMap_[userId].empty()) {
189 commMap_.erase(userId);
190 }
191 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
192
193 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
194 if (errCode != E_OK) {
195 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
196 }
197 }
198
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)199 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
200 const Finalizer &inOper)
201 {
202 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
203 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
204 }
205
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)206 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
207 {
208 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
209 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
210 if (onConnect && errCode == E_OK) {
211 // Register action and success
212 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
213 for (auto &entry : onlineTargets) {
214 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
215 onConnectHandle_(entry, true);
216 }
217 }
218 return errCode;
219 }
220
GetCommunicatorAggregatorMtuSize() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
222 {
223 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225
GetCommunicatorAggregatorMtuSize(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
227 {
228 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
229 }
230
GetCommunicatorAggregatorTimeout() const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
232 {
233 return adapterHandle_->GetTimeout();
234 }
235
GetCommunicatorAggregatorTimeout(const std::string & target) const236 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
237 {
238 return adapterHandle_->GetTimeout(target);
239 }
240
IsDeviceOnline(const std::string & device) const241 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
242 {
243 return adapterHandle_->IsDeviceOnline(device);
244 }
245
GetLocalIdentity(std::string & outTarget) const246 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
247 {
248 return adapterHandle_->GetLocalIdentity(outTarget);
249 }
250
ActivateCommunicator(const LabelType & commLabel,const std::string & userId)251 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, const std::string &userId)
252 {
253 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
254 LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
255 if (commMap_[userId].count(commLabel) == 0) {
256 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
257 return;
258 }
259 if (commMap_[userId].at(commLabel).second) {
260 return;
261 }
262 commMap_[userId].at(commLabel).second = true; // Mark this communicator as activated
263
264 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
265 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
266 std::set<std::string> onlineTargets;
267 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
268 if (errCode != E_OK) {
269 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
270 // Do not return here
271 }
272 for (auto &entry : onlineTargets) {
273 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
274 commMap_[userId].at(commLabel).first->OnConnectChange(entry, true);
275 }
276 // Do Redeliver, the communicator is responsible to deal with the frame
277 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
278 for (auto &entry : framesToRedeliver) {
279 commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.sendUser,
280 entry.remoteDbVersion);
281 }
282 }
283
284 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)285 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
286 {
287 if (onEnd) {
288 TaskAction onSendEndTask = [onEnd, result]() {
289 LOGD("[CommAggr][SendEndTask] Before On Send End.");
290 onEnd(result, true);
291 LOGD("[CommAggr][SendEndTask] After On Send End.");
292 };
293 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
294 if (errCode != E_OK) {
295 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
296 }
297 }
298 }
299 }
300
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)301 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
302 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
303 {
304 if (inBuff == nullptr) {
305 return -E_INVALID_ARGS;
306 }
307
308 if (!ReGenerateLocalSourceIdIfNeed()) {
309 delete inBuff;
310 inBuff = nullptr;
311 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
312 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
313 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
314 }
315 bool sendLabelExchange = true;
316 if (dbStatusAdapter_ != nullptr) {
317 sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
318 }
319 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
320 sendLabelExchange};
321 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
322 if (errCode != E_OK) {
323 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
324 return errCode;
325 }
326 {
327 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
328 sendRecord_[info.frameId] = {};
329 }
330 SendTask task{inBuff, dstTarget, onEnd, info.frameId, true, inConfig.isRetryTask, inConfig.infos};
331 if (inConfig.nonBlock) {
332 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
333 } else {
334 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
335 }
336 if (errCode != E_OK) {
337 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
338 return errCode;
339 }
340 TriggerSendData();
341 LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u, isRetry=%d", dstTarget.c_str(), info.frameId,
342 task.isRetryTask);
343 return E_OK;
344 }
345
EnableCommunicatorNotFoundFeedback(bool isEnable)346 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
347 {
348 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
349 }
350
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const351 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
352 {
353 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
354 auto pair = versionMap_.find(target);
355 if (pair == versionMap_.end()) {
356 return -E_NOT_FOUND;
357 }
358 outVersion = pair->second;
359 return E_OK;
360 }
361
SendDataRoutine()362 void CommunicatorAggregator::SendDataRoutine()
363 {
364 while (!shutdown_) {
365 if (scheduler_.GetNoDelayTaskCount() == 0) {
366 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
367 LOGI("[CommAggr][Routine] Send done and sleep.");
368 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
369 LOGI("[CommAggr][Routine] Send continue.");
370 wakingSignal_ = false;
371 continue;
372 }
373 SendOnceData();
374 }
375 }
376
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)377 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
378 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
379 {
380 bool taskNeedFinalize = true;
381 int errCode = E_OK;
382 ResetFrameRecordIfNeed(inTask.frameId, mtu);
383 uint32_t startIndex;
384 {
385 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
386 startIndex = sendRecord_[inTask.frameId].sendIndex;
387 }
388 uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
389 DeviceInfos deviceInfos = {inTask.dstTarget, inTask.infos, inTask.isRetryTask};
390 for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
391 auto &entry = eachPacket[index];
392 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
393 ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
394 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
395 errCode = adapterHandle_->SendBytes(deviceInfos, entry.first, entry.second.second, totalLength);
396 {
397 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
398 sendRecord_[inTask.frameId].sendIndex = index;
399 }
400 if (errCode == -E_WAIT_RETRY) {
401 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
402 taskNeedFinalize = false;
403 break;
404 } else if (errCode != E_OK) {
405 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
406 break;
407 } else {
408 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
409 retryCount_[inTask.dstTarget] = 0;
410 }
411 }
412 if (errCode == -E_WAIT_RETRY) {
413 RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
414 }
415 if (taskNeedFinalize) {
416 TaskFinalizer(inTask, errCode);
417 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
418 sendRecord_.erase(inTask.frameId);
419 }
420 }
421
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)422 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
423 {
424 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
425 if (errCode != E_OK) {
426 bool notTimeout = true;
427 auto retryFunc = [this, inPrio, &inTask]()->bool {
428 if (this->shutdown_) {
429 delete inTask.buffer;
430 inTask.buffer = nullptr;
431 return true;
432 }
433 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
434 if (retCode != E_OK) {
435 return false;
436 }
437 return true;
438 };
439
440 if (timeout == 0) { // Unlimited retry
441 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
442 retryCv_.wait(retryUniqueLock, retryFunc);
443 } else {
444 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
445 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
446 }
447
448 if (shutdown_) {
449 return E_OK;
450 }
451 if (!notTimeout) {
452 return -E_TIMEOUT;
453 }
454 }
455 return E_OK;
456 }
457
TaskFinalizer(const SendTask & inTask,int result)458 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
459 {
460 // Call the OnSendEnd if need
461 if (inTask.onEnd) {
462 LOGD("[CommAggr][TaskFinal] On Send End.");
463 inTask.onEnd(result, true);
464 }
465 // Finalize the task that just scheduled
466 int errCode = scheduler_.FinalizeLastScheduleTask();
467 // Notify Sendable To All Communicator If Need
468 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
469 retryCv_.notify_all();
470 }
471 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
472 NotifySendableToAllCommunicator();
473 }
474 }
475
NotifySendableToAllCommunicator()476 void CommunicatorAggregator::NotifySendableToAllCommunicator()
477 {
478 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
479 for (auto &userCommMap : commMap_) {
480 for (auto &entry : userCommMap.second) {
481 // Ignore nonactivated communicator
482 if (entry.second.second) {
483 entry.second.first->OnSendAvailable();
484 }
485 }
486 }
487 }
488
OnBytesReceive(const ReceiveBytesInfo & receiveBytesInfo,const DataUserInfoProc & userInfoProc)489 void CommunicatorAggregator::OnBytesReceive(const ReceiveBytesInfo &receiveBytesInfo,
490 const DataUserInfoProc &userInfoProc)
491 {
492 ProtocolProto::DisplayPacketInformation(receiveBytesInfo.bytes, receiveBytesInfo.length);
493 ParseResult packetResult;
494 int errCode = ProtocolProto::CheckAndParsePacket(receiveBytesInfo.srcTarget, receiveBytesInfo.bytes,
495 receiveBytesInfo.length, packetResult);
496 if (errCode != E_OK) {
497 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
498 if (errCode == -E_VERSION_NOT_SUPPORT) {
499 TriggerVersionNegotiation(receiveBytesInfo.srcTarget);
500 }
501 return;
502 }
503
504 // Update version of remote target
505 SetRemoteCommunicatorVersion(receiveBytesInfo.srcTarget, packetResult.GetDbVersion());
506 if (dbStatusAdapter_ != nullptr) {
507 dbStatusAdapter_->SetRemoteOptimizeCommunication(receiveBytesInfo.srcTarget,
508 !packetResult.IsSendLabelExchange());
509 }
510 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
511 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
512 return;
513 }
514
515 if (packetResult.IsFragment()) {
516 OnFragmentReceive(receiveBytesInfo, packetResult, userInfoProc);
517 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
518 errCode = OnCommLayerFrameReceive(receiveBytesInfo.srcTarget, packetResult);
519 if (errCode != E_OK) {
520 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
521 }
522 } else {
523 errCode = OnAppLayerFrameReceive(receiveBytesInfo, packetResult, userInfoProc);
524 if (errCode != E_OK) {
525 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
526 }
527 }
528 }
529
OnTargetChange(const std::string & target,bool isConnect)530 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
531 {
532 if (target.empty()) {
533 LOGE("[CommAggr][OnTarget] Target empty string.");
534 return;
535 }
536 // For process level target change
537 {
538 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
539 if (onConnectHandle_) {
540 onConnectHandle_(target, isConnect);
541 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
542 } else {
543 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
544 }
545 }
546 std::set<LabelType> relatedLabels;
547 // For communicator level target change
548 if (isConnect) {
549 int errCode = commLinker_->TargetOnline(target, relatedLabels);
550 if (errCode != E_OK) {
551 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
552 }
553 } else {
554 commLinker_->TargetOffline(target, relatedLabels);
555 }
556 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
557 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
558 for (auto &userCommMap : commMap_) {
559 for (auto &entry: userCommMap.second) {
560 // Ignore nonactivated communicator
561 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
562 entry.second.first->OnConnectChange(target, isConnect);
563 }
564 }
565 }
566 }
567
OnSendable(const std::string & target)568 void CommunicatorAggregator::OnSendable(const std::string &target)
569 {
570 int errCode = scheduler_.NoDelayTaskByTarget(target);
571 if (errCode != E_OK) {
572 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
573 return;
574 }
575 TriggerSendData();
576 }
577
OnFragmentReceive(const ReceiveBytesInfo & receiveBytesInfo,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)578 void CommunicatorAggregator::OnFragmentReceive(const ReceiveBytesInfo &receiveBytesInfo,
579 const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
580 {
581 int errorNo = E_OK;
582 ParseResult frameResult;
583 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(receiveBytesInfo.bytes, receiveBytesInfo.length,
584 inResult, frameResult, errorNo);
585 if (errorNo != E_OK) {
586 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
587 return;
588 }
589 if (frameBuffer == nullptr) {
590 LOGW("[CommAggr][Receive] Combine undone.");
591 return;
592 }
593
594 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
595 if (errCode != E_OK) {
596 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
597 delete frameBuffer;
598 frameBuffer = nullptr;
599 if (errCode == -E_VERSION_NOT_SUPPORT) {
600 TriggerVersionNegotiation(receiveBytesInfo.srcTarget);
601 }
602 return;
603 }
604
605 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
606 errCode = OnCommLayerFrameReceive(receiveBytesInfo.srcTarget, frameResult);
607 if (errCode != E_OK) {
608 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
609 }
610 delete frameBuffer;
611 frameBuffer = nullptr;
612 } else {
613 errCode = OnAppLayerFrameReceive(receiveBytesInfo, frameBuffer, frameResult, userInfoProc);
614 if (errCode != E_OK) {
615 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
616 }
617 }
618 }
619
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)620 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
621 {
622 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
623 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
624 inResult.GetLabelExchangeSequenceId());
625 if (errCode != E_OK) {
626 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
627 return errCode;
628 }
629 } else {
630 std::map<LabelType, bool> changedLabels;
631 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
632 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
633 if (errCode != E_OK) {
634 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
635 return errCode;
636 }
637 NotifyConnectChange(srcTarget, changedLabels);
638 }
639 return E_OK;
640 }
641
OnAppLayerFrameReceive(const ReceiveBytesInfo & receiveBytesInfo,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)642 int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo,
643 const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
644 {
645 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
646 if (buffer == nullptr) {
647 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
648 return -E_OUT_OF_MEMORY;
649 }
650 int errCode = buffer->SetExternalBuff(receiveBytesInfo.bytes, receiveBytesInfo.length - inResult.GetPaddingLen(),
651 ProtocolProto::GetAppLayerFrameHeaderLength());
652 if (errCode != E_OK) {
653 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
654 delete buffer;
655 buffer = nullptr;
656 return -E_INTERNAL_ERROR;
657 }
658 return OnAppLayerFrameReceive(receiveBytesInfo, buffer, inResult, userInfoProc);
659 }
660
661 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
662 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
663 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
664 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
665 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
666 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
667 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
668 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
669 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
670 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
671 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
672 // in the same callback thread finally causing DeadLock on commMapMutex_.
673 // #### SO #### we have to make a change described below
674 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
675 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
676 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
677 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
678 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const ReceiveBytesInfo & receiveBytesInfo,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)679 int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &receiveBytesInfo,
680 SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
681 {
682 LabelType toLabel = inResult.GetCommLabel();
683 uint16_t remoteDbVersion = inResult.GetDbVersion();
684 UserInfo userInfo = { .sendUser = DBConstant::DEFAULT_USER };
685 if (receiveBytesInfo.isNeedGetUserInfo) {
686 int ret = GetDataUserId(inResult, toLabel, userInfoProc, receiveBytesInfo.srcTarget, userInfo);
687 if (ret == NEED_CORRECT_TARGET_USER) {
688 TryToFeedBackWithErr(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
689 E_NEED_CORRECT_TARGET_USER);
690 delete inFrameBuffer;
691 inFrameBuffer = nullptr;
692 return -E_NEED_CORRECT_TARGET_USER;
693 }
694 if (ret != E_OK || userInfo.sendUser.empty()) {
695 LOGE("[CommAggr][AppReceive] get data user id err, ret=%d, empty receiveUser=%d, empty sendUser=%d", ret,
696 userInfo.receiveUser.empty(), userInfo.sendUser.empty());
697 delete inFrameBuffer;
698 inFrameBuffer = nullptr;
699 return ret != E_OK ? ret : -E_NO_TRUSTED_USER;
700 }
701 }
702 {
703 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
704 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(remoteDbVersion, receiveBytesInfo.srcTarget,
705 inFrameBuffer, toLabel, userInfo);
706 if (errCode == E_OK) { // Attention: Here is equal to E_OK
707 return E_OK;
708 } else if (errCode == -E_FEEDBACK_DB_CLOSING) {
709 TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
710 E_FEEDBACK_DB_CLOSING);
711 delete inFrameBuffer;
712 inFrameBuffer = nullptr;
713 return errCode; // The caller will display errCode in log
714 }
715 }
716 LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
717 return ReTryDeliverAppLayerFrameOnCommunicatorNotFound(receiveBytesInfo, inFrameBuffer, inResult, userInfoProc,
718 userInfo);
719 }
720
GetDataUserId(const ParseResult & inResult,const LabelType & toLabel,const DataUserInfoProc & userInfoProc,const std::string & device,UserInfo & userInfo)721 int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel,
722 const DataUserInfoProc &userInfoProc, const std::string &device, UserInfo &userInfo)
723 {
724 if (userInfoProc.processCommunicator == nullptr) {
725 LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr");
726 return E_INVALID_ARGS;
727 }
728 std::string label(toLabel.begin(), toLabel.end());
729 std::vector<UserInfo> userInfos;
730 DataUserInfo dataUserInfo = {userInfoProc.data, userInfoProc.length, label, device};
731 DBStatus ret = userInfoProc.processCommunicator->GetDataUserInfo(dataUserInfo, userInfos);
732 LOGI("[CommAggr][GetDataUserId] get data user info, ret=%d", ret);
733 if (ret == NO_PERMISSION) {
734 LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet");
735 return ret;
736 } else if (ret == NEED_CORRECT_TARGET_USER) {
737 LOGW("[CommAggr][GetDataUserId] the target user is incorrect and needs to be corrected");
738 return ret;
739 }
740 if (!userInfos.empty()) {
741 userInfo = userInfos[0];
742 } else {
743 LOGW("[CommAggr][GetDataUserId] userInfos is empty");
744 }
745 return E_OK;
746 }
747
TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion,const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel,const UserInfo & userInfo)748 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(uint16_t remoteDbVersion,
749 const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo)
750 {
751 // Ignore nonactivated communicator, which is regarded as inexistent
752 const std::string &sendUser = userInfo.sendUser;
753 const std::string &receiveUser = userInfo.receiveUser;
754 if (commMap_[receiveUser].count(toLabel) != 0 && commMap_[receiveUser].at(toLabel).second) {
755 int ret = commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, sendUser,
756 remoteDbVersion);
757 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
758 if (ret == E_OK) {
759 inFrameBuffer = nullptr;
760 }
761 return ret;
762 }
763 Communicator *communicator = nullptr;
764 bool isEmpty = false;
765 for (auto &userCommMap : commMap_) {
766 for (auto &entry : userCommMap.second) {
767 if (entry.first == toLabel && entry.second.second) {
768 communicator = entry.second.first;
769 isEmpty = userCommMap.first.empty();
770 LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s",
771 userCommMap.first.c_str(), receiveUser.c_str());
772 break;
773 }
774 }
775 if (communicator != nullptr) {
776 break;
777 }
778 }
779 if (communicator != nullptr && (receiveUser.empty() || isEmpty)) {
780 int ret = communicator->OnBufferReceive(srcTarget, inFrameBuffer, sendUser, remoteDbVersion);
781 if (ret == E_OK) {
782 inFrameBuffer = nullptr;
783 }
784 return ret;
785 }
786 LOGE("[CommAggr][TryDeliver] Communicator not found");
787 return -E_NOT_FOUND;
788 }
789
RegCallbackToAdapter()790 int CommunicatorAggregator::RegCallbackToAdapter()
791 {
792 RefObject::IncObjRef(this); // Reference to be hold by adapter
793 int errCode = adapterHandle_->RegBytesReceiveCallback(
794 [this](const ReceiveBytesInfo &receiveBytesInfo, const DataUserInfoProc &userInfoProc) {
795 OnBytesReceive(receiveBytesInfo, userInfoProc);
796 }, [this]() { RefObject::DecObjRef(this); });
797 if (errCode != E_OK) {
798 RefObject::DecObjRef(this); // Rollback in case reg failed
799 return errCode;
800 }
801
802 RefObject::IncObjRef(this); // Reference to be hold by adapter
803 errCode = adapterHandle_->RegTargetChangeCallback(
804 [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
805 [this]() { RefObject::DecObjRef(this); });
806 if (errCode != E_OK) {
807 RefObject::DecObjRef(this); // Rollback in case reg failed
808 return errCode;
809 }
810
811 RefObject::IncObjRef(this); // Reference to be hold by adapter
812 errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int deviceCommErrCode) {
813 LOGI("[CommAggr] Send able dev=%.3s, deviceCommErrCode=%d", target.c_str(), deviceCommErrCode);
814 (void)IncreaseSendSequenceId(target);
815 scheduler_.SetDeviceCommErrCode(target, deviceCommErrCode);
816 OnSendable(target);
817 },
818 [this]() { RefObject::DecObjRef(this); });
819 if (errCode != E_OK) {
820 RefObject::DecObjRef(this); // Rollback in case reg failed
821 return errCode;
822 }
823
824 return E_OK;
825 }
826
UnRegCallbackFromAdapter()827 void CommunicatorAggregator::UnRegCallbackFromAdapter()
828 {
829 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
830 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
831 adapterHandle_->RegSendableCallback(nullptr, nullptr);
832 if (dbStatusAdapter_ != nullptr) {
833 dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
834 }
835 }
836
GenerateLocalSourceId()837 void CommunicatorAggregator::GenerateLocalSourceId()
838 {
839 std::string identity;
840 adapterHandle_->GetLocalIdentity(identity);
841 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
842 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
843 uint64_t identityHash = Hash::HashFunc(identity);
844 if (identityHash != localSourceId_) {
845 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
846 }
847 localSourceId_ = identityHash;
848 }
849
ReGenerateLocalSourceIdIfNeed()850 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
851 {
852 // The deviceId will change when switch user from A to B
853 // We can't listen to the user change, because it's hard to ensure the timing is correct.
854 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
855 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
856 GenerateLocalSourceId();
857 return (localSourceId_ != 0);
858 }
859
TriggerVersionNegotiation(const std::string & dstTarget)860 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
861 {
862 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
863 int errCode = E_OK;
864 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
865 if (errCode != E_OK) {
866 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
867 return;
868 }
869
870 TaskConfig config{true, true, 0, Priority::HIGH};
871 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
872 if (errCode != E_OK) {
873 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
874 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
875 delete buffer;
876 buffer = nullptr;
877 }
878 }
879
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame,int inErrCode)880 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
881 const LabelType &dstLabel, const SerialBuffer *inOriFrame, int inErrCode)
882 {
883 if (!isCommunicatorNotFoundFeedbackEnable_) {
884 return;
885 }
886 TryToFeedBackWithErr(dstTarget, dstLabel, inOriFrame, inErrCode);
887 }
888
TryToFeedBackWithErr(const std::string & dstTarget,const DistributedDB::LabelType & dstLabel,const DistributedDB::SerialBuffer * inOriFrame,int inErrCode)889 void CommunicatorAggregator::TryToFeedBackWithErr(const std::string &dstTarget,
890 const DistributedDB::LabelType &dstLabel, const DistributedDB::SerialBuffer *inOriFrame, int inErrCode)
891 {
892 if (dstTarget.empty() || inOriFrame == nullptr) {
893 return;
894 }
895 int errCode = E_OK;
896 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
897 if (message == nullptr) {
898 if (errCode == -E_VERSION_NOT_SUPPORT) {
899 TriggerVersionNegotiation(dstTarget);
900 }
901 return;
902 }
903 // Message is release in TriggerCommunicatorNotFoundFeedback
904 TriggerCommunicatorFeedback(dstTarget, dstLabel, message, inErrCode);
905 }
906
TriggerCommunicatorFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg,int sendErrNo)907 void CommunicatorAggregator::TriggerCommunicatorFeedback(const std::string &dstTarget,
908 const LabelType &dstLabel, Message* &oriMsg, int sendErrNo)
909 {
910 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
911 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
912 // Do not have to do feedback if the message is not a request type message
913 delete oriMsg;
914 oriMsg = nullptr;
915 return;
916 }
917
918 LOGI("[CommAggr][TrigNotFound] Do communicator feedback with target=%s{private}, send error code=%d.",
919 dstTarget.c_str(), sendErrNo);
920 oriMsg->SetMessageType(TYPE_RESPONSE);
921 oriMsg->SetErrorNo(sendErrNo);
922
923 int errCode = E_OK;
924 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
925 delete oriMsg;
926 oriMsg = nullptr;
927 if (errCode != E_OK) {
928 LOGE("[CommAggr][TrigNotFound] Build communicator feedback frame fail, errCode=%d", errCode);
929 return;
930 }
931
932 TaskConfig config{true, true, 0, Priority::HIGH};
933 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
934 if (errCode != E_OK) {
935 LOGE("[CommAggr][TrigNotFound] Send communicator feedback frame fail, errCode=%d", errCode);
936 // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
937 delete buffer;
938 buffer = nullptr;
939 }
940 }
941
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)942 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
943 {
944 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
945 versionMap_[target] = version;
946 }
947
GetExtendHeaderHandle(const ExtendInfo & paramInfo)948 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
949 {
950 if (adapterHandle_ == nullptr) {
951 return nullptr;
952 }
953 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
954 }
955
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)956 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
957 {
958 std::map<LabelType, bool> changedLabels;
959 for (const auto &dbInfo: dbInfos) {
960 std::string label = DBCommon::GenerateHashLabel(dbInfo);
961 LabelType labelType(label.begin(), label.end());
962 changedLabels[labelType] = dbInfo.isNeedSync;
963 }
964 if (commLinker_ != nullptr) {
965 commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
966 }
967 NotifyConnectChange(devInfo, changedLabels);
968 }
969
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)970 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
971 const std::map<LabelType, bool> &changedLabels)
972 {
973 if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
974 LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
975 for (const auto &entry : changedLabels) {
976 LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
977 }
978 return;
979 }
980 // Do target change notify
981 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
982 for (auto &entry : changedLabels) {
983 for (auto &userCommMap : commMap_) {
984 // Ignore nonactivated communicator
985 if (userCommMap.second.count(entry.first) != 0 && userCommMap.second.at(entry.first).second) {
986 LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
987 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
988 userCommMap.second.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
989 }
990 }
991 }
992 }
993
RegDBChangeCallback()994 void CommunicatorAggregator::RegDBChangeCallback()
995 {
996 if (dbStatusAdapter_ != nullptr) {
997 dbStatusAdapter_->SetDBStatusChangeCallback(
998 [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
999 OnRemoteDBStatusChange(devInfo, dbInfos);
1000 },
1001 [this]() {
1002 if (commLinker_ != nullptr) {
1003 (void)commLinker_->TriggerLabelExchangeEvent(false);
1004 }
1005 },
1006 [this](const std::string &dev) {
1007 if (commLinker_ != nullptr) {
1008 std::set<LabelType> relatedLabels;
1009 (void)commLinker_->TargetOnline(dev, relatedLabels);
1010 }
1011 });
1012 }
1013 }
InitSendThread()1014 void CommunicatorAggregator::InitSendThread()
1015 {
1016 if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
1017 return;
1018 }
1019 exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
1020 useExclusiveThread_ = true;
1021 }
1022
SendOnceData()1023 void CommunicatorAggregator::SendOnceData()
1024 {
1025 SendTask taskToSend;
1026 uint32_t totalLength = 0;
1027 int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
1028 if (errCode != E_OK) {
1029 return; // Not possible to happen
1030 }
1031 // <vector, extendHeadSize>
1032 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
1033 uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
1034 if (taskToSend.buffer == nullptr) {
1035 LOGE("[CommAggr] buffer of taskToSend is nullptr.");
1036 return;
1037 }
1038 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
1039 if (errCode != E_OK) {
1040 LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
1041 TaskFinalizer(taskToSend, errCode);
1042 return;
1043 }
1044 // <addr, <extendHeadSize, totalLen>>
1045 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
1046 if (piecePackets.empty()) {
1047 // Case that no need to split a frame, just use original buffer as a packet
1048 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
1049 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
1050 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
1051 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
1052 entry.second.second = tmpEntry.second + entry.second.first;
1053 eachPacket.push_back(entry);
1054 } else {
1055 for (auto &entry : piecePackets) {
1056 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
1057 {entry.second, entry.first.size()}};
1058 eachPacket.push_back(tmpEntry);
1059 }
1060 }
1061
1062 SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
1063 }
1064
TriggerSendData()1065 void CommunicatorAggregator::TriggerSendData()
1066 {
1067 if (useExclusiveThread_) {
1068 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
1069 wakingSignal_ = true;
1070 wakingCv_.notify_one();
1071 return;
1072 }
1073 {
1074 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1075 if (sendTaskStart_) {
1076 return;
1077 }
1078 sendTaskStart_ = true;
1079 }
1080 RefObject::IncObjRef(this);
1081 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
1082 LOGI("[CommAggr] Send thread start.");
1083 while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1084 SendOnceData();
1085 }
1086 {
1087 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1088 sendTaskStart_ = false;
1089 }
1090 if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1091 TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1092 }
1093 finalizeCv_.notify_one();
1094 RefObject::DecObjRef(this);
1095 LOGI("[CommAggr] Send thread end.");
1096 });
1097 if (errCode != E_OK) {
1098 LOGW("[CommAggr] Trigger send data failed %d", errCode);
1099 RefObject::DecObjRef(this);
1100 }
1101 }
1102
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1103 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1104 {
1105 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1106 if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1107 sendRecord_[frameId].splitMtu = mtu;
1108 sendRecord_[frameId].sendIndex = 0u;
1109 }
1110 }
1111
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1112 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1113 {
1114 if (IsRetryOutOfLimit(target)) {
1115 LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1116 scheduler_.InvalidSendTask(target);
1117 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1118 retryCount_[target] = 0;
1119 } else {
1120 RetrySendTask(target, sendSequenceId);
1121 if (sendSequenceId != GetSendSequenceId(target)) {
1122 LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1123 return;
1124 }
1125 scheduler_.DelayTaskByTarget(target);
1126 }
1127 }
1128
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1129 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1130 {
1131 int32_t currentRetryCount = 0;
1132 {
1133 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1134 retryCount_[target]++;
1135 currentRetryCount = retryCount_[target];
1136 LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1137 }
1138 TimerId timerId = 0u;
1139 RefObject::IncObjRef(this);
1140 (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1141 [this, target, sendSequenceId](TimerId id) {
1142 if (sendSequenceId == GetSendSequenceId(target)) {
1143 OnSendable(target);
1144 } else {
1145 LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1146 }
1147 RefObject::DecObjRef(this);
1148 return -E_END_TIMER;
1149 }, nullptr, timerId);
1150 }
1151
IsRetryOutOfLimit(const std::string & target)1152 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1153 {
1154 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1155 return retryCount_[target] >= MAX_SEND_RETRY;
1156 }
1157
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1158 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1159 {
1160 uint32_t timeout = DBConstant::MIN_TIMEOUT;
1161 if (adapterHandle_ != nullptr) {
1162 timeout = adapterHandle_->GetTimeout(target);
1163 }
1164 return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1165 }
1166
GetSendSequenceId(const std::string & target)1167 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1168 {
1169 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1170 return sendSequence_[target];
1171 }
1172
IncreaseSendSequenceId(const std::string & target)1173 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1174 {
1175 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1176 return ++sendSequence_[target];
1177 }
1178
ClearOnlineLabel()1179 void CommunicatorAggregator::ClearOnlineLabel()
1180 {
1181 std::lock_guard<std::mutex> autoLock(commMapMutex_);
1182 if (commLinker_ == nullptr) {
1183 LOGE("[CommAggr] clear online label with null linker");
1184 return;
1185 }
1186 commLinker_->ClearOnlineLabel();
1187 }
1188
ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo & receiveBytesInfo,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc,const UserInfo & userInfo)1189 int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo,
1190 SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc,
1191 const UserInfo &userInfo)
1192 {
1193 LabelType toLabel = inResult.GetCommLabel();
1194 uint16_t remoteDbVersion = inResult.GetDbVersion();
1195 int errCode = -E_NOT_FOUND;
1196 {
1197 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
1198 if (onCommLackHandle_) {
1199 errCode = onCommLackHandle_(toLabel, userInfo.receiveUser);
1200 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
1201 } else {
1202 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
1203 }
1204 }
1205 // Here we have to lock commMapMutex_ and search communicator again.
1206 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
1207 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(remoteDbVersion, receiveBytesInfo.srcTarget,
1208 inFrameBuffer, toLabel, userInfo);
1209 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
1210 LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
1211 return E_OK;
1212 }
1213 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
1214 if (errCode != E_OK || errCodeAgain == -E_FEEDBACK_DB_CLOSING) {
1215 TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer,
1216 errCodeAgain == -E_FEEDBACK_DB_CLOSING ? E_FEEDBACK_DB_CLOSING : E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
1217 if (inFrameBuffer != nullptr) {
1218 delete inFrameBuffer;
1219 inFrameBuffer = nullptr;
1220 }
1221 return errCode == E_OK ? errCodeAgain : errCode; // The caller will display errCode in log
1222 }
1223 // Do Retention, the retainer is responsible to deal with the frame
1224 retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, userInfo.sendUser, toLabel,
1225 inResult.GetFrameId(), remoteDbVersion});
1226 inFrameBuffer = nullptr;
1227 return E_OK;
1228 }
1229
ResetRetryCount()1230 void CommunicatorAggregator::ResetRetryCount()
1231 {
1232 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1233 retryCount_.clear();
1234 }
1235 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1236 } // namespace DistributedDB
1237