1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17
18 #include <sstream>
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "hash.h"
24 #include "log_print.h"
25 #include "protocol_proto.h"
26
27 namespace DistributedDB {
28 namespace {
GetThreadId()29 inline std::string GetThreadId()
30 {
31 std::stringstream stream;
32 stream << std::this_thread::get_id();
33 return stream.str();
34 }
35 }
36
37 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
38
CommunicatorAggregator()39 CommunicatorAggregator::CommunicatorAggregator()
40 : shutdown_(false),
41 incFrameId_(0),
42 localSourceId_(0)
43 {
44 }
45
~CommunicatorAggregator()46 CommunicatorAggregator::~CommunicatorAggregator()
47 {
48 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
49 adapterHandle_ = nullptr;
50 commLinker_ = nullptr;
51 }
52
Initialize(IAdapter * inAdapter)53 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
54 {
55 if (inAdapter == nullptr) {
56 return -E_INVALID_ARGS;
57 }
58 adapterHandle_ = inAdapter;
59
60 combiner_.Initialize();
61 retainer_.Initialize();
62 scheduler_.Initialize();
63
64 int errCode;
65 commLinker_ = new (std::nothrow) CommunicatorLinker(this);
66 if (commLinker_ == nullptr) {
67 errCode = -E_OUT_OF_MEMORY;
68 goto ROLL_BACK;
69 }
70 commLinker_->Initialize();
71
72 errCode = RegCallbackToAdapter();
73 if (errCode != E_OK) {
74 goto ROLL_BACK;
75 }
76
77 errCode = adapterHandle_->StartAdapter();
78 if (errCode != E_OK) {
79 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
80 goto ROLL_BACK;
81 }
82 GenerateLocalSourceId();
83
84 shutdown_ = false;
85 InitSendThread();
86 return E_OK;
87 ROLL_BACK:
88 UnRegCallbackFromAdapter();
89 if (commLinker_ != nullptr) {
90 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
91 commLinker_ = nullptr;
92 }
93 // Scheduler do not need to do finalize in this roll_back
94 retainer_.Finalize();
95 combiner_.Finalize();
96 return errCode;
97 }
98
Finalize()99 void CommunicatorAggregator::Finalize()
100 {
101 shutdown_ = true;
102 retryCv_.notify_all();
103 {
104 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
105 wakingSignal_ = true;
106 wakingCv_.notify_one();
107 }
108 if (useExclusiveThread_) {
109 exclusiveThread_.join(); // Waiting thread to thoroughly quit
110 LOGI("[CommAggr][Final] Sub Thread Exit.");
111 } else {
112 LOGI("[CommAggr][Final] Begin wait send task exit.");
113 std::unique_lock<std::mutex> scheduleSendTaskLock(scheduleSendTaskMutex_);
114 finalizeCv_.wait(scheduleSendTaskLock, [this]() {
115 return !sendTaskStart_;
116 });
117 LOGI("[CommAggr][Final] End wait send task exit.");
118 }
119 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
120
121 adapterHandle_->StopAdapter();
122 UnRegCallbackFromAdapter();
123 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
124
125 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
126 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
127 commLinker_ = nullptr;
128 retainer_.Finalize();
129 combiner_.Finalize();
130 }
131
AllocCommunicator(uint64_t commLabel,int & outErrorNo)132 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
133 {
134 uint64_t netOrderLabel = HostToNet(commLabel);
135 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
136 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
137 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
138 realLabel[i] = eachByte[i];
139 }
140 return AllocCommunicator(realLabel, outErrorNo);
141 }
142
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)143 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
144 {
145 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
146 LOGI("[CommAggr][Alloc] Label=%.3s.", VEC_TO_STR(commLabel));
147 if (commLabel.size() != COMM_LABEL_LENGTH) {
148 outErrorNo = -E_INVALID_ARGS;
149 return nullptr;
150 }
151
152 if (commMap_.count(commLabel) != 0) {
153 outErrorNo = -E_ALREADY_ALLOC;
154 return nullptr;
155 }
156
157 Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
158 if (commPtr == nullptr) {
159 outErrorNo = -E_OUT_OF_MEMORY;
160 return nullptr;
161 }
162 commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
163 return commPtr;
164 }
165
ReleaseCommunicator(ICommunicator * inCommunicator)166 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
167 {
168 if (inCommunicator == nullptr) {
169 return;
170 }
171 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
172 LabelType commLabel = commPtr->GetCommunicatorLabel();
173 LOGI("[CommAggr][Release] Label=%.3s.", VEC_TO_STR(commLabel));
174
175 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
176 if (commMap_.count(commLabel) == 0) {
177 LOGE("[CommAggr][Release] Not Found.");
178 return;
179 }
180 commMap_.erase(commLabel);
181 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
182
183 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
184 if (errCode != E_OK) {
185 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
186 }
187 }
188
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)189 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
190 const Finalizer &inOper)
191 {
192 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
193 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
194 }
195
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)196 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
197 {
198 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
199 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
200 if (onConnect && errCode == E_OK) {
201 // Register action and success
202 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
203 for (auto &entry : onlineTargets) {
204 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
205 onConnectHandle_(entry, true);
206 }
207 }
208 return errCode;
209 }
210
GetCommunicatorAggregatorMtuSize() const211 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
212 {
213 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
214 }
215
GetCommunicatorAggregatorMtuSize(const std::string & target) const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
217 {
218 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
219 }
220
GetCommunicatorAggregatorTimeout() const221 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
222 {
223 return adapterHandle_->GetTimeout();
224 }
225
GetCommunicatorAggregatorTimeout(const std::string & target) const226 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
227 {
228 return adapterHandle_->GetTimeout(target);
229 }
230
IsDeviceOnline(const std::string & device) const231 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
232 {
233 return adapterHandle_->IsDeviceOnline(device);
234 }
235
GetLocalIdentity(std::string & outTarget) const236 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
237 {
238 return adapterHandle_->GetLocalIdentity(outTarget);
239 }
240
ActivateCommunicator(const LabelType & commLabel)241 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
242 {
243 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
244 LOGI("[CommAggr][Activate] Label=%.3s.", VEC_TO_STR(commLabel));
245 if (commMap_.count(commLabel) == 0) {
246 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
247 return;
248 }
249 if (commMap_.at(commLabel).second) {
250 return;
251 }
252 commMap_.at(commLabel).second = true; // Mark this communicator as activated
253
254 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
255 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
256 std::set<std::string> onlineTargets;
257 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
258 if (errCode != E_OK) {
259 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
260 // Do not return here
261 }
262 for (auto &entry : onlineTargets) {
263 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
264 commMap_.at(commLabel).first->OnConnectChange(entry, true);
265 }
266 // Do Redeliver, the communicator is responsible to deal with the frame
267 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
268 for (auto &entry : framesToRedeliver) {
269 commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
270 }
271 }
272
273 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)274 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
275 {
276 if (onEnd) {
277 TaskAction onSendEndTask = [onEnd, result]() {
278 LOGD("[CommAggr][SendEndTask] Before On Send End.");
279 onEnd(result);
280 LOGD("[CommAggr][SendEndTask] After On Send End.");
281 };
282 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
283 if (errCode != E_OK) {
284 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
285 }
286 }
287 }
288 }
289
ScheduleSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)290 int CommunicatorAggregator::ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
291 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
292 {
293 if (inBuff == nullptr) {
294 return -E_INVALID_ARGS;
295 }
296
297 if (!ReGenerateLocalSourceIdIfNeed()) {
298 delete inBuff;
299 inBuff = nullptr;
300 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
301 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
302 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
303 }
304 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
305 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
306 if (errCode != E_OK) {
307 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
308 return errCode;
309 }
310 {
311 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
312 sendRecord_[info.frameId] = {};
313 }
314 SendTask task{inBuff, dstTarget, onEnd, info.frameId};
315 if (inConfig.nonBlock) {
316 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
317 } else {
318 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
319 }
320 if (errCode != E_OK) {
321 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
322 return errCode;
323 }
324 TriggerSendData();
325 LOGI("[CommAggr][Create] Exit ok, dev=%.3s, frameId=%u", dstTarget.c_str(), info.frameId);
326 return E_OK;
327 }
328
EnableCommunicatorNotFoundFeedback(bool isEnable)329 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
330 {
331 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
332 }
333
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const334 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
335 {
336 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
337 auto pair = versionMap_.find(target);
338 if (pair == versionMap_.end()) {
339 return -E_NOT_FOUND;
340 }
341 outVersion = pair->second;
342 return E_OK;
343 }
344
SendDataRoutine()345 void CommunicatorAggregator::SendDataRoutine()
346 {
347 while (!shutdown_) {
348 if (scheduler_.GetNoDelayTaskCount() == 0) {
349 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
350 LOGI("[CommAggr][Routine] Send done and sleep.");
351 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
352 LOGI("[CommAggr][Routine] Send continue.");
353 wakingSignal_ = false;
354 continue;
355 }
356 SendOnceData();
357 }
358 }
359
SendPacketsAndDisposeTask(const SendTask & inTask,uint32_t mtu,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket,uint32_t totalLength)360 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
361 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength)
362 {
363 bool taskNeedFinalize = true;
364 int errCode = E_OK;
365 ResetFrameRecordIfNeed(inTask.frameId, mtu);
366 uint32_t startIndex;
367 {
368 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
369 startIndex = sendRecord_[inTask.frameId].sendIndex;
370 }
371 for (uint32_t index = startIndex; index < static_cast<uint32_t>(eachPacket.size()); ++index) {
372 auto &entry = eachPacket[index];
373 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%" PRIu32
374 ", totalLength=%" PRIu32 ".", inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
375 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
376 errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second, totalLength);
377 {
378 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
379 sendRecord_[inTask.frameId].sendIndex = index;
380 }
381 if (errCode == -E_WAIT_RETRY) {
382 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
383 scheduler_.DelayTaskByTarget(inTask.dstTarget);
384 taskNeedFinalize = false;
385 break;
386 } else if (errCode != E_OK) {
387 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
388 break;
389 }
390 }
391 if (errCode == -E_WAIT_RETRY) {
392 const int RETRY_INTERVAL = 1000;
393 TimerId timerId = 0u;
394 const std::string target = inTask.dstTarget;
395 RefObject::IncObjRef(this);
396 errCode = RuntimeContext::GetInstance()->SetTimer(RETRY_INTERVAL, [this, target](TimerId id) {
397 OnSendable(target);
398 RefObject::DecObjRef(this);
399 return -E_END_TIMER;
400 }, nullptr, timerId);
401 }
402 if (taskNeedFinalize) {
403 TaskFinalizer(inTask, errCode);
404 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
405 sendRecord_.erase(inTask.frameId);
406 }
407 }
408
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)409 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
410 {
411 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
412 if (errCode != E_OK) {
413 bool notTimeout = true;
414 auto retryFunc = [this, inPrio, &inTask]()->bool {
415 if (this->shutdown_) {
416 delete inTask.buffer;
417 inTask.buffer = nullptr;
418 return true;
419 }
420 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
421 if (retCode != E_OK) {
422 return false;
423 }
424 return true;
425 };
426
427 if (timeout == 0) { // Unlimited retry
428 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
429 retryCv_.wait(retryUniqueLock, retryFunc);
430 } else {
431 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
432 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
433 }
434
435 if (shutdown_) {
436 return E_OK;
437 }
438 if (!notTimeout) {
439 return -E_TIMEOUT;
440 }
441 }
442 return E_OK;
443 }
444
TaskFinalizer(const SendTask & inTask,int result)445 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
446 {
447 // Call the OnSendEnd if need
448 if (inTask.onEnd) {
449 LOGD("[CommAggr][TaskFinal] On Send End.");
450 inTask.onEnd(result);
451 }
452 // Finalize the task that just scheduled
453 int errCode = scheduler_.FinalizeLastScheduleTask();
454 // Notify Sendable To All Communicator If Need
455 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
456 retryCv_.notify_all();
457 }
458 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
459 NotifySendableToAllCommunicator();
460 }
461 }
462
NotifySendableToAllCommunicator()463 void CommunicatorAggregator::NotifySendableToAllCommunicator()
464 {
465 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
466 for (auto &entry : commMap_) {
467 // Ignore nonactivated communicator
468 if (entry.second.second) {
469 entry.second.first->OnSendAvailable();
470 }
471 }
472 }
473
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)474 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
475 const std::string &userId)
476 {
477 ProtocolProto::DisplayPacketInformation(bytes, length);
478 ParseResult packetResult;
479 int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
480 if (errCode != E_OK) {
481 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
482 if (errCode == -E_VERSION_NOT_SUPPORT) {
483 TriggerVersionNegotiation(srcTarget);
484 }
485 return;
486 }
487
488 // Update version of remote target
489 SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
490 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
491 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
492 return;
493 }
494
495 if (packetResult.IsFragment()) {
496 OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
497 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
498 errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
499 if (errCode != E_OK) {
500 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
501 }
502 } else {
503 errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
504 if (errCode != E_OK) {
505 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
506 }
507 }
508 }
509
OnTargetChange(const std::string & target,bool isConnect)510 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
511 {
512 if (target.empty()) {
513 LOGE("[CommAggr][OnTarget] Target empty string.");
514 return;
515 }
516 // For process level target change
517 {
518 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
519 if (onConnectHandle_) {
520 onConnectHandle_(target, isConnect);
521 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
522 } else {
523 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
524 }
525 }
526 std::set<LabelType> relatedLabels;
527 // For communicator level target change
528 if (isConnect) {
529 int errCode = commLinker_->TargetOnline(target, relatedLabels);
530 if (errCode != E_OK) {
531 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
532 }
533 } else {
534 commLinker_->TargetOffline(target, relatedLabels);
535 }
536 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
537 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
538 for (auto &entry : commMap_) {
539 // Ignore nonactivated communicator
540 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
541 entry.second.first->OnConnectChange(target, isConnect);
542 }
543 }
544 }
545
OnSendable(const std::string & target)546 void CommunicatorAggregator::OnSendable(const std::string &target)
547 {
548 int errCode = scheduler_.NoDelayTaskByTarget(target);
549 if (errCode != E_OK) {
550 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
551 return;
552 }
553 TriggerSendData();
554 }
555
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)556 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
557 const ParseResult &inResult, const std::string &userId)
558 {
559 int errorNo = E_OK;
560 ParseResult frameResult;
561 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
562 if (errorNo != E_OK) {
563 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
564 return;
565 }
566 if (frameBuffer == nullptr) {
567 LOGW("[CommAggr][Receive] Combine undone.");
568 return;
569 }
570
571 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
572 if (errCode != E_OK) {
573 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
574 delete frameBuffer;
575 frameBuffer = nullptr;
576 if (errCode == -E_VERSION_NOT_SUPPORT) {
577 TriggerVersionNegotiation(srcTarget);
578 }
579 return;
580 }
581
582 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
583 errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
584 if (errCode != E_OK) {
585 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
586 }
587 delete frameBuffer;
588 frameBuffer = nullptr;
589 } else {
590 errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
591 if (errCode != E_OK) {
592 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
593 }
594 }
595 }
596
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)597 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
598 {
599 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
600 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
601 inResult.GetLabelExchangeSequenceId());
602 if (errCode != E_OK) {
603 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
604 return errCode;
605 }
606 } else {
607 std::map<LabelType, bool> changedLabels;
608 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
609 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
610 if (errCode != E_OK) {
611 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
612 return errCode;
613 }
614 if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
615 LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
616 for (const auto &entry : changedLabels) {
617 LOGW("[CommAggr][CommReceive] REMEMBER: label=%.3s, inOnline=%d.", VEC_TO_STR(entry.first),
618 entry.second);
619 }
620 return E_OK;
621 }
622 // Do target change notify
623 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
624 for (auto &entry : changedLabels) {
625 // Ignore nonactivated communicator
626 if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
627 LOGI("[CommAggr][CommReceive] label=%.3s, srcTarget=%s{private}, isOnline=%d.",
628 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
629 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
630 }
631 }
632 }
633 return E_OK;
634 }
635
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)636 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
637 uint32_t length, const ParseResult &inResult, const std::string &userId)
638 {
639 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
640 if (buffer == nullptr) {
641 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
642 return -E_OUT_OF_MEMORY;
643 }
644 int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
645 ProtocolProto::GetAppLayerFrameHeaderLength());
646 if (errCode != E_OK) {
647 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
648 delete buffer;
649 buffer = nullptr;
650 return -E_INTERNAL_ERROR;
651 }
652 return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
653 }
654
655 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
656 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
657 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
658 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
659 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
660 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
661 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
662 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
663 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
664 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
665 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
666 // in the same callback thread finally causing DeadLock on commMapMutex_.
667 // #### SO #### we have to make a change described below
668 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
669 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
670 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
671 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
672 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)673 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
674 const ParseResult &inResult, const std::string &userId)
675 {
676 LabelType toLabel = inResult.GetCommLabel();
677 {
678 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
679 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
680 if (errCode == E_OK) { // Attention: Here is equal to E_OK
681 return E_OK;
682 }
683 }
684 LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel));
685 int errCode = -E_NOT_FOUND;
686 {
687 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
688 if (onCommLackHandle_) {
689 errCode = onCommLackHandle_(toLabel, userId);
690 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
691 } else {
692 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
693 }
694 }
695 // Here we have to lock commMapMutex_ and search communicator again.
696 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
697 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
698 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
699 LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
700 return E_OK;
701 }
702 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
703 if (errCode != E_OK) {
704 TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
705 delete inFrameBuffer;
706 inFrameBuffer = nullptr;
707 return errCode; // The caller will display errCode in log
708 }
709 // Do Retention, the retainer is responsible to deal with the frame
710 retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
711 inFrameBuffer = nullptr;
712 return E_OK;
713 }
714
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)715 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
716 SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
717 {
718 // Ignore nonactivated communicator, which is regarded as inexistent
719 if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
720 commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
721 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
722 inFrameBuffer = nullptr;
723 return E_OK;
724 }
725 return -E_NOT_FOUND;
726 }
727
RegCallbackToAdapter()728 int CommunicatorAggregator::RegCallbackToAdapter()
729 {
730 RefObject::IncObjRef(this); // Reference to be hold by adapter
731 int errCode = adapterHandle_->RegBytesReceiveCallback(
732 std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
733 std::placeholders::_3, std::placeholders::_4),
734 [this]() { RefObject::DecObjRef(this); });
735 if (errCode != E_OK) {
736 RefObject::DecObjRef(this); // Rollback in case reg failed
737 return errCode;
738 }
739
740 RefObject::IncObjRef(this); // Reference to be hold by adapter
741 errCode = adapterHandle_->RegTargetChangeCallback(
742 std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
743 [this]() { RefObject::DecObjRef(this); });
744 if (errCode != E_OK) {
745 RefObject::DecObjRef(this); // Rollback in case reg failed
746 return errCode;
747 }
748
749 RefObject::IncObjRef(this); // Reference to be hold by adapter
750 errCode = adapterHandle_->RegSendableCallback(
751 std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
752 [this]() { RefObject::DecObjRef(this); });
753 if (errCode != E_OK) {
754 RefObject::DecObjRef(this); // Rollback in case reg failed
755 return errCode;
756 }
757
758 return E_OK;
759 }
760
UnRegCallbackFromAdapter()761 void CommunicatorAggregator::UnRegCallbackFromAdapter()
762 {
763 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
764 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
765 adapterHandle_->RegSendableCallback(nullptr, nullptr);
766 }
767
GenerateLocalSourceId()768 void CommunicatorAggregator::GenerateLocalSourceId()
769 {
770 std::string identity;
771 adapterHandle_->GetLocalIdentity(identity);
772 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
773 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
774 uint64_t identityHash = Hash::HashFunc(identity);
775 if (identityHash != localSourceId_) {
776 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%" PRIu64, identity.c_str(), ULL(identityHash));
777 }
778 localSourceId_ = identityHash;
779 }
780
ReGenerateLocalSourceIdIfNeed()781 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
782 {
783 // The deviceId will change when switch user from A to B
784 // We can't listen to the user change, because it's hard to ensure the timing is correct.
785 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
786 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
787 GenerateLocalSourceId();
788 return (localSourceId_ != 0);
789 }
790
TriggerVersionNegotiation(const std::string & dstTarget)791 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
792 {
793 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
794 int errCode = E_OK;
795 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
796 if (errCode != E_OK) {
797 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
798 return;
799 }
800
801 TaskConfig config{true, 0, Priority::HIGH};
802 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::EMPTY, config);
803 if (errCode != E_OK) {
804 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
805 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
806 delete buffer;
807 buffer = nullptr;
808 }
809 }
810
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)811 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
812 const LabelType &dstLabel, const SerialBuffer *inOriFrame)
813 {
814 if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
815 return;
816 }
817 int errCode = E_OK;
818 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
819 if (message == nullptr) {
820 if (errCode == -E_VERSION_NOT_SUPPORT) {
821 TriggerVersionNegotiation(dstTarget);
822 }
823 return;
824 }
825 // Message is release in TriggerCommunicatorNotFoundFeedback
826 TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
827 }
828
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)829 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
830 const LabelType &dstLabel, Message* &oriMsg)
831 {
832 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
833 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
834 // Do not have to do feedback if the message is not a request type message
835 delete oriMsg;
836 oriMsg = nullptr;
837 return;
838 }
839
840 LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
841 oriMsg->SetMessageType(TYPE_RESPONSE);
842 oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
843
844 int errCode = E_OK;
845 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
846 delete oriMsg;
847 oriMsg = nullptr;
848 if (errCode != E_OK) {
849 LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
850 return;
851 }
852
853 TaskConfig config{true, 0, Priority::HIGH};
854 errCode = ScheduleSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
855 if (errCode != E_OK) {
856 LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
857 // if send fails, free buffer, otherwise buffer will be taked over by ScheduleSendTask
858 delete buffer;
859 buffer = nullptr;
860 }
861 }
862
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)863 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
864 {
865 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
866 versionMap_[target] = version;
867 }
868
GetExtendHeaderHandle(const ExtendInfo & paramInfo)869 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
870 {
871 if (adapterHandle_ == nullptr) {
872 return nullptr;
873 }
874 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
875 }
876
InitSendThread()877 void CommunicatorAggregator::InitSendThread()
878 {
879 if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) {
880 return;
881 }
882 exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
883 useExclusiveThread_ = true;
884 }
885
SendOnceData()886 void CommunicatorAggregator::SendOnceData()
887 {
888 SendTask taskToSend;
889 uint32_t totalLength = 0;
890 int errCode = scheduler_.ScheduleOutSendTask(taskToSend, totalLength);
891 if (errCode != E_OK) {
892 return; // Not possible to happen
893 }
894 // <vector, extendHeadSize>
895 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
896 uint32_t mtu = adapterHandle_->GetMtuSize(taskToSend.dstTarget);
897 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer, mtu, piecePackets);
898 if (errCode != E_OK) {
899 LOGE("[CommAggr] Split frame fail, errCode=%d.", errCode);
900 TaskFinalizer(taskToSend, errCode);
901 return;
902 }
903 // <addr, <extendHeadSize, totalLen>>
904 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
905 if (piecePackets.empty()) {
906 // Case that no need to split a frame, just use original buffer as a packet
907 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
908 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
909 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
910 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
911 entry.second.second = tmpEntry.second + entry.second.first;
912 eachPacket.push_back(entry);
913 } else {
914 for (auto &entry : piecePackets) {
915 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
916 {entry.second, entry.first.size()}};
917 eachPacket.push_back(tmpEntry);
918 }
919 }
920
921 SendPacketsAndDisposeTask(taskToSend, mtu, eachPacket, totalLength);
922 }
923
TriggerSendData()924 void CommunicatorAggregator::TriggerSendData()
925 {
926 if (useExclusiveThread_) {
927 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
928 wakingSignal_ = true;
929 wakingCv_.notify_one();
930 return;
931 }
932 {
933 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
934 if (sendTaskStart_) {
935 return;
936 }
937 sendTaskStart_ = true;
938 }
939 RefObject::IncObjRef(this);
940 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
941 LOGI("[CommAggr] Send thread start.");
942 while (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
943 SendOnceData();
944 }
945 {
946 std::lock_guard<std::mutex> autoLock(scheduleSendTaskMutex_);
947 sendTaskStart_ = false;
948 }
949 if (!shutdown_ && scheduler_.GetNoDelayTaskCount() != 0) {
950 TriggerSendData(); // avoid sendTaskStart_ was mark false after trigger thread check it
951 }
952 finalizeCv_.notify_one();
953 RefObject::DecObjRef(this);
954 LOGI("[CommAggr] Send thread end.");
955 });
956 if (errCode != E_OK) {
957 LOGW("[CommAggr] Trigger send data failed %d", errCode);
958 RefObject::DecObjRef(this);
959 }
960 }
961
ResetFrameRecordIfNeed(const uint32_t frameId,const uint32_t mtu)962 void CommunicatorAggregator::ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu)
963 {
964 std::lock_guard<std::mutex> autoLock(sendRecordMutex_);
965 if (sendRecord_[frameId].splitMtu == 0u || sendRecord_[frameId].splitMtu != mtu) {
966 sendRecord_[frameId].splitMtu = mtu;
967 sendRecord_[frameId].sendIndex = 0u;
968 }
969 }
970
971 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
972 } // namespace DistributedDB
973