• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communicator_aggregator.h"
17 #include <new>
18 #include <sstream>
19 #include <utility>
20 #include <functional>
21 #include "hash.h"
22 #include "log_print.h"
23 #include "db_common.h"
24 #include "communicator.h"
25 #include "endian_convert.h"
26 #include "protocol_proto.h"
27 #include "communicator_linker.h"
28 
29 namespace DistributedDB {
namespace {
// Renders the id of the calling thread as text so that it can be embedded in log lines.
inline std::string GetThreadId()
{
    std::ostringstream idText;
    idText << std::this_thread::get_id();
    return idText.str();
}
}
38 
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40 
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42     : shutdown_(false),
43       incFrameId_(0),
44       localSourceId_(0)
45 {
46 }
47 
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50     scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51     adapterHandle_ = nullptr;
52     commLinker_ = nullptr;
53 }
54 
Initialize(IAdapter * inAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
56 {
57     if (inAdapter == nullptr) {
58         return -E_INVALID_ARGS;
59     }
60     adapterHandle_ = inAdapter;
61 
62     combiner_.Initialize();
63     retainer_.Initialize();
64     scheduler_.Initialize();
65 
66     int errCode;
67     commLinker_ = new (std::nothrow) CommunicatorLinker(this);
68     if (commLinker_ == nullptr) {
69         errCode = -E_OUT_OF_MEMORY;
70         goto ROLL_BACK;
71     }
72     commLinker_->Initialize();
73 
74     errCode = RegCallbackToAdapter();
75     if (errCode != E_OK) {
76         goto ROLL_BACK;
77     }
78 
79     errCode = adapterHandle_->StartAdapter();
80     if (errCode != E_OK) {
81         LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82         goto ROLL_BACK;
83     }
84     GenerateLocalSourceId();
85 
86     shutdown_ = false;
87     exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
88     return E_OK;
89 ROLL_BACK:
90     UnRegCallbackFromAdapter();
91     if (commLinker_ != nullptr) {
92         RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
93         commLinker_ = nullptr;
94     }
95     // Scheduler do not need to do finalize in this roll_back
96     retainer_.Finalize();
97     combiner_.Finalize();
98     return errCode;
99 }
100 
Finalize()101 void CommunicatorAggregator::Finalize()
102 {
103     shutdown_ = true;
104     retryCv_.notify_all();
105     {
106         std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
107         wakingSignal_ = true;
108         wakingCv_.notify_one();
109     }
110     exclusiveThread_.join(); // Waiting thread to thoroughly quit
111     LOGI("[CommAggr][Final] Sub Thread Exit.");
112     scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
113 
114     adapterHandle_->StopAdapter();
115     UnRegCallbackFromAdapter();
116     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
117 
118     // No callback now and later, so combiner, retainer and linker can finalize or delete safely
119     RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
120     commLinker_ = nullptr;
121     retainer_.Finalize();
122     combiner_.Finalize();
123 }
124 
AllocCommunicator(uint64_t commLabel,int & outErrorNo)125 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
126 {
127     uint64_t netOrderLabel = HostToNet(commLabel);
128     uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
129     std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
130     for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
131         realLabel[i] = eachByte[i];
132     }
133     return AllocCommunicator(realLabel, outErrorNo);
134 }
135 
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)136 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
137 {
138     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
139     LOGI("[CommAggr][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
140     if (commLabel.size() != COMM_LABEL_LENGTH) {
141         outErrorNo = -E_INVALID_ARGS;
142         return nullptr;
143     }
144 
145     if (commMap_.count(commLabel) != 0) {
146         outErrorNo = -E_ALREADY_ALLOC;
147         return nullptr;
148     }
149 
150     Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
151     if (commPtr == nullptr) {
152         outErrorNo = -E_OUT_OF_MEMORY;
153         return nullptr;
154     }
155     commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
156     return commPtr;
157 }
158 
ReleaseCommunicator(ICommunicator * inCommunicator)159 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
160 {
161     if (inCommunicator == nullptr) {
162         return;
163     }
164     Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
165     LabelType commLabel = commPtr->GetCommunicatorLabel();
166     LOGI("[CommAggr][Release] Label=%.6s.", VEC_TO_STR(commLabel));
167 
168     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
169     if (commMap_.count(commLabel) == 0) {
170         LOGE("[CommAggr][Release] Not Found.");
171         return;
172     }
173     commMap_.erase(commLabel);
174     RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
175 
176     int errCode = commLinker_->DecreaseLocalLabel(commLabel);
177     if (errCode != E_OK) {
178         LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
179     }
180 }
181 
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)182 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
183     const Finalizer &inOper)
184 {
185     std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
186     return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
187 }
188 
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)189 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
190 {
191     std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
192     int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
193     if (onConnect && errCode == E_OK) {
194         // Register action and success
195         std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
196         for (auto &entry : onlineTargets) {
197             LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
198             onConnectHandle_(entry, true);
199         }
200     }
201     return errCode;
202 }
203 
GetCommunicatorAggregatorMtuSize() const204 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
205 {
206     return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
207 }
208 
GetCommunicatorAggregatorMtuSize(const std::string & target) const209 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
210 {
211     return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
212 }
213 
GetCommunicatorAggregatorTimeout() const214 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
215 {
216     return adapterHandle_->GetTimeout();
217 }
218 
GetCommunicatorAggregatorTimeout(const std::string & target) const219 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
220 {
221     return adapterHandle_->GetTimeout(target);
222 }
223 
IsDeviceOnline(const std::string & device) const224 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
225 {
226     return adapterHandle_->IsDeviceOnline(device);
227 }
228 
GetLocalIdentity(std::string & outTarget) const229 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
230 {
231     return adapterHandle_->GetLocalIdentity(outTarget);
232 }
233 
ActivateCommunicator(const LabelType & commLabel)234 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
235 {
236     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
237     LOGI("[CommAggr][Activate] Label=%.6s.", VEC_TO_STR(commLabel));
238     if (commMap_.count(commLabel) == 0) {
239         LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
240         return;
241     }
242     if (commMap_.at(commLabel).second) {
243         LOGW("[CommAggr][Activate] Communicator of this label had been activated.");
244         return;
245     }
246     commMap_.at(commLabel).second = true; // Mark this communicator as activated
247 
248     // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
249     // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
250     std::set<std::string> onlineTargets;
251     int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
252     if (errCode != E_OK) {
253         LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
254         // Do not return here
255     }
256     for (auto &entry : onlineTargets) {
257         LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
258         commMap_.at(commLabel).first->OnConnectChange(entry, true);
259     }
260     // Do Redeliver, the communicator is responsible to deal with the frame
261     std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
262     for (auto &entry : framesToRedeliver) {
263         commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
264     }
265 }
266 
267 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)268 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
269 {
270     if (onEnd) {
271         TaskAction onSendEndTask = [onEnd, result]() {
272             LOGD("[CommAggr][SendEndTask] Before On Send End.");
273             onEnd(result);
274             LOGD("[CommAggr][SendEndTask] After On Send End.");
275         };
276         int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
277         if (errCode != E_OK) {
278             LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
279         }
280     }
281 }
282 }
283 
CreateSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)284 int CommunicatorAggregator::CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
285     FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
286 {
287     if (inBuff == nullptr) {
288         return -E_INVALID_ARGS;
289     }
290     LOGI("[CommAggr][Create] Enter, thread=%s, target=%s{private}, type=%d, nonBlock=%d, timeout=%u, prio=%d.",
291         GetThreadId().c_str(), dstTarget.c_str(), static_cast<int>(inType), inConfig.nonBlock, inConfig.timeout,
292         static_cast<int>(inConfig.prio));
293 
294     if (!ReGenerateLocalSourceIdIfNeed()) {
295         delete inBuff;
296         inBuff = nullptr;
297         DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
298         LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
299         return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
300     }
301     PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
302     int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
303     if (errCode != E_OK) {
304         LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
305         return errCode;
306     }
307 
308     SendTask task{inBuff, dstTarget, onEnd};
309     if (inConfig.nonBlock) {
310         errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
311     } else {
312         errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
313     }
314     if (errCode != E_OK) {
315         LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
316         return errCode;
317     }
318 
319     std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
320     wakingSignal_ = true;
321     wakingCv_.notify_one();
322     LOGI("[CommAggr][Create] Exit ok, thread=%s, frameId=%u", GetThreadId().c_str(), info.frameId); // Delete In Future
323     return E_OK;
324 }
325 
EnableCommunicatorNotFoundFeedback(bool isEnable)326 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
327 {
328     isCommunicatorNotFoundFeedbackEnable_ = isEnable;
329 }
330 
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const331 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
332 {
333     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
334     auto pair = versionMap_.find(target);
335     if (pair == versionMap_.end()) {
336         return -E_NOT_FOUND;
337     }
338     outVersion = pair->second;
339     return E_OK;
340 }
341 
SendDataRoutine()342 void CommunicatorAggregator::SendDataRoutine()
343 {
344     while (!shutdown_) {
345         if (scheduler_.GetNoDelayTaskCount() == 0) {
346             std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
347             LOGI("[CommAggr][Routine] Send done and sleep."); // Delete In Future
348             wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
349             LOGI("[CommAggr][Routine] Send continue."); // Delete In Future
350             wakingSignal_ = false;
351             continue;
352         }
353 
354         SendTask taskToSend;
355         int errCode = scheduler_.ScheduleOutSendTask(taskToSend);
356         if (errCode != E_OK) {
357             continue; // Not possible to happen
358         }
359         // <vector, extendHeadSize>
360         std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
361         errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer,
362             adapterHandle_->GetMtuSize(taskToSend.dstTarget), piecePackets);
363         if (errCode != E_OK) {
364             LOGE("[CommAggr][Routine] Split frame fail, errCode=%d.", errCode);
365             TaskFinalizer(taskToSend, errCode);
366             continue;
367         }
368         // <addr, <extendHeadSize, totalLen>>
369         std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
370         if (piecePackets.size() == 0) {
371             // Case that no need to split a frame, just use original buffer as a packet
372             std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
373             std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
374             entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
375             entry.second.first = taskToSend.buffer->GetExtendHeadLength();
376             entry.second.second = tmpEntry.second + entry.second.first;
377             eachPacket.push_back(entry);
378         } else {
379             for (auto &entry : piecePackets) {
380                 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
381                     {entry.second, entry.first.size()}};
382                 eachPacket.push_back(tmpEntry);
383             }
384         }
385 
386         SendPacketsAndDisposeTask(taskToSend, eachPacket);
387     }
388 }
389 
SendPacketsAndDisposeTask(const SendTask & inTask,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket)390 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask,
391     const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket)
392 {
393     bool taskNeedFinalize = true;
394     int errCode = E_OK;
395     for (auto &entry : eachPacket) {
396         LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%u, totalLength=%u.",
397             inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
398         ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
399         errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second);
400         if (errCode == -E_WAIT_RETRY) {
401             LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
402             scheduler_.DelayTaskByTarget(inTask.dstTarget);
403             taskNeedFinalize = false;
404             break;
405         } else if (errCode != E_OK) {
406             LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
407             break;
408         }
409     }
410     if (taskNeedFinalize) {
411         TaskFinalizer(inTask, errCode);
412     }
413 }
414 
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)415 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
416 {
417     int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
418     if (errCode != E_OK) {
419         bool notTimeout = true;
420         auto retryFunc = [this, inPrio, &inTask]()->bool {
421             if (this->shutdown_) {
422                 delete inTask.buffer;
423                 inTask.buffer = nullptr;
424                 return true;
425             }
426             int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
427             if (retCode != E_OK) {
428                 return false;
429             }
430             return true;
431         };
432 
433         if (timeout == 0) { // Unlimited retry
434             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
435             retryCv_.wait(retryUniqueLock, retryFunc);
436         } else {
437             std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
438             notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
439         }
440 
441         if (shutdown_) {
442             return E_OK;
443         }
444         if (!notTimeout) {
445             return -E_TIMEOUT;
446         }
447     }
448     return E_OK;
449 }
450 
TaskFinalizer(const SendTask & inTask,int result)451 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
452 {
453     // Call the OnSendEnd if need
454     if (inTask.onEnd) {
455         LOGD("[CommAggr][TaskFinal] On Send End.");
456         inTask.onEnd(result);
457     }
458     // Finalize the task that just scheduled
459     int errCode = scheduler_.FinalizeLastScheduleTask();
460     // Notify Sendable To All Communicator If Need
461     if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
462         retryCv_.notify_all();
463     }
464     if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
465         NotifySendableToAllCommunicator();
466     }
467 }
468 
NotifySendableToAllCommunicator()469 void CommunicatorAggregator::NotifySendableToAllCommunicator()
470 {
471     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
472     for (auto &entry : commMap_) {
473         // Ignore nonactivated communicator
474         if (entry.second.second) {
475             entry.second.first->OnSendAvailable();
476         }
477     }
478 }
479 
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)480 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
481     const std::string &userId)
482 {
483     ProtocolProto::DisplayPacketInformation(bytes, length); // For debug, delete in the future
484     ParseResult packetResult;
485     int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
486     if (errCode != E_OK) {
487         LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
488         if (errCode == -E_VERSION_NOT_SUPPORT) {
489             TriggerVersionNegotiation(srcTarget);
490         }
491         return;
492     }
493 
494     // Update version of remote target
495     SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
496     if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
497         LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
498         return;
499     }
500 
501     if (packetResult.IsFragment()) {
502         OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
503     } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
504         errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
505         if (errCode != E_OK) {
506             LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
507         }
508     } else {
509         errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
510         if (errCode != E_OK) {
511             LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
512         }
513     }
514 }
515 
OnTargetChange(const std::string & target,bool isConnect)516 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
517 {
518     if (target.empty()) {
519         LOGE("[CommAggr][OnTarget] Target empty string.");
520         return;
521     }
522     // For process level target change
523     {
524         std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
525         if (onConnectHandle_) {
526             onConnectHandle_(target, isConnect);
527             LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
528         } else {
529             LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
530         }
531     }
532     std::set<LabelType> relatedLabels;
533     // For communicator level target change
534     if (isConnect) {
535         int errCode = commLinker_->TargetOnline(target, relatedLabels);
536         if (errCode != E_OK) {
537             LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
538         }
539     } else {
540         int errCode = commLinker_->TargetOffline(target, relatedLabels);
541         if (errCode != E_OK) {
542             LOGE("[CommAggr][OnTarget] TargetOffline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
543         }
544     }
545     // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
546     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
547     for (auto &entry : commMap_) {
548         // Ignore nonactivated communicator
549         if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
550             entry.second.first->OnConnectChange(target, isConnect);
551         }
552     }
553 }
554 
OnSendable(const std::string & target)555 void CommunicatorAggregator::OnSendable(const std::string &target)
556 {
557     int errCode = scheduler_.NoDelayTaskByTarget(target);
558     if (errCode != E_OK) {
559         LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
560         return;
561     }
562     std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
563     wakingSignal_ = true;
564     wakingCv_.notify_one();
565 }
566 
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)567 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
568     const ParseResult &inResult, const std::string &userId)
569 {
570     int errorNo = E_OK;
571     ParseResult frameResult;
572     SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
573     if (errorNo != E_OK) {
574         LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
575         return;
576     }
577     if (frameBuffer == nullptr) {
578         LOGW("[CommAggr][Receive] Combine undone.");
579         return;
580     }
581 
582     int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
583     if (errCode != E_OK) {
584         LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
585         delete frameBuffer;
586         frameBuffer = nullptr;
587         if (errCode == -E_VERSION_NOT_SUPPORT) {
588             TriggerVersionNegotiation(srcTarget);
589         }
590         return;
591     }
592 
593     if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
594         errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
595         if (errCode != E_OK) {
596             LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
597         }
598         delete frameBuffer;
599         frameBuffer = nullptr;
600     } else {
601         errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
602         if (errCode != E_OK) {
603             LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
604         }
605     }
606 }
607 
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)608 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
609 {
610     if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
611         int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
612             inResult.GetLabelExchangeSequenceId());
613         if (errCode != E_OK) {
614             LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
615             return errCode;
616         }
617     } else {
618         std::map<LabelType, bool> changedLabels;
619         int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
620             inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
621         if (errCode != E_OK) {
622             LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
623             return errCode;
624         }
625         if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
626             LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
627             for (const auto &entry : changedLabels) {
628                 LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
629             }
630             return E_OK;
631         }
632         // Do target change notify
633         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
634         for (auto &entry : changedLabels) {
635             // Ignore nonactivated communicator
636             if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
637                 LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.",
638                     VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
639                 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
640             }
641         }
642     }
643     return E_OK;
644 }
645 
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)646 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
647     uint32_t length, const ParseResult &inResult, const std::string &userId)
648 {
649     SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
650     if (buffer == nullptr) {
651         LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
652         return -E_OUT_OF_MEMORY;
653     }
654     int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
655         ProtocolProto::GetAppLayerFrameHeaderLength());
656     if (errCode != E_OK) {
657         LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
658         delete buffer;
659         buffer = nullptr;
660         return -E_INTERNAL_ERROR;
661     }
662     return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
663 }
664 
// Originally, "OnAppLayerFrameReceive" ran entirely under commMapMutex_: we searched for the communicator and, if it
// was not found, called onCommLackHandle_ (if it exists) to ask whether to retain this frame. If the answer was yes,
// we retained the frame; otherwise we discarded it and sent out the CommunicatorNotFound feedback.
// We designed it this way (in particular, holding commMapMutex_ for the whole function) to avoid this situation:
// 1: This function finds that the target communicator is not allocated or activated, so it decides to retain the frame.
// 2: The thread is switched out; the target communicator is allocated and activated, and the frames retained so far
//    are fetched out.
// 3: The thread is switched back; this frame is then put into the retainer and has no chance of being fetched out.
// In conclusion: the decision to retain a frame and the action of retaining it should not be separated.
// Otherwise, by the time the action happens, the decision may be obsolete and wrong.
// #### BUT #### the onCommLackHandle_ callback goes beyond DistributedDB, and there is a risk that the final upper
// user does something such as GetKvStore (we cannot prevent them from doing so), which could result in a call to
// AllocCommunicator in the same callback thread and finally cause a deadlock on commMapMutex_.
// #### SO #### we have to change the procedure as described below:
// 1: Search for the communicator under commMapMutex_; if found, deliver the frame to that communicator and end.
// 2: Call onCommLackHandle_ (if it exists) to ask whether to retain this frame, without holding commMapMutex_.
//    Note: during this period commMap_ may change, and a communicator that was not found before may exist now.
// 3: Search for the communicator under commMapMutex_ again; if found, deliver the frame to it and end.
// 4: If it is still not found, retain this frame if needed, or otherwise send the CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)683 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
684     const ParseResult &inResult, const std::string &userId)
685 {
686     LabelType toLabel = inResult.GetCommLabel();
687     {
688         std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
689         int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
690         if (errCode == E_OK) { // Attention: Here is equal to E_OK
691             return E_OK;
692         }
693     }
694     LOGI("[CommAggr][AppReceive] Communicator of %s not found or nonactivated.", VEC_TO_STR(toLabel));
695     int errCode = -E_NOT_FOUND;
696     {
697         std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
698         if (onCommLackHandle_) {
699             errCode = onCommLackHandle_(toLabel, userId);
700             LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
701         } else {
702             LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
703         }
704     }
705     // Here we have to lock commMapMutex_ and search communicator again.
706     std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
707     int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
708     if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
709         LOGI("[CommAggr][AppReceive] Communicator of %s found after try again(rare case).", VEC_TO_STR(toLabel));
710         return E_OK;
711     }
712     // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
713     if (errCode != E_OK) {
714         TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
715         delete inFrameBuffer;
716         inFrameBuffer = nullptr;
717         return errCode; // The caller will display errCode in log
718     }
719     // Do Retention, the retainer is responsible to deal with the frame
720     retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
721     inFrameBuffer = nullptr;
722     return E_OK;
723 }
724 
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)725 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
726     SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
727 {
728     // Ignore nonactivated communicator, which is regarded as inexistent
729     if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
730         commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
731         // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
732         inFrameBuffer = nullptr;
733         return E_OK;
734     }
735     return -E_NOT_FOUND;
736 }
737 
RegCallbackToAdapter()738 int CommunicatorAggregator::RegCallbackToAdapter()
739 {
740     RefObject::IncObjRef(this); // Reference to be hold by adapter
741     int errCode = adapterHandle_->RegBytesReceiveCallback(
742         std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
743             std::placeholders::_3, std::placeholders::_4),
744         [this]() { RefObject::DecObjRef(this); });
745     if (errCode != E_OK) {
746         RefObject::DecObjRef(this); // Rollback in case reg failed
747         return errCode;
748     }
749 
750     RefObject::IncObjRef(this); // Reference to be hold by adapter
751     errCode = adapterHandle_->RegTargetChangeCallback(
752         std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
753         [this]() { RefObject::DecObjRef(this); });
754     if (errCode != E_OK) {
755         RefObject::DecObjRef(this); // Rollback in case reg failed
756         return errCode;
757     }
758 
759     RefObject::IncObjRef(this); // Reference to be hold by adapter
760     errCode = adapterHandle_->RegSendableCallback(
761         std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
762         [this]() { RefObject::DecObjRef(this); });
763     if (errCode != E_OK) {
764         RefObject::DecObjRef(this); // Rollback in case reg failed
765         return errCode;
766     }
767 
768     return E_OK;
769 }
770 
UnRegCallbackFromAdapter()771 void CommunicatorAggregator::UnRegCallbackFromAdapter()
772 {
773     adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
774     adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
775     adapterHandle_->RegSendableCallback(nullptr, nullptr);
776 }
777 
GenerateLocalSourceId()778 void CommunicatorAggregator::GenerateLocalSourceId()
779 {
780     std::string identity;
781     adapterHandle_->GetLocalIdentity(identity);
782     // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
783     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
784     uint64_t identityHash = Hash::HashFunc(identity);
785     if (identityHash != localSourceId_) {
786         LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%llu.", identity.c_str(), ULL(identityHash));
787     }
788     localSourceId_ = identityHash;
789 }
790 
ReGenerateLocalSourceIdIfNeed()791 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
792 {
793     // The deviceId will change when switch user from A to B
794     // We can't listen to the user change, because it's hard to ensure the timing is correct.
795     // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
796     // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
797     GenerateLocalSourceId();
798     return (localSourceId_ != 0);
799 }
800 
TriggerVersionNegotiation(const std::string & dstTarget)801 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
802 {
803     LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
804     int errCode = E_OK;
805     SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
806     if (errCode != E_OK) {
807         LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
808         return;
809     }
810 
811     TaskConfig config{true, 0, Priority::HIGH};
812     errCode = CreateSendTask(dstTarget, buffer, FrameType::EMPTY, config);
813     if (errCode != E_OK) {
814         LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
815         // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
816         delete buffer;
817         buffer = nullptr;
818     }
819 }
820 
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)821 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
822     const LabelType &dstLabel, const SerialBuffer *inOriFrame)
823 {
824     if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
825         return;
826     }
827     int errCode = E_OK;
828     Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
829     if (message == nullptr) {
830         if (errCode == -E_VERSION_NOT_SUPPORT) {
831             TriggerVersionNegotiation(dstTarget);
832         }
833         return;
834     }
835     // Message is release in TriggerCommunicatorNotFoundFeedback
836     TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
837 }
838 
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)839 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
840     const LabelType &dstLabel, Message* &oriMsg)
841 {
842     if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
843         LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
844         // Do not have to do feedback if the message is not a request type message
845         delete oriMsg;
846         oriMsg = nullptr;
847         return;
848     }
849 
850     LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
851     oriMsg->SetMessageType(TYPE_RESPONSE);
852     oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
853 
854     int errCode = E_OK;
855     SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
856     delete oriMsg;
857     oriMsg = nullptr;
858     if (errCode != E_OK) {
859         LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
860         return;
861     }
862 
863     TaskConfig config{true, 0, Priority::HIGH};
864     errCode = CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
865     if (errCode != E_OK) {
866         LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
867         // if send fails, free buffer, otherwise buffer will be taked over by CreateSendTask
868         delete buffer;
869         buffer = nullptr;
870     }
871 }
872 
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)873 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
874 {
875     std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
876     versionMap_[target] = version;
877 }
878 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)879 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
880 {
881     if (adapterHandle_ == nullptr) {
882         return nullptr;
883     }
884     return adapterHandle_->GetExtendHeaderHandle(paramInfo);
885 }
886 
887 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
888 } // namespace DistributedDB
889