1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17
18 #include "hash.h"
19 #include "communicator.h"
20 #include "communicator_linker.h"
21 #include "db_common.h"
22 #include "endian_convert.h"
23 #include "log_print.h"
24 #include "protocol_proto.h"
25
26 namespace DistributedDB {
27 namespace {
GetThreadId()28 inline std::string GetThreadId()
29 {
30 std::stringstream stream;
31 stream << std::this_thread::get_id();
32 return stream.str();
33 }
34 }
35
36 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
37
CommunicatorAggregator()38 CommunicatorAggregator::CommunicatorAggregator()
39 : shutdown_(false),
40 incFrameId_(0),
41 localSourceId_(0)
42 {
43 }
44
~CommunicatorAggregator()45 CommunicatorAggregator::~CommunicatorAggregator()
46 {
47 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
48 adapterHandle_ = nullptr;
49 commLinker_ = nullptr;
50 }
51
Initialize(IAdapter * inAdapter)52 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
53 {
54 if (inAdapter == nullptr) {
55 return -E_INVALID_ARGS;
56 }
57 adapterHandle_ = inAdapter;
58
59 combiner_.Initialize();
60 retainer_.Initialize();
61 scheduler_.Initialize();
62
63 int errCode;
64 commLinker_ = new (std::nothrow) CommunicatorLinker(this);
65 if (commLinker_ == nullptr) {
66 errCode = -E_OUT_OF_MEMORY;
67 goto ROLL_BACK;
68 }
69 commLinker_->Initialize();
70
71 errCode = RegCallbackToAdapter();
72 if (errCode != E_OK) {
73 goto ROLL_BACK;
74 }
75
76 errCode = adapterHandle_->StartAdapter();
77 if (errCode != E_OK) {
78 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
79 goto ROLL_BACK;
80 }
81 GenerateLocalSourceId();
82
83 shutdown_ = false;
84 exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
85 return E_OK;
86 ROLL_BACK:
87 UnRegCallbackFromAdapter();
88 if (commLinker_ != nullptr) {
89 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
90 commLinker_ = nullptr;
91 }
92 // Scheduler do not need to do finalize in this roll_back
93 retainer_.Finalize();
94 combiner_.Finalize();
95 return errCode;
96 }
97
Finalize()98 void CommunicatorAggregator::Finalize()
99 {
100 shutdown_ = true;
101 retryCv_.notify_all();
102 {
103 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
104 wakingSignal_ = true;
105 wakingCv_.notify_one();
106 }
107 exclusiveThread_.join(); // Waiting thread to thoroughly quit
108 LOGI("[CommAggr][Final] Sub Thread Exit.");
109 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
110
111 adapterHandle_->StopAdapter();
112 UnRegCallbackFromAdapter();
113 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
114
115 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
116 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
117 commLinker_ = nullptr;
118 retainer_.Finalize();
119 combiner_.Finalize();
120 }
121
AllocCommunicator(uint64_t commLabel,int & outErrorNo)122 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
123 {
124 uint64_t netOrderLabel = HostToNet(commLabel);
125 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
126 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
127 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
128 realLabel[i] = eachByte[i];
129 }
130 return AllocCommunicator(realLabel, outErrorNo);
131 }
132
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)133 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
134 {
135 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
136 LOGI("[CommAggr][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
137 if (commLabel.size() != COMM_LABEL_LENGTH) {
138 outErrorNo = -E_INVALID_ARGS;
139 return nullptr;
140 }
141
142 if (commMap_.count(commLabel) != 0) {
143 outErrorNo = -E_ALREADY_ALLOC;
144 return nullptr;
145 }
146
147 Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
148 if (commPtr == nullptr) {
149 outErrorNo = -E_OUT_OF_MEMORY;
150 return nullptr;
151 }
152 commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
153 return commPtr;
154 }
155
ReleaseCommunicator(ICommunicator * inCommunicator)156 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
157 {
158 if (inCommunicator == nullptr) {
159 return;
160 }
161 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
162 LabelType commLabel = commPtr->GetCommunicatorLabel();
163 LOGI("[CommAggr][Release] Label=%.6s.", VEC_TO_STR(commLabel));
164
165 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
166 if (commMap_.count(commLabel) == 0) {
167 LOGE("[CommAggr][Release] Not Found.");
168 return;
169 }
170 commMap_.erase(commLabel);
171 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
172
173 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
174 if (errCode != E_OK) {
175 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
176 }
177 }
178
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)179 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
180 const Finalizer &inOper)
181 {
182 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
183 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
184 }
185
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)186 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
187 {
188 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
189 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
190 if (onConnect && errCode == E_OK) {
191 // Register action and success
192 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
193 for (auto &entry : onlineTargets) {
194 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
195 onConnectHandle_(entry, true);
196 }
197 }
198 return errCode;
199 }
200
GetCommunicatorAggregatorMtuSize() const201 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
202 {
203 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
204 }
205
GetCommunicatorAggregatorMtuSize(const std::string & target) const206 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
207 {
208 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
209 }
210
GetCommunicatorAggregatorTimeout() const211 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
212 {
213 return adapterHandle_->GetTimeout();
214 }
215
GetCommunicatorAggregatorTimeout(const std::string & target) const216 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
217 {
218 return adapterHandle_->GetTimeout(target);
219 }
220
IsDeviceOnline(const std::string & device) const221 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
222 {
223 return adapterHandle_->IsDeviceOnline(device);
224 }
225
GetLocalIdentity(std::string & outTarget) const226 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
227 {
228 return adapterHandle_->GetLocalIdentity(outTarget);
229 }
230
ActivateCommunicator(const LabelType & commLabel)231 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
232 {
233 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
234 LOGI("[CommAggr][Activate] Label=%.6s.", VEC_TO_STR(commLabel));
235 if (commMap_.count(commLabel) == 0) {
236 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
237 return;
238 }
239 if (commMap_.at(commLabel).second) {
240 LOGW("[CommAggr][Activate] Communicator of this label had been activated.");
241 return;
242 }
243 commMap_.at(commLabel).second = true; // Mark this communicator as activated
244
245 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
246 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
247 std::set<std::string> onlineTargets;
248 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
249 if (errCode != E_OK) {
250 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
251 // Do not return here
252 }
253 for (auto &entry : onlineTargets) {
254 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
255 commMap_.at(commLabel).first->OnConnectChange(entry, true);
256 }
257 // Do Redeliver, the communicator is responsible to deal with the frame
258 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
259 for (auto &entry : framesToRedeliver) {
260 commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
261 }
262 }
263
264 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)265 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
266 {
267 if (onEnd) {
268 TaskAction onSendEndTask = [onEnd, result]() {
269 LOGD("[CommAggr][SendEndTask] Before On Send End.");
270 onEnd(result);
271 LOGD("[CommAggr][SendEndTask] After On Send End.");
272 };
273 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
274 if (errCode != E_OK) {
275 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
276 }
277 }
278 }
279 }
280
CreateSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)281 int CommunicatorAggregator::CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
282 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
283 {
284 if (inBuff == nullptr) {
285 return -E_INVALID_ARGS;
286 }
287 LOGI("[CommAggr][Create] Enter, thread=%s, target=%s{private}, type=%d, nonBlock=%d, timeout=%u, prio=%d.",
288 GetThreadId().c_str(), dstTarget.c_str(), static_cast<int>(inType), inConfig.nonBlock, inConfig.timeout,
289 static_cast<int>(inConfig.prio));
290
291 if (!ReGenerateLocalSourceIdIfNeed()) {
292 delete inBuff;
293 inBuff = nullptr;
294 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
295 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
296 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
297 }
298 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
299 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
300 if (errCode != E_OK) {
301 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
302 return errCode;
303 }
304
305 SendTask task{inBuff, dstTarget, onEnd};
306 if (inConfig.nonBlock) {
307 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
308 } else {
309 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
310 }
311 if (errCode != E_OK) {
312 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
313 return errCode;
314 }
315
316 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
317 wakingSignal_ = true;
318 wakingCv_.notify_one();
319 LOGI("[CommAggr][Create] Exit ok, thread=%s, frameId=%u", GetThreadId().c_str(), info.frameId); // Delete In Future
320 return E_OK;
321 }
322
EnableCommunicatorNotFoundFeedback(bool isEnable)323 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
324 {
325 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
326 }
327
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const328 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
329 {
330 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
331 auto pair = versionMap_.find(target);
332 if (pair == versionMap_.end()) {
333 return -E_NOT_FOUND;
334 }
335 outVersion = pair->second;
336 return E_OK;
337 }
338
SendDataRoutine()339 void CommunicatorAggregator::SendDataRoutine()
340 {
341 while (!shutdown_) {
342 if (scheduler_.GetNoDelayTaskCount() == 0) {
343 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
344 LOGI("[CommAggr][Routine] Send done and sleep."); // Delete In Future
345 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
346 LOGI("[CommAggr][Routine] Send continue."); // Delete In Future
347 wakingSignal_ = false;
348 continue;
349 }
350
351 SendTask taskToSend;
352 int errCode = scheduler_.ScheduleOutSendTask(taskToSend);
353 if (errCode != E_OK) {
354 continue; // Not possible to happen
355 }
356 // <vector, extendHeadSize>
357 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
358 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer,
359 adapterHandle_->GetMtuSize(taskToSend.dstTarget), piecePackets);
360 if (errCode != E_OK) {
361 LOGE("[CommAggr][Routine] Split frame fail, errCode=%d.", errCode);
362 TaskFinalizer(taskToSend, errCode);
363 continue;
364 }
365 // <addr, <extendHeadSize, totalLen>>
366 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
367 if (piecePackets.size() == 0) {
368 // Case that no need to split a frame, just use original buffer as a packet
369 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
370 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
371 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
372 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
373 entry.second.second = tmpEntry.second + entry.second.first;
374 eachPacket.push_back(entry);
375 } else {
376 for (auto &entry : piecePackets) {
377 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
378 {entry.second, entry.first.size()}};
379 eachPacket.push_back(tmpEntry);
380 }
381 }
382
383 SendPacketsAndDisposeTask(taskToSend, eachPacket);
384 }
385 }
386
SendPacketsAndDisposeTask(const SendTask & inTask,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket)387 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask,
388 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket)
389 {
390 bool taskNeedFinalize = true;
391 int errCode = E_OK;
392 for (auto &entry : eachPacket) {
393 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%u, totalLength=%u.",
394 inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
395 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
396 errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second);
397 if (errCode == -E_WAIT_RETRY) {
398 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
399 scheduler_.DelayTaskByTarget(inTask.dstTarget);
400 taskNeedFinalize = false;
401 break;
402 } else if (errCode != E_OK) {
403 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
404 break;
405 }
406 }
407 if (taskNeedFinalize) {
408 TaskFinalizer(inTask, errCode);
409 }
410 }
411
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)412 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
413 {
414 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
415 if (errCode != E_OK) {
416 bool notTimeout = true;
417 auto retryFunc = [this, inPrio, &inTask]()->bool {
418 if (this->shutdown_) {
419 delete inTask.buffer;
420 inTask.buffer = nullptr;
421 return true;
422 }
423 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
424 if (retCode != E_OK) {
425 return false;
426 }
427 return true;
428 };
429
430 if (timeout == 0) { // Unlimited retry
431 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
432 retryCv_.wait(retryUniqueLock, retryFunc);
433 } else {
434 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
435 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
436 }
437
438 if (shutdown_) {
439 return E_OK;
440 }
441 if (!notTimeout) {
442 return -E_TIMEOUT;
443 }
444 }
445 return E_OK;
446 }
447
TaskFinalizer(const SendTask & inTask,int result)448 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
449 {
450 // Call the OnSendEnd if need
451 if (inTask.onEnd) {
452 LOGD("[CommAggr][TaskFinal] On Send End.");
453 inTask.onEnd(result);
454 }
455 // Finalize the task that just scheduled
456 int errCode = scheduler_.FinalizeLastScheduleTask();
457 // Notify Sendable To All Communicator If Need
458 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
459 retryCv_.notify_all();
460 }
461 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
462 NotifySendableToAllCommunicator();
463 }
464 }
465
NotifySendableToAllCommunicator()466 void CommunicatorAggregator::NotifySendableToAllCommunicator()
467 {
468 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
469 for (auto &entry : commMap_) {
470 // Ignore nonactivated communicator
471 if (entry.second.second) {
472 entry.second.first->OnSendAvailable();
473 }
474 }
475 }
476
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)477 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
478 const std::string &userId)
479 {
480 ProtocolProto::DisplayPacketInformation(bytes, length); // For debug, delete in the future
481 ParseResult packetResult;
482 int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
483 if (errCode != E_OK) {
484 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
485 if (errCode == -E_VERSION_NOT_SUPPORT) {
486 TriggerVersionNegotiation(srcTarget);
487 }
488 return;
489 }
490
491 // Update version of remote target
492 SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
493 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
494 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
495 return;
496 }
497
498 if (packetResult.IsFragment()) {
499 OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
500 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
501 errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
502 if (errCode != E_OK) {
503 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
504 }
505 } else {
506 errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
507 if (errCode != E_OK) {
508 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
509 }
510 }
511 }
512
OnTargetChange(const std::string & target,bool isConnect)513 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
514 {
515 if (target.empty()) {
516 LOGE("[CommAggr][OnTarget] Target empty string.");
517 return;
518 }
519 // For process level target change
520 {
521 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
522 if (onConnectHandle_) {
523 onConnectHandle_(target, isConnect);
524 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
525 } else {
526 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
527 }
528 }
529 std::set<LabelType> relatedLabels;
530 // For communicator level target change
531 if (isConnect) {
532 int errCode = commLinker_->TargetOnline(target, relatedLabels);
533 if (errCode != E_OK) {
534 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
535 }
536 } else {
537 int errCode = commLinker_->TargetOffline(target, relatedLabels);
538 if (errCode != E_OK) {
539 LOGE("[CommAggr][OnTarget] TargetOffline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
540 }
541 }
542 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
543 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
544 for (auto &entry : commMap_) {
545 // Ignore nonactivated communicator
546 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
547 entry.second.first->OnConnectChange(target, isConnect);
548 }
549 }
550 }
551
OnSendable(const std::string & target)552 void CommunicatorAggregator::OnSendable(const std::string &target)
553 {
554 int errCode = scheduler_.NoDelayTaskByTarget(target);
555 if (errCode != E_OK) {
556 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
557 return;
558 }
559 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
560 wakingSignal_ = true;
561 wakingCv_.notify_one();
562 }
563
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)564 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
565 const ParseResult &inResult, const std::string &userId)
566 {
567 int errorNo = E_OK;
568 ParseResult frameResult;
569 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
570 if (errorNo != E_OK) {
571 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
572 return;
573 }
574 if (frameBuffer == nullptr) {
575 LOGW("[CommAggr][Receive] Combine undone.");
576 return;
577 }
578
579 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
580 if (errCode != E_OK) {
581 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
582 delete frameBuffer;
583 frameBuffer = nullptr;
584 if (errCode == -E_VERSION_NOT_SUPPORT) {
585 TriggerVersionNegotiation(srcTarget);
586 }
587 return;
588 }
589
590 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
591 errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
592 if (errCode != E_OK) {
593 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
594 }
595 delete frameBuffer;
596 frameBuffer = nullptr;
597 } else {
598 errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
599 if (errCode != E_OK) {
600 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
601 }
602 }
603 }
604
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)605 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
606 {
607 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
608 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
609 inResult.GetLabelExchangeSequenceId());
610 if (errCode != E_OK) {
611 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
612 return errCode;
613 }
614 } else {
615 std::map<LabelType, bool> changedLabels;
616 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
617 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
618 if (errCode != E_OK) {
619 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
620 return errCode;
621 }
622 if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
623 LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
624 for (const auto &entry : changedLabels) {
625 LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
626 }
627 return E_OK;
628 }
629 // Do target change notify
630 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
631 for (auto &entry : changedLabels) {
632 // Ignore nonactivated communicator
633 if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
634 LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.",
635 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
636 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
637 }
638 }
639 }
640 return E_OK;
641 }
642
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)643 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
644 uint32_t length, const ParseResult &inResult, const std::string &userId)
645 {
646 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
647 if (buffer == nullptr) {
648 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
649 return -E_OUT_OF_MEMORY;
650 }
651 int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
652 ProtocolProto::GetAppLayerFrameHeaderLength());
653 if (errCode != E_OK) {
654 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
655 delete buffer;
656 buffer = nullptr;
657 return -E_INTERNAL_ERROR;
658 }
659 return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
660 }
661
662 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
663 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
664 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
665 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
666 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
667 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
668 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
669 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
670 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
671 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
672 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
673 // in the same callback thread finally causing DeadLock on commMapMutex_.
674 // #### SO #### we have to make a change described below
675 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
676 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
677 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
678 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
679 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)680 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
681 const ParseResult &inResult, const std::string &userId)
682 {
683 LabelType toLabel = inResult.GetCommLabel();
684 {
685 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
686 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
687 if (errCode == E_OK) { // Attention: Here is equal to E_OK
688 return E_OK;
689 }
690 }
691 LOGI("[CommAggr][AppReceive] Communicator of %s not found or nonactivated.", VEC_TO_STR(toLabel));
692 int errCode = -E_NOT_FOUND;
693 {
694 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
695 if (onCommLackHandle_) {
696 errCode = onCommLackHandle_(toLabel, userId);
697 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
698 } else {
699 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
700 }
701 }
702 // Here we have to lock commMapMutex_ and search communicator again.
703 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
704 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
705 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
706 LOGI("[CommAggr][AppReceive] Communicator of %s found after try again(rare case).", VEC_TO_STR(toLabel));
707 return E_OK;
708 }
709 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
710 if (errCode != E_OK) {
711 TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
712 delete inFrameBuffer;
713 inFrameBuffer = nullptr;
714 return errCode; // The caller will display errCode in log
715 }
716 // Do Retention, the retainer is responsible to deal with the frame
717 retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
718 inFrameBuffer = nullptr;
719 return E_OK;
720 }
721
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)722 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
723 SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
724 {
725 // Ignore nonactivated communicator, which is regarded as inexistent
726 if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
727 commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
728 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
729 inFrameBuffer = nullptr;
730 return E_OK;
731 }
732 return -E_NOT_FOUND;
733 }
734
RegCallbackToAdapter()735 int CommunicatorAggregator::RegCallbackToAdapter()
736 {
737 RefObject::IncObjRef(this); // Reference to be hold by adapter
738 int errCode = adapterHandle_->RegBytesReceiveCallback(
739 std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
740 std::placeholders::_3, std::placeholders::_4),
741 [this]() { RefObject::DecObjRef(this); });
742 if (errCode != E_OK) {
743 RefObject::DecObjRef(this); // Rollback in case reg failed
744 return errCode;
745 }
746
747 RefObject::IncObjRef(this); // Reference to be hold by adapter
748 errCode = adapterHandle_->RegTargetChangeCallback(
749 std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
750 [this]() { RefObject::DecObjRef(this); });
751 if (errCode != E_OK) {
752 RefObject::DecObjRef(this); // Rollback in case reg failed
753 return errCode;
754 }
755
756 RefObject::IncObjRef(this); // Reference to be hold by adapter
757 errCode = adapterHandle_->RegSendableCallback(
758 std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
759 [this]() { RefObject::DecObjRef(this); });
760 if (errCode != E_OK) {
761 RefObject::DecObjRef(this); // Rollback in case reg failed
762 return errCode;
763 }
764
765 return E_OK;
766 }
767
UnRegCallbackFromAdapter()768 void CommunicatorAggregator::UnRegCallbackFromAdapter()
769 {
770 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
771 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
772 adapterHandle_->RegSendableCallback(nullptr, nullptr);
773 }
774
GenerateLocalSourceId()775 void CommunicatorAggregator::GenerateLocalSourceId()
776 {
777 std::string identity;
778 adapterHandle_->GetLocalIdentity(identity);
779 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
780 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
781 uint64_t identityHash = Hash::HashFunc(identity);
782 if (identityHash != localSourceId_) {
783 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%llu.", identity.c_str(), ULL(identityHash));
784 }
785 localSourceId_ = identityHash;
786 }
787
ReGenerateLocalSourceIdIfNeed()788 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
789 {
790 // The deviceId will change when switch user from A to B
791 // We can't listen to the user change, because it's hard to ensure the timing is correct.
792 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
793 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
794 GenerateLocalSourceId();
795 return (localSourceId_ != 0);
796 }
797
TriggerVersionNegotiation(const std::string & dstTarget)798 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
799 {
800 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
801 int errCode = E_OK;
802 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
803 if (errCode != E_OK) {
804 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
805 return;
806 }
807
808 TaskConfig config{true, 0, Priority::HIGH};
809 errCode = CreateSendTask(dstTarget, buffer, FrameType::EMPTY, config);
810 if (errCode != E_OK) {
811 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
812 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
813 delete buffer;
814 buffer = nullptr;
815 }
816 }
817
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)818 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
819 const LabelType &dstLabel, const SerialBuffer *inOriFrame)
820 {
821 if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
822 return;
823 }
824 int errCode = E_OK;
825 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
826 if (message == nullptr) {
827 if (errCode == -E_VERSION_NOT_SUPPORT) {
828 TriggerVersionNegotiation(dstTarget);
829 }
830 return;
831 }
832 // Message is release in TriggerCommunicatorNotFoundFeedback
833 TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
834 }
835
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)836 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
837 const LabelType &dstLabel, Message* &oriMsg)
838 {
839 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
840 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
841 // Do not have to do feedback if the message is not a request type message
842 delete oriMsg;
843 oriMsg = nullptr;
844 return;
845 }
846
847 LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
848 oriMsg->SetMessageType(TYPE_RESPONSE);
849 oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
850
851 int errCode = E_OK;
852 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
853 delete oriMsg;
854 oriMsg = nullptr;
855 if (errCode != E_OK) {
856 LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
857 return;
858 }
859
860 TaskConfig config{true, 0, Priority::HIGH};
861 errCode = CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
862 if (errCode != E_OK) {
863 LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
864 // if send fails, free buffer, otherwise buffer will be taked over by CreateSendTask
865 delete buffer;
866 buffer = nullptr;
867 }
868 }
869
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)870 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
871 {
872 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
873 versionMap_[target] = version;
874 }
875
GetExtendHeaderHandle(const ExtendInfo & paramInfo)876 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
877 {
878 if (adapterHandle_ == nullptr) {
879 return nullptr;
880 }
881 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
882 }
883
884 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
885 } // namespace DistributedDB
886