1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26
27 namespace DistributedDB {
28 namespace {
29 constexpr int MAX_SEND_RETRY = 2;
30 constexpr int RETRY_TIME_SPLIT = 4;
GetThreadId()31 inline std::string GetThreadId()
32 {
33 std::stringstream stream;
34 stream << std::this_thread::get_id();
35 return stream.str();
36 }
37 }
38
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42 : shutdown_(false),
43 incFrameId_(0),
44 localSourceId_(0)
45 {
46 }
47
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51 adapterHandle_ = nullptr;
52 commLinker_ = nullptr;
53 }
54
Initialize(IAdapter * inAdapter,const std::shared_ptr<DBStatusAdapter> & statusAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter)
56 {
57 if (inAdapter == nullptr) {
58 return -E_INVALID_ARGS;
59 }
60 adapterHandle_ = inAdapter;
61
62 combiner_.Initialize();
63 retainer_.Initialize();
64 scheduler_.Initialize();
65
66 int errCode;
67 commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter);
68 if (commLinker_ == nullptr) {
69 errCode = -E_OUT_OF_MEMORY;
70 goto ROLL_BACK;
71 }
72 commLinker_->Initialize();
73
74 errCode = RegCallbackToAdapter();
75 if (errCode != E_OK) {
76 goto ROLL_BACK;
77 }
78
79 errCode = adapterHandle_->StartAdapter();
80 if (errCode != E_OK) {
81 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82 goto ROLL_BACK;
83 }
84 GenerateLocalSourceId();
85
86 shutdown_ = false;
87 InitSendThread();
88 dbStatusAdapter_ = statusAdapter;
89 RegDBChangeCallback();
90 return E_OK;
91 ROLL_BACK:
92 UnRegCallbackFromAdapter();
93 if (commLinker_ != nullptr) {
94 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
95 commLinker_ = nullptr;
96 }
97 // Scheduler do not need to do finalize in this roll_back
98 retainer_.Finalize();
99 combiner_.Finalize();
100 return errCode;
101 }
102
Finalize()103 void CommunicatorAggregator::Finalize()
104 {
105 shutdown_ = true;
106 retryCv_.notify_all();
107 {
108 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
109 wakingSignal_ = true;
110 wakingCv_.notify_one();
111 }
112 if (useExclusiveThread_) {
113 exclusiveThread_.join(); // Waiting thread to thoroughly quit
114 LOGI("[CommAggr][Final] Sub Thread Exit.");
115 } else {
116 LOGI("[CommAggr][Final] Begin wait send task exit.");
117 std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
118 finalizeCv_.wait(scheduleSendTaskLock, [this]() {
119 return !sendTaskStart_;
120 });
121 LOGI("[CommAggr][Final] End wait send task exit.");
122 }
123 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
124
125 adapterHandle_->StopAdapter();
126 UnRegCallbackFromAdapter();
127 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
128
129 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
130 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
131 commLinker_ = nullptr;
132 retainer_.Finalize();
133 combiner_.Finalize();
134 dbStatusAdapter_ = nullptr;
135 }
136
AllocCommunicator(uint64_t commLabel,int & outErrorNo,const std::string & userId)137 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo, const std::string &userId)
138 {
139 uint64_t netOrderLabel = HostToNet(commLabel);
140 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
141 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
142 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
143 realLabel[i] = eachByte[i];
144 }
145 return AllocCommunicator(realLabel, outErrorNo, userId);
146 }
147
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo,const std::string & userId)148 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo,
149 const std::string &userId)
150 {
151 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
152 LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
153 if (commLabel.size() != COMM_LABEL_LENGTH) {
154 outErrorNo = -E_INVALID_ARGS;
155 return nullptr;
156 }
157
158 if (commMap_.count(userId) != 0 && commMap_[userId].count(commLabel) != 0) {
159 outErrorNo = -E_ALREADY_ALLOC;
160 return nullptr;
161 }
162
163 Communicator *commPtr = new(std::nothrow) Communicator(this, commLabel);
164 if (commPtr == nullptr) {
165 LOGE("[CommAggr][Alloc] Communicator create failed, may be no available memory.");
166 outErrorNo = -E_OUT_OF_MEMORY;
167 return nullptr;
168 }
169 commMap_[userId][commLabel] = {commPtr, false}; // Communicator is not activated when allocated
170 return commPtr;
171 }
172
ReleaseCommunicator(ICommunicator * inCommunicator,const std::string & userId)173 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator, const std::string &userId)
174 {
175 if (inCommunicator == nullptr) {
176 return;
177 }
178 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
179 LabelType commLabel = commPtr->GetCommunicatorLabel();
180 LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
181
182 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
183 if (commMap_.count(userId) == 0 || commMap_[userId].count(commLabel) == 0) {
184 LOGE("[CommAggr][Release] Not Found.");
185 return;
186 }
187 commMap_[userId].erase(commLabel);
188 if (commMap_[userId].empty()) {
189 commMap_.erase(userId);
190 }
191 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
192
193 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
194 if (errCode != E_OK) {
195 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
196 }
197 }
198
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)199 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
200 const Finalizer &inOper)
201 {
202 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
203 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
204 }
205
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)206 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
207 {
208 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
209 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
210 if (onConnect && errCode == E_OK) {
211 // Register action and success
212 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
213 for (auto &entry : onlineTargets) {
214 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
215 onConnectHandle_(entry, true);
216 }
217 }
218 return errCode;
219 }
220
GetCommunicatorAggregatorMtuSize() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
222 {
223 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
224 }
225
GetCommunicatorAggregatorMtuSize(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
227 {
228 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
229 }
230
GetCommunicatorAggregatorTimeout() const231 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
232 {
233 return adapterHandle_->GetTimeout();
234 }
235
GetCommunicatorAggregatorTimeout(const std::string & target) const236 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
237 {
238 return adapterHandle_->GetTimeout(target);
239 }
240
IsDeviceOnline(const std::string & device) const241 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
242 {
243 return adapterHandle_->IsDeviceOnline(device);
244 }
245
GetLocalIdentity(std::string & outTarget) const246 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
247 {
248 return adapterHandle_->GetLocalIdentity(outTarget);
249 }
250
ActivateCommunicator(const LabelType & commLabel,const std::string & userId)251 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, const std::string &userId)
252 {
253 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
254 LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
255 if (commMap_[userId].count(commLabel) == 0) {
256 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
257 return;
258 }
259 if (commMap_[userId].at(commLabel).second) {
260 return;
261 }
262 commMap_[userId].at(commLabel).second = true; // Mark this communicator as activated
263
264 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
265 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
266 std::set<std::string> onlineTargets;
267 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
268 if (errCode != E_OK) {
269 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
270 // Do not return here
271 }
272 for (auto &entry : onlineTargets) {
273 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
274 commMap_[userId].at(commLabel).first->OnConnectChange(entry, true);
275 }
276 // Do Redeliver, the communicator is responsible to deal with the frame
277 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
278 for (auto &entry : framesToRedeliver) {
279 commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
280 }
281 }
282
283 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)284 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
285 {
286 if (onEnd) { // LCOV_EXCL_BR_LINE
287 TaskAction onSendEndTask = [onEnd, result]() {
288 LOGD("[CommAggr][SendEndTask] Before On Send End.");
289 onEnd(result, true);
290 LOGD("[CommAggr][SendEndTask] After On Send End.");
291 };
292 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
293 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
294 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
295 }
296 }
297 }
298 }
299
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)300 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
301 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
302 {
303 if (inBuff == nullptr) {
304 return -E_INVALID_ARGS;
305 }
306
307 if (!ReGenerateLocalSourceIdIfNeed()) {
308 delete inBuff;
309 inBuff = nullptr;
310 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
311 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
312 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
313 }
314 bool sendLabelExchange = true;
315 if (dbStatusAdapter_ != nullptr) {
316 sendLabelExchange = dbStatusAdapter_->IsSendLabelExchange();
317 }
318 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType,
319 sendLabelExchange};
320 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
321 if (errCode != E_OK) {
322 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
323 return errCode;
324 }
325 {
326 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
327 sendRecord_[info.frameId] = {};
328 }
329 SendTask task{inBuff, dstTarget, onEnd, info.frameId, true};
330 if (inConfig.nonBlock) {
331 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
332 } else {
333 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
334 }
335 if (errCode != E_OK) {
336 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
337 return errCode;
338 }
339 TriggerSendData();
340 LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
341 return E_OK;
342 }
343
EnableCommunicatorNotFoundFeedback(bool isEnable)344 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
345 {
346 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
347 }
348
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const349 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
350 {
351 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
352 auto pair = versionMap_.find(target);
353 if (pair == versionMap_.end()) { // LCOV_EXCL_BR_LINE
354 return -E_NOT_FOUND;
355 }
356 outVersion = pair->second;
357 return E_OK;
358 }
359
SendDataRoutine()360 void CommunicatorAggregator::SendDataRoutine()
361 {
362 while (!shutdown_) {
363 if (scheduler_.GetNoDelayTaskCount() == 0) {
364 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
365 LOGI("[CommAggr][Routine] Send done and sleep.");
366 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
367 LOGI("[CommAggr][Routine] Send continue.");
368 wakingSignal_ = false;
369 continue;
370 }
371 SendOnceData();
372 }
373 }
374
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)375 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
376 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
377 {
378 bool taskNeedFinalize = true;
379 int errCode = E_OK;
380 ResetFrameRecordIfNeed(inTask.frameId, mtu);
381 uint32_t startIndex;
382 {
383 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
384 startIndex = sendRecord_[inTask.frameId].sendIndex;
385 }
386 uint64_t currentSendSequenceId = IncreaseSendSequenceId(inTask.dstTarget);
387 for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()) && inTask.isValid; ++index) {
388 auto &entry = eachPacket[index];
389 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
390 ", packetLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
391 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
392 errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
393 {
394 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
395 sendRecord_[inTask.frameId].sendIndex = index;
396 }
397 if (errCode == -E_WAIT_RETRY) {
398 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
399 taskNeedFinalize = false;
400 break;
401 } else if (errCode != E_OK) {
402 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
403 break;
404 } else {
405 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
406 retryCount_[inTask.dstTarget] = 0;
407 }
408 }
409 if (errCode == -E_WAIT_RETRY) {
410 RetrySendTaskIfNeed(inTask.dstTarget, currentSendSequenceId);
411 }
412 if (taskNeedFinalize) {
413 TaskFinalizer(inTask, errCode);
414 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
415 sendRecord_.erase(inTask.frameId);
416 }
417 }
418
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)419 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
420 {
421 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
422 if (errCode != E_OK) {
423 bool notTimeout = true;
424 auto retryFunc = [this, inPrio, &inTask]()->bool {
425 if (this->shutdown_) {
426 delete inTask.buffer;
427 inTask.buffer = nullptr;
428 return true;
429 }
430 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
431 if (retCode != E_OK) {
432 return false;
433 }
434 return true;
435 };
436
437 if (timeout == 0) { // Unlimited retry
438 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
439 retryCv_.wait(retryUniqueLock, retryFunc);
440 } else {
441 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
442 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
443 }
444
445 if (shutdown_) {
446 return E_OK;
447 }
448 if (!notTimeout) {
449 return -E_TIMEOUT;
450 }
451 }
452 return E_OK;
453 }
454
TaskFinalizer(const SendTask & inTask,int result)455 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
456 {
457 // Call the OnSendEnd if need
458 if (inTask.onEnd) {
459 LOGD("[CommAggr][TaskFinal] On Send End.");
460 inTask.onEnd(result, true);
461 }
462 // Finalize the task that just scheduled
463 int errCode = scheduler_.FinalizeLastScheduleTask();
464 // Notify Sendable To All Communicator If Need
465 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
466 retryCv_.notify_all();
467 }
468 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
469 NotifySendableToAllCommunicator();
470 }
471 }
472
NotifySendableToAllCommunicator()473 void CommunicatorAggregator::NotifySendableToAllCommunicator()
474 {
475 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
476 for (auto &userCommMap : commMap_) {
477 for (auto &entry : userCommMap.second) {
478 // Ignore nonactivated communicator
479 if (entry.second.second) {
480 entry.second.first->OnSendAvailable();
481 }
482 }
483 }
484 }
485
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const DataUserInfoProc & userInfoProc)486 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
487 const DataUserInfoProc &userInfoProc)
488 {
489 ProtocolProto::DisplayPacketInformation(bytes, length);
490 ParseResult packetResult;
491 int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
492 if (errCode != E_OK) {
493 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
494 if (errCode == -E_VERSION_NOT_SUPPORT) {
495 TriggerVersionNegotiation(srcTarget);
496 }
497 return;
498 }
499
500 // Update version of remote target
501 SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
502 if (dbStatusAdapter_ != nullptr) {
503 dbStatusAdapter_->SetRemoteOptimizeCommunication(srcTarget, !packetResult.IsSendLabelExchange());
504 }
505 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
506 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
507 return;
508 }
509
510 if (packetResult.IsFragment()) {
511 OnFragmentReceive(srcTarget, bytes, length, packetResult, userInfoProc);
512 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
513 errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
514 if (errCode != E_OK) {
515 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
516 }
517 } else {
518 errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userInfoProc);
519 if (errCode != E_OK) {
520 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
521 }
522 }
523 }
524
OnTargetChange(const std::string & target,bool isConnect)525 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
526 {
527 if (target.empty()) {
528 LOGE("[CommAggr][OnTarget] Target empty string.");
529 return;
530 }
531 // For process level target change
532 {
533 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
534 if (onConnectHandle_) {
535 onConnectHandle_(target, isConnect);
536 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
537 } else {
538 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
539 }
540 }
541 std::set<LabelType> relatedLabels;
542 // For communicator level target change
543 if (isConnect) {
544 int errCode = commLinker_->TargetOnline(target, relatedLabels);
545 if (errCode != E_OK) {
546 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
547 }
548 } else {
549 commLinker_->TargetOffline(target, relatedLabels);
550 }
551 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
552 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
553 for (auto &userCommMap : commMap_) {
554 for (auto &entry: userCommMap.second) {
555 // Ignore nonactivated communicator
556 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
557 entry.second.first->OnConnectChange(target, isConnect);
558 }
559 }
560 }
561 }
562
OnSendable(const std::string & target)563 void CommunicatorAggregator::OnSendable(const std::string &target)
564 {
565 int errCode = scheduler_.NoDelayTaskByTarget(target);
566 if (errCode != E_OK) {
567 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
568 return;
569 }
570 TriggerSendData();
571 }
572
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)573 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
574 const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
575 {
576 int errorNo = E_OK;
577 ParseResult frameResult;
578 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
579 if (errorNo != E_OK) {
580 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
581 return;
582 }
583 if (frameBuffer == nullptr) {
584 LOGW("[CommAggr][Receive] Combine undone.");
585 return;
586 }
587
588 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
589 if (errCode != E_OK) {
590 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
591 delete frameBuffer;
592 frameBuffer = nullptr;
593 if (errCode == -E_VERSION_NOT_SUPPORT) {
594 TriggerVersionNegotiation(srcTarget);
595 }
596 return;
597 }
598
599 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
600 errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
601 if (errCode != E_OK) {
602 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
603 }
604 delete frameBuffer;
605 frameBuffer = nullptr;
606 } else {
607 errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userInfoProc);
608 if (errCode != E_OK) {
609 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
610 }
611 }
612 }
613
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)614 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
615 {
616 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
617 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
618 inResult.GetLabelExchangeSequenceId());
619 if (errCode != E_OK) {
620 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
621 return errCode;
622 }
623 } else {
624 std::map<LabelType, bool> changedLabels;
625 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
626 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
627 if (errCode != E_OK) {
628 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
629 return errCode;
630 }
631 NotifyConnectChange(srcTarget, changedLabels);
632 }
633 return E_OK;
634 }
635
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)636 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
637 uint32_t length, const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
638 {
639 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
640 if (buffer == nullptr) {
641 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
642 return -E_OUT_OF_MEMORY;
643 }
644 int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
645 ProtocolProto::GetAppLayerFrameHeaderLength());
646 if (errCode != E_OK) {
647 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
648 delete buffer;
649 buffer = nullptr;
650 return -E_INTERNAL_ERROR;
651 }
652 return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userInfoProc);
653 }
654
655 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
656 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
657 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
658 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
659 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
660 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
661 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
662 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
663 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
664 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
665 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
666 // in the same callback thread finally causing DeadLock on commMapMutex_.
667 // #### SO #### we have to make a change described below
668 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
669 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
670 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
671 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
672 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const DataUserInfoProc & userInfoProc)673 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
674 const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
675 {
676 LabelType toLabel = inResult.GetCommLabel();
677 std::string userId;
678 int ret = GetDataUserId(inResult, toLabel, userInfoProc, userId);
679 if (ret != E_OK) {
680 LOGE("[CommAggr][AppReceive] get data user id err, ret=%d", ret);
681 delete inFrameBuffer;
682 inFrameBuffer = nullptr;
683 return ret;
684 }
685 {
686 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
687 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
688 if (errCode == E_OK) { // Attention: Here is equal to E_OK
689 return E_OK;
690 }
691 }
692 LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
693 int errCode = -E_NOT_FOUND;
694 {
695 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
696 if (onCommLackHandle_) {
697 errCode = onCommLackHandle_(toLabel, userId);
698 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
699 } else {
700 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
701 }
702 }
703 // Here we have to lock commMapMutex_ and search communicator again.
704 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
705 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
706 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
707 LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
708 return E_OK;
709 }
710 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
711 if (errCode != E_OK) {
712 TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
713 delete inFrameBuffer;
714 inFrameBuffer = nullptr;
715 return errCode; // The caller will display errCode in log
716 }
717 // Do Retention, the retainer is responsible to deal with the frame
718 retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
719 inFrameBuffer = nullptr;
720 return E_OK;
721 }
722
GetDataUserId(const ParseResult & inResult,const LabelType & toLabel,const DataUserInfoProc & userInfoProc,std::string & userId)723 int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel,
724 const DataUserInfoProc &userInfoProc, std::string &userId)
725 {
726 if (userInfoProc.processCommunicator == nullptr) {
727 LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr");
728 return E_INVALID_ARGS;
729 }
730 std::string label(toLabel.begin(), toLabel.end());
731 std::vector<UserInfo> userInfos;
732 DBStatus ret = userInfoProc.processCommunicator->GetDataUserInfo(userInfoProc.data, userInfoProc.length, label,
733 userInfos);
734 LOGI("[CommAggr][GetDataUserId] get data user info, ret=%d", ret);
735 if (ret == NO_PERMISSION) {
736 LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet");
737 return ret;
738 }
739 if (userInfos.size() >= 1) {
740 userId = userInfos[0].receiveUser;
741 } else {
742 LOGW("[CommAggr][GetDataUserId] userInfos is empty");
743 }
744 return E_OK;
745 }
746
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel,const std::string & userId)747 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
748 SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const std::string &userId)
749 {
750 // Ignore nonactivated communicator, which is regarded as inexistent
751 if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) {
752 commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
753 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
754 inFrameBuffer = nullptr;
755 return E_OK;
756 }
757 Communicator *communicator = nullptr;
758 bool isEmpty = false;
759 for (auto &userCommMap : commMap_) {
760 for (auto &entry : userCommMap.second) {
761 if (entry.first == toLabel && entry.second.second) {
762 communicator = entry.second.first;
763 isEmpty = userCommMap.first.empty();
764 LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s",
765 userCommMap.first.c_str(), userId.c_str());
766 break;
767 }
768 }
769 if (communicator != nullptr) {
770 break;
771 }
772 }
773 if (communicator != nullptr && (userId.empty() || isEmpty)) {
774 communicator->OnBufferReceive(srcTarget, inFrameBuffer);
775 inFrameBuffer = nullptr;
776 return E_OK;
777 }
778 LOGE("[CommAggr][TryDeliver] Communicator not found");
779 return -E_NOT_FOUND;
780 }
781
RegCallbackToAdapter()782 int CommunicatorAggregator::RegCallbackToAdapter()
783 {
784 RefObject::IncObjRef(this); // Reference to be hold by adapter
785 int errCode = adapterHandle_->RegBytesReceiveCallback(
786 [this](const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
787 const DataUserInfoProc &userInfoProc) {
788 OnBytesReceive(srcTarget, bytes, length, userInfoProc);
789 }, [this]() { RefObject::DecObjRef(this); });
790 if (errCode != E_OK) {
791 RefObject::DecObjRef(this); // Rollback in case reg failed
792 return errCode;
793 }
794
795 RefObject::IncObjRef(this); // Reference to be hold by adapter
796 errCode = adapterHandle_->RegTargetChangeCallback(
797 [this](const std::string &target, bool isConnect) { OnTargetChange(target, isConnect); },
798 [this]() { RefObject::DecObjRef(this); });
799 if (errCode != E_OK) {
800 RefObject::DecObjRef(this); // Rollback in case reg failed
801 return errCode;
802 }
803
804 RefObject::IncObjRef(this); // Reference to be hold by adapter
805 errCode = adapterHandle_->RegSendableCallback([this](const std::string &target, int deviceCommErrCode) {
806 LOGI("[CommAggr] Send able dev=%.3s, deviceCommErrCode=%d", target.c_str(), deviceCommErrCode);
807 if (deviceCommErrCode == E_OK) {
808 (void)IncreaseSendSequenceId(target);
809 OnSendable(target);
810 }
811 scheduler_.SetDeviceCommErrCode(target, deviceCommErrCode);
812 },
813 [this]() { RefObject::DecObjRef(this); });
814 if (errCode != E_OK) {
815 RefObject::DecObjRef(this); // Rollback in case reg failed
816 return errCode;
817 }
818
819 return E_OK;
820 }
821
UnRegCallbackFromAdapter()822 void CommunicatorAggregator::UnRegCallbackFromAdapter()
823 {
824 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
825 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
826 adapterHandle_->RegSendableCallback(nullptr, nullptr);
827 if (dbStatusAdapter_ != nullptr) {
828 dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr, nullptr);
829 }
830 }
831
GenerateLocalSourceId()832 void CommunicatorAggregator::GenerateLocalSourceId()
833 {
834 std::string identity;
835 adapterHandle_->GetLocalIdentity(identity);
836 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
837 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
838 uint64_t identityHash = Hash::HashFunc(identity);
839 if (identityHash != localSourceId_) {
840 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
841 }
842 localSourceId_ = identityHash;
843 }
844
ReGenerateLocalSourceIdIfNeed()845 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
846 {
847 // The deviceId will change when switch user from A to B
848 // We can't listen to the user change, because it's hard to ensure the timing is correct.
849 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
850 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
851 GenerateLocalSourceId();
852 return (localSourceId_ != 0);
853 }
854
TriggerVersionNegotiation(const std::string & dstTarget)855 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
856 {
857 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
858 int errCode = E_OK;
859 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
860 if (errCode != E_OK) {
861 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
862 return;
863 }
864
865 TaskConfig config{true, 0, Priority::HIGH};
866 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
867 if (errCode != E_OK) {
868 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
869 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
870 delete buffer;
871 buffer = nullptr;
872 }
873 }
874
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)875 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
876 const LabelType &dstLabel, const SerialBuffer *inOriFrame)
877 {
878 if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
879 return;
880 }
881 int errCode = E_OK;
882 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
883 if (message == nullptr) {
884 if (errCode == -E_VERSION_NOT_SUPPORT) {
885 TriggerVersionNegotiation(dstTarget);
886 }
887 return;
888 }
889 // Message is release in TriggerCommunicatorNotFoundFeedback
890 TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
891 }
892
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)893 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
894 const LabelType &dstLabel, Message* &oriMsg)
895 {
896 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
897 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
898 // Do not have to do feedback if the message is not a request type message
899 delete oriMsg;
900 oriMsg = nullptr;
901 return;
902 }
903
904 LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
905 oriMsg->SetMessageType(TYPE_RESPONSE);
906 oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
907
908 int errCode = E_OK;
909 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
910 delete oriMsg;
911 oriMsg = nullptr;
912 if (errCode != E_OK) {
913 LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
914 return;
915 }
916
917 TaskConfig config{true, 0, Priority::HIGH};
918 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
919 if (errCode != E_OK) {
920 LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
921 // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
922 delete buffer;
923 buffer = nullptr;
924 }
925 }
926
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)927 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
928 {
929 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
930 versionMap_[target] = version;
931 }
932
GetExtendHeaderHandle(const ExtendInfo & paramInfo)933 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
934 {
935 if (adapterHandle_ == nullptr) {
936 return nullptr;
937 }
938 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
939 }
940
OnRemoteDBStatusChange(const std::string & devInfo,const std::vector<DBInfo> & dbInfos)941 void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos)
942 {
943 std::map<LabelType, bool> changedLabels;
944 for (const auto &dbInfo: dbInfos) {
945 std::string label = DBCommon::GenerateHashLabel(dbInfo);
946 LabelType labelType(label.begin(), label.end());
947 changedLabels[labelType] = dbInfo.isNeedSync;
948 }
949 if (commLinker_ != nullptr) {
950 commLinker_->UpdateOnlineLabels(devInfo, changedLabels);
951 }
952 NotifyConnectChange(devInfo, changedLabels);
953 }
954
NotifyConnectChange(const std::string & srcTarget,const std::map<LabelType,bool> & changedLabels)955 void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget,
956 const std::map<LabelType, bool> &changedLabels)
957 {
958 if (commLinker_ != nullptr && !commLinker_->IsRemoteTargetOnline(srcTarget)) {
959 LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str());
960 for (const auto &entry : changedLabels) {
961 LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
962 }
963 return;
964 }
965 // Do target change notify
966 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
967 for (auto &entry : changedLabels) {
968 for (auto &userCommMap : commMap_) {
969 // Ignore nonactivated communicator
970 if (userCommMap.second.count(entry.first) != 0 && userCommMap.second.at(entry.first).second) {
971 LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.",
972 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
973 userCommMap.second.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
974 }
975 }
976 }
977 }
978
RegDBChangeCallback()979 void CommunicatorAggregator::RegDBChangeCallback()
980 {
981 if (dbStatusAdapter_ != nullptr) {
982 dbStatusAdapter_->SetDBStatusChangeCallback(
983 [this](const std::string &devInfo, const std::vector<DBInfo> &dbInfos) {
984 OnRemoteDBStatusChange(devInfo, dbInfos);
985 },
986 [this]() {
987 if (commLinker_ != nullptr) {
988 (void)commLinker_->TriggerLabelExchangeEvent(false);
989 }
990 },
991 [this](const std::string &dev) {
992 if (commLinker_ != nullptr) {
993 std::set<LabelType> relatedLabels;
994 (void)commLinker_->TargetOnline(dev, relatedLabels);
995 }
996 });
997 }
998 }
InitSendThread()999 void CommunicatorAggregator::InitSendThread()
1000 {
1001 if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
1002 return;
1003 }
1004 exclusiveThread_ = std::thread([this] { SendDataRoutine(); });
1005 useExclusiveThread_ = true;
1006 }
1007
SendOnceData()1008 void CommunicatorAggregator::SendOnceData()
1009 {
1010 SendTask taskToSend;
1011 uint32_t totalLength = 0;
1012 int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
1013 if (errCode != E_OK) {
1014 return; // Not possible to happen
1015 }
1016 // <vector, extendHeadSize>
1017 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
1018 uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
1019 if (taskToSend.buffer == nullptr) {
1020 LOGE("[CommAggr] buffer of taskToSend is nullptr.");
1021 return;
1022 }
1023 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
1024 if (errCode != E_OK) {
1025 LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
1026 TaskFinalizer(taskToSend, errCode);
1027 return;
1028 }
1029 // <addr, <extendHeadSize, totalLen>>
1030 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
1031 if (piecePackets.empty()) {
1032 // Case that no need to split a frame, just use original buffer as a packet
1033 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
1034 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
1035 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
1036 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
1037 entry.second.second = tmpEntry.second + entry.second.first;
1038 eachPacket.push_back(entry);
1039 } else {
1040 for (auto &entry : piecePackets) {
1041 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
1042 {entry.second, entry.first.size()}};
1043 eachPacket.push_back(tmpEntry);
1044 }
1045 }
1046
1047 SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
1048 }
1049
TriggerSendData()1050 void CommunicatorAggregator::TriggerSendData()
1051 {
1052 if (useExclusiveThread_) {
1053 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
1054 wakingSignal_ = true;
1055 wakingCv_.notify_one();
1056 return;
1057 }
1058 {
1059 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1060 if (sendTaskStart_) {
1061 return;
1062 }
1063 sendTaskStart_ = true;
1064 }
1065 RefObject::IncObjRef(this);
1066 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
1067 LOGI("[CommAggr] Send thread start.");
1068 while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1069 SendOnceData();
1070 }
1071 {
1072 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
1073 sendTaskStart_ = false;
1074 }
1075 if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
1076 TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
1077 }
1078 finalizeCv_.notify_one();
1079 RefObject::DecObjRef(this);
1080 LOGI("[CommAggr] Send thread end.");
1081 });
1082 if (errCode != E_OK) {
1083 LOGW("[CommAggr] Trigger send data failed %d", errCode);
1084 RefObject::DecObjRef(this);
1085 }
1086 }
1087
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)1088 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
1089 {
1090 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
1091 if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
1092 sendRecord_[frameId].splitMtu = mtu;
1093 sendRecord_[frameId].sendIndex = 0u;
1094 }
1095 }
1096
RetrySendTaskIfNeed(const std::string & target,uint64_t sendSequenceId)1097 void CommunicatorAggregator::RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId)
1098 {
1099 if (IsRetryOutOfLimit(target)) {
1100 LOGD("[CommAggr] Retry send task is out of limit! target is %s{private}", target.c_str());
1101 scheduler_.InvalidSendTask(target);
1102 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1103 retryCount_[target] = 0;
1104 } else {
1105 if (sendSequenceId != GetSendSequenceId(target)) {
1106 LOGD("[CommAggr] %.3s Send sequence id has changed", target.c_str());
1107 return;
1108 }
1109 scheduler_.DelayTaskByTarget(target);
1110 RetrySendTask(target, sendSequenceId);
1111 }
1112 }
1113
RetrySendTask(const std::string & target,uint64_t sendSequenceId)1114 void CommunicatorAggregator::RetrySendTask(const std::string &target, uint64_t sendSequenceId)
1115 {
1116 int32_t currentRetryCount = 0;
1117 {
1118 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1119 retryCount_[target]++;
1120 currentRetryCount = retryCount_[target];
1121 LOGD("[CommAggr] Target %s{private} retry count is %" PRId32, target.c_str(), currentRetryCount);
1122 }
1123 TimerId timerId = 0u;
1124 RefObject::IncObjRef(this);
1125 (void)RuntimeContext::GetInstance()->SetTimer(GetNextRetryInterval(target, currentRetryCount),
1126 [this, target, sendSequenceId](TimerId id) {
1127 if (sendSequenceId == GetSendSequenceId(target)) {
1128 OnSendable(target);
1129 } else {
1130 LOGD("[CommAggr] %.3s Send sequence id has changed in timer", target.c_str());
1131 }
1132 RefObject::DecObjRef(this);
1133 return -E_END_TIMER;
1134 }, nullptr, timerId);
1135 }
1136
IsRetryOutOfLimit(const std::string & target)1137 bool CommunicatorAggregator::IsRetryOutOfLimit(const std::string &target)
1138 {
1139 std::lock_guard<std::mutex> autoLock(retryCountMutex_);
1140 return retryCount_[target] >= MAX_SEND_RETRY;
1141 }
1142
GetNextRetryInterval(const std::string & target,int32_t currentRetryCount)1143 int32_t CommunicatorAggregator::GetNextRetryInterval(const std::string &target, int32_t currentRetryCount)
1144 {
1145 uint32_t timeout = DBConstant::MIN_TIMEOUT;
1146 if (adapterHandle_ != nullptr) {
1147 timeout = adapterHandle_->GetTimeout(target);
1148 }
1149 return static_cast<int32_t>(timeout) * currentRetryCount / RETRY_TIME_SPLIT;
1150 }
1151
GetSendSequenceId(const std::string & target)1152 uint64_t CommunicatorAggregator::GetSendSequenceId(const std::string &target)
1153 {
1154 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1155 return sendSequence_[target];
1156 }
1157
IncreaseSendSequenceId(const std::string & target)1158 uint64_t CommunicatorAggregator::IncreaseSendSequenceId(const std::string &target)
1159 {
1160 std::lock_guard<std::mutex> autoLock(sendSequenceMutex_);
1161 return ++sendSequence_[target];
1162 }
1163 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
1164 } // namespace DistributedDB
1165