• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 
18 #include "hash.h"
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "log_print.h"
24 #include "protocol_proto.h"
25 
26 namespace DistributedDB {
27 namespace {
// Returns the textual form of the calling thread's std::thread::id, as produced by its stream operator.
inline std::string GetThreadId()
{
    std::ostringstream idText;
    idText << std::this_thread::get_id();
    return idText.str();
}
34 }
35 
36 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
37 
CommunicatorAggregator()38 CommunicatorAggregator::CommunicatorAggregator()
39     : shutdown_(false),
40       incFrameId_(0),
41       localSourceId_(0)
42 {
43 }
44 
~CommunicatorAggregator()45 CommunicatorAggregator::~CommunicatorAggregator()
46 {
47     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
48     adapterHandle_ = nullptr;
49     commLinker_ = nullptr;
50 }
51 
Initialize(IAdapter * inAdapter)52 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
53 {
54     if (inAdapter == nullptr) {
55         return -E_INVALID_ARGS;
56     }
57     adapterHandle_ = inAdapter;
58 
59     combiner_.Initialize();
60     retainer_.Initialize();
61     scheduler_.Initialize();
62 
63     int errCode;
64     commLinker_ = new (std::nothrow) CommunicatorLinker(this);
65     if (commLinker_ == nullptr) {
66         errCode = -E_OUT_OF_MEMORY;
67         goto ROLL_BACK;
68     }
69     commLinker_->Initialize();
70 
71     errCode = RegCallbackToAdapter();
72     if (errCode != E_OK) {
73         goto ROLL_BACK;
74     }
75 
76     errCode = adapterHandle_->StartAdapter();
77     if (errCode != E_OK) {
78         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
79         goto ROLL_BACK;
80     }
81     GenerateLocalSourceId();
82 
83     shutdown_ = false;
84     exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
85     return E_OK;
86 ROLL_BACK:
87     UnRegCallbackFromAdapter();
88     if (commLinker_ != nullptr) {
89         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
90         commLinker_ = nullptr;
91     }
92     // Scheduler do not need to do finalize in this roll_back
93     retainer_.Finalize();
94     combiner_.Finalize();
95     return errCode;
96 }
97 
Finalize()98 void CommunicatorAggregator::Finalize()
99 {
100     shutdown_ = true;
101     retryCv_.notify_all();
102     {
103         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
104         wakingSignal_ = true;
105         wakingCv_.notify_one();
106     }
107     exclusiveThread_.join(); // Waiting thread to thoroughly quit
108     LOGI("[CommAggr][Final] Sub Thread Exit.");
109     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
110 
111     adapterHandle_->StopAdapter();
112     UnRegCallbackFromAdapter();
113     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
114 
115     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
116     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
117     commLinker_ = nullptr;
118     retainer_.Finalize();
119     combiner_.Finalize();
120 }
121 
AllocCommunicator(uint64_t commLabel,int & outErrorNo)122 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
123 {
124     uint64_t netOrderLabel = HostToNet(commLabel);
125     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
126     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
127     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
128         realLabel[i] = eachByte[i];
129     }
130     return AllocCommunicator(realLabel, outErrorNo);
131 }
132 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)133 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
134 {
135     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
136     LOGI("[CommAggr][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
137     if (commLabel.size() != COMM_LABEL_LENGTH) {
138         outErrorNo = -E_INVALID_ARGS;
139         return nullptr;
140     }
141 
142     if (commMap_.count(commLabel) != 0) {
143         outErrorNo = -E_ALREADY_ALLOC;
144         return nullptr;
145     }
146 
147     Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
148     if (commPtr == nullptr) {
149         outErrorNo = -E_OUT_OF_MEMORY;
150         return nullptr;
151     }
152     commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
153     return commPtr;
154 }
155 
ReleaseCommunicator(ICommunicator * inCommunicator)156 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
157 {
158     if (inCommunicator == nullptr) {
159         return;
160     }
161     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
162     LabelType commLabel = commPtr->GetCommunicatorLabel();
163     LOGI("[CommAggr][Release] Label=%.6s.", VEC_TO_STR(commLabel));
164 
165     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
166     if (commMap_.count(commLabel) == 0) {
167         LOGE("[CommAggr][Release] Not Found.");
168         return;
169     }
170     commMap_.erase(commLabel);
171     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
172 
173     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
174     if (errCode != E_OK) {
175         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
176     }
177 }
178 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)179 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
180     const Finalizer &inOper)
181 {
182     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
183     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
184 }
185 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)186 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
187 {
188     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
189     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
190     if (onConnect && errCode == E_OK) {
191         // Register action and success
192         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
193         for (auto &entry : onlineTargets) {
194             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
195             onConnectHandle_(entry, true);
196         }
197     }
198     return errCode;
199 }
200 
GetCommunicatorAggregatorMtuSize() const201 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
202 {
203     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
204 }
205 
GetCommunicatorAggregatorMtuSize(const std::string & target) const206 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
207 {
208     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
209 }
210 
GetCommunicatorAggregatorTimeout() const211 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
212 {
213     return adapterHandle_->GetTimeout();
214 }
215 
GetCommunicatorAggregatorTimeout(const std::string & target) const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
217 {
218     return adapterHandle_->GetTimeout(target);
219 }
220 
IsDeviceOnline(const std::string & device) const221 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
222 {
223     return adapterHandle_->IsDeviceOnline(device);
224 }
225 
GetLocalIdentity(std::string & outTarget) const226 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
227 {
228     return adapterHandle_->GetLocalIdentity(outTarget);
229 }
230 
ActivateCommunicator(const LabelType & commLabel)231 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
232 {
233     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
234     LOGI("[CommAggr][Activate] Label=%.6s.", VEC_TO_STR(commLabel));
235     if (commMap_.count(commLabel) == 0) {
236         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
237         return;
238     }
239     if (commMap_.at(commLabel).second) {
240         LOGW("[CommAggr][Activate] Communicator of this label had been activated.");
241         return;
242     }
243     commMap_.at(commLabel).second = true; // Mark this communicator as activated
244 
245     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
246     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
247     std::set<std::string> onlineTargets;
248     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
249     if (errCode != E_OK) {
250         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
251         // Do not return here
252     }
253     for (auto &entry : onlineTargets) {
254         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
255         commMap_.at(commLabel).first->OnConnectChange(entry, true);
256     }
257     // Do Redeliver, the communicator is responsible to deal with the frame
258     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
259     for (auto &entry : framesToRedeliver) {
260         commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
261     }
262 }
263 
264 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)265 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
266 {
267     if (onEnd) {
268         TaskAction onSendEndTask = [onEnd, result]() {
269             LOGD("[CommAggr][SendEndTask] Before On Send End.");
270             onEnd(result);
271             LOGD("[CommAggr][SendEndTask] After On Send End.");
272         };
273         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
274         if (errCode != E_OK) {
275             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
276         }
277     }
278 }
279 }
280 
CreateSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)281 int CommunicatorAggregator::CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
282     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
283 {
284     if (inBuff == nullptr) {
285         return -E_INVALID_ARGS;
286     }
287     LOGI("[CommAggr][Create] Enter, thread=%s, target=%s{private}, type=%d, nonBlock=%d, timeout=%u, prio=%d.",
288         GetThreadId().c_str(), dstTarget.c_str(), static_cast<int>(inType), inConfig.nonBlock, inConfig.timeout,
289         static_cast<int>(inConfig.prio));
290 
291     if (!ReGenerateLocalSourceIdIfNeed()) {
292         delete inBuff;
293         inBuff = nullptr;
294         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
295         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
296         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
297     }
298     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
299     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
300     if (errCode != E_OK) {
301         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
302         return errCode;
303     }
304 
305     SendTask task{inBuff, dstTarget, onEnd};
306     if (inConfig.nonBlock) {
307         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
308     } else {
309         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
310     }
311     if (errCode != E_OK) {
312         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
313         return errCode;
314     }
315 
316     std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
317     wakingSignal_ = true;
318     wakingCv_.notify_one();
319     LOGI("[CommAggr][Create] Exit ok, thread=%s, frameId=%u", GetThreadId().c_str(), info.frameId); // Delete In Future
320     return E_OK;
321 }
322 
EnableCommunicatorNotFoundFeedback(bool isEnable)323 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
324 {
325     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
326 }
327 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const328 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
329 {
330     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
331     auto pair = versionMap_.find(target);
332     if (pair == versionMap_.end()) {
333         return -E_NOT_FOUND;
334     }
335     outVersion = pair->second;
336     return E_OK;
337 }
338 
SendDataRoutine()339 void CommunicatorAggregator::SendDataRoutine()
340 {
341     while (!shutdown_) {
342         if (scheduler_.GetNoDelayTaskCount() == 0) {
343             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
344             LOGI("[CommAggr][Routine] Send done and sleep."); // Delete In Future
345             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
346             LOGI("[CommAggr][Routine] Send continue."); // Delete In Future
347             wakingSignal_ = false;
348             continue;
349         }
350 
351         SendTask taskToSend;
352         int errCode = scheduler_.ScheduleOutSendTask(taskToSend);
353         if (errCode != E_OK) {
354             continue; // Not possible to happen
355         }
356         // <vector, extendHeadSize>
357         std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
358         errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer,
359             adapterHandle_->GetMtuSize(taskToSend.dstTarget), piecePackets);
360         if (errCode != E_OK) {
361             LOGE("[CommAggr][Routine] Split frame fail, errCode=%d.", errCode);
362             TaskFinalizer(taskToSend, errCode);
363             continue;
364         }
365         // <addr, <extendHeadSize, totalLen>>
366         std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
367         if (piecePackets.size() == 0) {
368             // Case that no need to split a frame, just use original buffer as a packet
369             std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
370             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
371             entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
372             entry.second.first = taskToSend.buffer->GetExtendHeadLength();
373             entry.second.second = tmpEntry.second + entry.second.first;
374             eachPacket.push_back(entry);
375         } else {
376             for (auto &entry : piecePackets) {
377                 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
378                     {entry.second, entry.first.size()}};
379                 eachPacket.push_back(tmpEntry);
380             }
381         }
382 
383         SendPacketsAndDisposeTask(taskToSend, eachPacket);
384     }
385 }
386 
SendPacketsAndDisposeTask(const SendTask & inTask,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket)387 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask,
388     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket)
389 {
390     bool taskNeedFinalize = true;
391     int errCode = E_OK;
392     for (auto &entry : eachPacket) {
393         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%u, totalLength=%u.",
394             inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
395         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
396         errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second);
397         if (errCode == -E_WAIT_RETRY) {
398             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
399             scheduler_.DelayTaskByTarget(inTask.dstTarget);
400             taskNeedFinalize = false;
401             break;
402         } else if (errCode != E_OK) {
403             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
404             break;
405         }
406     }
407     if (taskNeedFinalize) {
408         TaskFinalizer(inTask, errCode);
409     }
410 }
411 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)412 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
413 {
414     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
415     if (errCode != E_OK) {
416         bool notTimeout = true;
417         auto retryFunc = [this, inPrio, &inTask]()->bool {
418             if (this->shutdown_) {
419                 delete inTask.buffer;
420                 inTask.buffer = nullptr;
421                 return true;
422             }
423             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
424             if (retCode != E_OK) {
425                 return false;
426             }
427             return true;
428         };
429 
430         if (timeout == 0) { // Unlimited retry
431             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
432             retryCv_.wait(retryUniqueLock, retryFunc);
433         } else {
434             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
435             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
436         }
437 
438         if (shutdown_) {
439             return E_OK;
440         }
441         if (!notTimeout) {
442             return -E_TIMEOUT;
443         }
444     }
445     return E_OK;
446 }
447 
TaskFinalizer(const SendTask & inTask,int result)448 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
449 {
450     // Call the OnSendEnd if need
451     if (inTask.onEnd) {
452         LOGD("[CommAggr][TaskFinal] On Send End.");
453         inTask.onEnd(result);
454     }
455     // Finalize the task that just scheduled
456     int errCode = scheduler_.FinalizeLastScheduleTask();
457     // Notify Sendable To All Communicator If Need
458     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
459         retryCv_.notify_all();
460     }
461     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
462         NotifySendableToAllCommunicator();
463     }
464 }
465 
NotifySendableToAllCommunicator()466 void CommunicatorAggregator::NotifySendableToAllCommunicator()
467 {
468     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
469     for (auto &entry : commMap_) {
470         // Ignore nonactivated communicator
471         if (entry.second.second) {
472             entry.second.first->OnSendAvailable();
473         }
474     }
475 }
476 
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)477 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
478     const std::string &userId)
479 {
480     ProtocolProto::DisplayPacketInformation(bytes, length); // For debug, delete in the future
481     ParseResult packetResult;
482     int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
483     if (errCode != E_OK) {
484         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
485         if (errCode == -E_VERSION_NOT_SUPPORT) {
486             TriggerVersionNegotiation(srcTarget);
487         }
488         return;
489     }
490 
491     // Update version of remote target
492     SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
493     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
494         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
495         return;
496     }
497 
498     if (packetResult.IsFragment()) {
499         OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
500     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
501         errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
502         if (errCode != E_OK) {
503             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
504         }
505     } else {
506         errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
507         if (errCode != E_OK) {
508             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
509         }
510     }
511 }
512 
OnTargetChange(const std::string & target,bool isConnect)513 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
514 {
515     if (target.empty()) {
516         LOGE("[CommAggr][OnTarget] Target empty string.");
517         return;
518     }
519     // For process level target change
520     {
521         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
522         if (onConnectHandle_) {
523             onConnectHandle_(target, isConnect);
524             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
525         } else {
526             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
527         }
528     }
529     std::set<LabelType> relatedLabels;
530     // For communicator level target change
531     if (isConnect) {
532         int errCode = commLinker_->TargetOnline(target, relatedLabels);
533         if (errCode != E_OK) {
534             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
535         }
536     } else {
537         int errCode = commLinker_->TargetOffline(target, relatedLabels);
538         if (errCode != E_OK) {
539             LOGE("[CommAggr][OnTarget] TargetOffline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
540         }
541     }
542     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
543     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
544     for (auto &entry : commMap_) {
545         // Ignore nonactivated communicator
546         if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
547             entry.second.first->OnConnectChange(target, isConnect);
548         }
549     }
550 }
551 
OnSendable(const std::string & target)552 void CommunicatorAggregator::OnSendable(const std::string &target)
553 {
554     int errCode = scheduler_.NoDelayTaskByTarget(target);
555     if (errCode != E_OK) {
556         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
557         return;
558     }
559     std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
560     wakingSignal_ = true;
561     wakingCv_.notify_one();
562 }
563 
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)564 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
565     const ParseResult &inResult, const std::string &userId)
566 {
567     int errorNo = E_OK;
568     ParseResult frameResult;
569     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
570     if (errorNo != E_OK) {
571         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
572         return;
573     }
574     if (frameBuffer == nullptr) {
575         LOGW("[CommAggr][Receive] Combine undone.");
576         return;
577     }
578 
579     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
580     if (errCode != E_OK) {
581         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
582         delete frameBuffer;
583         frameBuffer = nullptr;
584         if (errCode == -E_VERSION_NOT_SUPPORT) {
585             TriggerVersionNegotiation(srcTarget);
586         }
587         return;
588     }
589 
590     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
591         errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
592         if (errCode != E_OK) {
593             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
594         }
595         delete frameBuffer;
596         frameBuffer = nullptr;
597     } else {
598         errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
599         if (errCode != E_OK) {
600             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
601         }
602     }
603 }
604 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)605 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
606 {
607     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
608         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
609             inResult.GetLabelExchangeSequenceId());
610         if (errCode != E_OK) {
611             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
612             return errCode;
613         }
614     } else {
615         std::map<LabelType, bool> changedLabels;
616         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
617             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
618         if (errCode != E_OK) {
619             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
620             return errCode;
621         }
622         if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
623             LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
624             for (const auto &entry : changedLabels) {
625                 LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
626             }
627             return E_OK;
628         }
629         // Do target change notify
630         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
631         for (auto &entry : changedLabels) {
632             // Ignore nonactivated communicator
633             if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
634                 LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.",
635                     VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
636                 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
637             }
638         }
639     }
640     return E_OK;
641 }
642 
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)643 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
644     uint32_t length, const ParseResult &inResult, const std::string &userId)
645 {
646     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
647     if (buffer == nullptr) {
648         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
649         return -E_OUT_OF_MEMORY;
650     }
651     int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
652         ProtocolProto::GetAppLayerFrameHeaderLength());
653     if (errCode != E_OK) {
654         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
655         delete buffer;
656         buffer = nullptr;
657         return -E_INTERNAL_ERROR;
658     }
659     return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
660 }
661 
// Originally, "OnAppLayerFrameReceive" ran entirely under commMapMutex_: it searched for the communicator and, if
// none was found, called onCommLackHandle_ (if registered) to ask whether the frame should be retained. If the answer
// was yes the frame was retained, otherwise it was discarded and CommunicatorNotFound feedback was sent.
// The function was designed this way (in particular, fully covered by commMapMutex_) to avoid the following situation:
// 1:This function finds that the target communicator is not allocated or not activated, so it decides to retain the frame.
// 2:The thread is switched out; the target communicator is allocated and activated, and the frames retained so far
//   are fetched out.
// 3:The thread is switched back and this frame is then put into the retainer, with no chance of being fetched out.
// In conclusion: the decision to retain a frame and the action of retaining it should not be separated.
// Otherwise, by the time the action happens, the decision may be obsolete and wrong.
// #### BUT #### the onCommLackHandle_ callback runs code outside DistributedDB, and there is a risk that the upper-layer
// user does something such as GetKvStore (which we cannot prevent) that ends up calling AllocCommunicator in the same
// callback thread, causing a deadlock on commMapMutex_.
// #### SO #### the flow has been changed as described below:
// 1:Search for the communicator under commMapMutex_; if found, deliver the frame to that communicator and finish.
// 2:Call onCommLackHandle_ (if registered) to ask whether to retain this frame, without holding commMapMutex_.
// Note: during this period commMap_ may change, and a communicator that was not found before may exist now.
// 3:Search for the communicator under commMapMutex_ again; if found, deliver the frame to that communicator and finish.
// 4:If it is still not found, retain the frame if required, otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)680 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
681     const ParseResult &inResult, const std::string &userId)
682 {
683     LabelType toLabel = inResult.GetCommLabel();
684     {
685         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
686         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
687         if (errCode == E_OK) { // Attention: Here is equal to E_OK
688             return E_OK;
689         }
690     }
691     LOGI("[CommAggr][AppReceive] Communicator of %s not found or nonactivated.", VEC_TO_STR(toLabel));
692     int errCode = -E_NOT_FOUND;
693     {
694         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
695         if (onCommLackHandle_) {
696             errCode = onCommLackHandle_(toLabel, userId);
697             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
698         } else {
699             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
700         }
701     }
702     // Here we have to lock commMapMutex_ and search communicator again.
703     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
704     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
705     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
706         LOGI("[CommAggr][AppReceive] Communicator of %s found after try again(rare case).", VEC_TO_STR(toLabel));
707         return E_OK;
708     }
709     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
710     if (errCode != E_OK) {
711         TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
712         delete inFrameBuffer;
713         inFrameBuffer = nullptr;
714         return errCode; // The caller will display errCode in log
715     }
716     // Do Retention, the retainer is responsible to deal with the frame
717     retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
718     inFrameBuffer = nullptr;
719     return E_OK;
720 }
721 
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)722 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
723     SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
724 {
725     // Ignore nonactivated communicator, which is regarded as inexistent
726     if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
727         commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
728         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
729         inFrameBuffer = nullptr;
730         return E_OK;
731     }
732     return -E_NOT_FOUND;
733 }
734 
RegCallbackToAdapter()735 int CommunicatorAggregator::RegCallbackToAdapter()
736 {
737     RefObject::IncObjRef(this); // Reference to be hold by adapter
738     int errCode = adapterHandle_->RegBytesReceiveCallback(
739         std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
740             std::placeholders::_3, std::placeholders::_4),
741         [this]() { RefObject::DecObjRef(this); });
742     if (errCode != E_OK) {
743         RefObject::DecObjRef(this); // Rollback in case reg failed
744         return errCode;
745     }
746 
747     RefObject::IncObjRef(this); // Reference to be hold by adapter
748     errCode = adapterHandle_->RegTargetChangeCallback(
749         std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
750         [this]() { RefObject::DecObjRef(this); });
751     if (errCode != E_OK) {
752         RefObject::DecObjRef(this); // Rollback in case reg failed
753         return errCode;
754     }
755 
756     RefObject::IncObjRef(this); // Reference to be hold by adapter
757     errCode = adapterHandle_->RegSendableCallback(
758         std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
759         [this]() { RefObject::DecObjRef(this); });
760     if (errCode != E_OK) {
761         RefObject::DecObjRef(this); // Rollback in case reg failed
762         return errCode;
763     }
764 
765     return E_OK;
766 }
767 
UnRegCallbackFromAdapter()768 void CommunicatorAggregator::UnRegCallbackFromAdapter()
769 {
770     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
771     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
772     adapterHandle_->RegSendableCallback(nullptr, nullptr);
773 }
774 
GenerateLocalSourceId()775 void CommunicatorAggregator::GenerateLocalSourceId()
776 {
777     std::string identity;
778     adapterHandle_->GetLocalIdentity(identity);
779     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
780     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
781     uint64_t identityHash = Hash::HashFunc(identity);
782     if (identityHash != localSourceId_) {
783         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%llu.", identity.c_str(), ULL(identityHash));
784     }
785     localSourceId_ = identityHash;
786 }
787 
ReGenerateLocalSourceIdIfNeed()788 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
789 {
790     // The deviceId will change when switch user from A to B
791     // We can't listen to the user change, because it's hard to ensure the timing is correct.
792     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
793     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
794     GenerateLocalSourceId();
795     return (localSourceId_ != 0);
796 }
797 
TriggerVersionNegotiation(const std::string & dstTarget)798 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
799 {
800     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
801     int errCode = E_OK;
802     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
803     if (errCode != E_OK) {
804         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
805         return;
806     }
807 
808     TaskConfig config{true, 0, Priority::HIGH};
809     errCode = CreateSendTask(dstTarget, buffer, FrameType::EMPTY, config);
810     if (errCode != E_OK) {
811         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
812         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
813         delete buffer;
814         buffer = nullptr;
815     }
816 }
817 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)818 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
819     const LabelType &dstLabel, const SerialBuffer *inOriFrame)
820 {
821     if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
822         return;
823     }
824     int errCode = E_OK;
825     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
826     if (message == nullptr) {
827         if (errCode == -E_VERSION_NOT_SUPPORT) {
828             TriggerVersionNegotiation(dstTarget);
829         }
830         return;
831     }
832     // Message is release in TriggerCommunicatorNotFoundFeedback
833     TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
834 }
835 
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)836 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
837     const LabelType &dstLabel, Message* &oriMsg)
838 {
839     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
840         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
841         // Do not have to do feedback if the message is not a request type message
842         delete oriMsg;
843         oriMsg = nullptr;
844         return;
845     }
846 
847     LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
848     oriMsg->SetMessageType(TYPE_RESPONSE);
849     oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
850 
851     int errCode = E_OK;
852     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
853     delete oriMsg;
854     oriMsg = nullptr;
855     if (errCode != E_OK) {
856         LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
857         return;
858     }
859 
860     TaskConfig config{true, 0, Priority::HIGH};
861     errCode = CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
862     if (errCode != E_OK) {
863         LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
864         // if send fails, free buffer, otherwise buffer will be taked over by CreateSendTask
865         delete buffer;
866         buffer = nullptr;
867     }
868 }
869 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)870 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
871 {
872     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
873     versionMap_[target] = version;
874 }
875 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)876 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
877 {
878     if (adapterHandle_ == nullptr) {
879         return nullptr;
880     }
881     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
882 }
883 
884 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
885 } // namespace DistributedDB
886