1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communicator_aggregator.h"
17 #include <new>
18 #include <sstream>
19 #include <utility>
20 #include <functional>
21 #include "hash.h"
22 #include "log_print.h"
23 #include "db_common.h"
24 #include "communicator.h"
25 #include "endian_convert.h"
26 #include "protocol_proto.h"
27 #include "communicator_linker.h"
28
29 namespace DistributedDB {
namespace {
// Renders the id of the calling thread as text so that it can be embedded in log lines.
inline std::string GetThreadId()
{
    std::ostringstream idText;
    idText << std::this_thread::get_id();
    return idText.str();
}
} // namespace
38
39 std::atomic<bool> CommunicatorAggregator::isCommunicatorNotFoundFeedbackEnable_{true};
40
CommunicatorAggregator()41 CommunicatorAggregator::CommunicatorAggregator()
42 : shutdown_(false),
43 incFrameId_(0),
44 localSourceId_(0)
45 {
46 }
47
~CommunicatorAggregator()48 CommunicatorAggregator::~CommunicatorAggregator()
49 {
50 scheduler_.Finalize(); // Clear residual frame dumped by linker after CommunicatorAggregator finalize
51 adapterHandle_ = nullptr;
52 commLinker_ = nullptr;
53 }
54
Initialize(IAdapter * inAdapter)55 int CommunicatorAggregator::Initialize(IAdapter *inAdapter)
56 {
57 if (inAdapter == nullptr) {
58 return -E_INVALID_ARGS;
59 }
60 adapterHandle_ = inAdapter;
61
62 combiner_.Initialize();
63 retainer_.Initialize();
64 scheduler_.Initialize();
65
66 int errCode;
67 commLinker_ = new (std::nothrow) CommunicatorLinker(this);
68 if (commLinker_ == nullptr) {
69 errCode = -E_OUT_OF_MEMORY;
70 goto ROLL_BACK;
71 }
72 commLinker_->Initialize();
73
74 errCode = RegCallbackToAdapter();
75 if (errCode != E_OK) {
76 goto ROLL_BACK;
77 }
78
79 errCode = adapterHandle_->StartAdapter();
80 if (errCode != E_OK) {
81 LOGE("[CommAggr][Init] Start Adapter Fail, errCode=%d.", errCode);
82 goto ROLL_BACK;
83 }
84 GenerateLocalSourceId();
85
86 shutdown_ = false;
87 exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this);
88 return E_OK;
89 ROLL_BACK:
90 UnRegCallbackFromAdapter();
91 if (commLinker_ != nullptr) {
92 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
93 commLinker_ = nullptr;
94 }
95 // Scheduler do not need to do finalize in this roll_back
96 retainer_.Finalize();
97 combiner_.Finalize();
98 return errCode;
99 }
100
Finalize()101 void CommunicatorAggregator::Finalize()
102 {
103 shutdown_ = true;
104 retryCv_.notify_all();
105 {
106 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
107 wakingSignal_ = true;
108 wakingCv_.notify_one();
109 }
110 exclusiveThread_.join(); // Waiting thread to thoroughly quit
111 LOGI("[CommAggr][Final] Sub Thread Exit.");
112 scheduler_.Finalize(); // scheduler_ must finalize here to make space for linker to dump residual frame
113
114 adapterHandle_->StopAdapter();
115 UnRegCallbackFromAdapter();
116 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure all callback thread quit
117
118 // No callback now and later, so combiner, retainer and linker can finalize or delete safely
119 RefObject::DecObjRef(commLinker_); // Refcount of linker is 1 when created, here to unref linker
120 commLinker_ = nullptr;
121 retainer_.Finalize();
122 combiner_.Finalize();
123 }
124
AllocCommunicator(uint64_t commLabel,int & outErrorNo)125 ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo)
126 {
127 uint64_t netOrderLabel = HostToNet(commLabel);
128 uint8_t *eachByte = reinterpret_cast<uint8_t *>(&netOrderLabel);
129 std::vector<uint8_t> realLabel(COMM_LABEL_LENGTH, 0);
130 for (int i = 0; i < static_cast<int>(sizeof(uint64_t)); i++) {
131 realLabel[i] = eachByte[i];
132 }
133 return AllocCommunicator(realLabel, outErrorNo);
134 }
135
AllocCommunicator(const std::vector<uint8_t> & commLabel,int & outErrorNo)136 ICommunicator *CommunicatorAggregator::AllocCommunicator(const std::vector<uint8_t> &commLabel, int &outErrorNo)
137 {
138 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
139 LOGI("[CommAggr][Alloc] Label=%.6s.", VEC_TO_STR(commLabel));
140 if (commLabel.size() != COMM_LABEL_LENGTH) {
141 outErrorNo = -E_INVALID_ARGS;
142 return nullptr;
143 }
144
145 if (commMap_.count(commLabel) != 0) {
146 outErrorNo = -E_ALREADY_ALLOC;
147 return nullptr;
148 }
149
150 Communicator *commPtr = new (std::nothrow) Communicator(this, commLabel);
151 if (commPtr == nullptr) {
152 outErrorNo = -E_OUT_OF_MEMORY;
153 return nullptr;
154 }
155 commMap_[commLabel] = {commPtr, false}; // Communicator is not activated when allocated
156 return commPtr;
157 }
158
ReleaseCommunicator(ICommunicator * inCommunicator)159 void CommunicatorAggregator::ReleaseCommunicator(ICommunicator *inCommunicator)
160 {
161 if (inCommunicator == nullptr) {
162 return;
163 }
164 Communicator *commPtr = static_cast<Communicator *>(inCommunicator);
165 LabelType commLabel = commPtr->GetCommunicatorLabel();
166 LOGI("[CommAggr][Release] Label=%.6s.", VEC_TO_STR(commLabel));
167
168 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
169 if (commMap_.count(commLabel) == 0) {
170 LOGE("[CommAggr][Release] Not Found.");
171 return;
172 }
173 commMap_.erase(commLabel);
174 RefObject::DecObjRef(commPtr); // Refcount of Communicator is 1 when created, here to unref Communicator
175
176 int errCode = commLinker_->DecreaseLocalLabel(commLabel);
177 if (errCode != E_OK) {
178 LOGE("[CommAggr][Release] DecreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
179 }
180 }
181
RegCommunicatorLackCallback(const CommunicatorLackCallback & onCommLack,const Finalizer & inOper)182 int CommunicatorAggregator::RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack,
183 const Finalizer &inOper)
184 {
185 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
186 return RegCallBack(onCommLack, onCommLackHandle_, inOper, onCommLackFinalizer_);
187 }
188
RegOnConnectCallback(const OnConnectCallback & onConnect,const Finalizer & inOper)189 int CommunicatorAggregator::RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper)
190 {
191 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
192 int errCode = RegCallBack(onConnect, onConnectHandle_, inOper, onConnectFinalizer_);
193 if (onConnect && errCode == E_OK) {
194 // Register action and success
195 std::set<std::string> onlineTargets = commLinker_->GetOnlineRemoteTarget();
196 for (auto &entry : onlineTargets) {
197 LOGI("[CommAggr][RegConnect] Online target=%s{private}.", entry.c_str());
198 onConnectHandle_(entry, true);
199 }
200 }
201 return errCode;
202 }
203
GetCommunicatorAggregatorMtuSize() const204 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize() const
205 {
206 return adapterHandle_->GetMtuSize() - ProtocolProto::GetLengthBeforeSerializedData();
207 }
208
GetCommunicatorAggregatorMtuSize(const std::string & target) const209 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorMtuSize(const std::string &target) const
210 {
211 return adapterHandle_->GetMtuSize(target) - ProtocolProto::GetLengthBeforeSerializedData();
212 }
213
GetCommunicatorAggregatorTimeout() const214 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout() const
215 {
216 return adapterHandle_->GetTimeout();
217 }
218
GetCommunicatorAggregatorTimeout(const std::string & target) const219 uint32_t CommunicatorAggregator::GetCommunicatorAggregatorTimeout(const std::string &target) const
220 {
221 return adapterHandle_->GetTimeout(target);
222 }
223
IsDeviceOnline(const std::string & device) const224 bool CommunicatorAggregator::IsDeviceOnline(const std::string &device) const
225 {
226 return adapterHandle_->IsDeviceOnline(device);
227 }
228
GetLocalIdentity(std::string & outTarget) const229 int CommunicatorAggregator::GetLocalIdentity(std::string &outTarget) const
230 {
231 return adapterHandle_->GetLocalIdentity(outTarget);
232 }
233
ActivateCommunicator(const LabelType & commLabel)234 void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel)
235 {
236 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
237 LOGI("[CommAggr][Activate] Label=%.6s.", VEC_TO_STR(commLabel));
238 if (commMap_.count(commLabel) == 0) {
239 LOGW("[CommAggr][Activate] Communicator of this label not allocated.");
240 return;
241 }
242 if (commMap_.at(commLabel).second) {
243 LOGW("[CommAggr][Activate] Communicator of this label had been activated.");
244 return;
245 }
246 commMap_.at(commLabel).second = true; // Mark this communicator as activated
247
248 // IncreaseLocalLabel below and DecreaseLocalLabel in ReleaseCommunicator should all be protected by commMapMutex_
249 // To avoid disordering probably caused by concurrent call to ActivateCommunicator and ReleaseCommunicator
250 std::set<std::string> onlineTargets;
251 int errCode = commLinker_->IncreaseLocalLabel(commLabel, onlineTargets);
252 if (errCode != E_OK) {
253 LOGE("[CommAggr][Activate] IncreaseLocalLabel Fail, Just Log, errCode=%d.", errCode);
254 // Do not return here
255 }
256 for (auto &entry : onlineTargets) {
257 LOGI("[CommAggr][Activate] Already Online Target=%s{private}.", entry.c_str());
258 commMap_.at(commLabel).first->OnConnectChange(entry, true);
259 }
260 // Do Redeliver, the communicator is responsible to deal with the frame
261 std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
262 for (auto &entry : framesToRedeliver) {
263 commMap_.at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
264 }
265 }
266
267 namespace {
DoOnSendEndByTaskIfNeed(const OnSendEnd & onEnd,int result)268 void DoOnSendEndByTaskIfNeed(const OnSendEnd &onEnd, int result)
269 {
270 if (onEnd) {
271 TaskAction onSendEndTask = [onEnd, result]() {
272 LOGD("[CommAggr][SendEndTask] Before On Send End.");
273 onEnd(result);
274 LOGD("[CommAggr][SendEndTask] After On Send End.");
275 };
276 int errCode = RuntimeContext::GetInstance()->ScheduleTask(onSendEndTask);
277 if (errCode != E_OK) {
278 LOGE("[CommAggr][SendEndTask] ScheduleTask failed, errCode = %d.", errCode);
279 }
280 }
281 }
282 }
283
CreateSendTask(const std::string & dstTarget,SerialBuffer * inBuff,FrameType inType,const TaskConfig & inConfig,const OnSendEnd & onEnd)284 int CommunicatorAggregator::CreateSendTask(const std::string &dstTarget, SerialBuffer *inBuff,
285 FrameType inType, const TaskConfig &inConfig, const OnSendEnd &onEnd)
286 {
287 if (inBuff == nullptr) {
288 return -E_INVALID_ARGS;
289 }
290 LOGI("[CommAggr][Create] Enter, thread=%s, target=%s{private}, type=%d, nonBlock=%d, timeout=%u, prio=%d.",
291 GetThreadId().c_str(), dstTarget.c_str(), static_cast<int>(inType), inConfig.nonBlock, inConfig.timeout,
292 static_cast<int>(inConfig.prio));
293
294 if (!ReGenerateLocalSourceIdIfNeed()) {
295 delete inBuff;
296 inBuff = nullptr;
297 DoOnSendEndByTaskIfNeed(onEnd, -E_PERIPHERAL_INTERFACE_FAIL);
298 LOGE("[CommAggr][Create] Exit ok but discard since localSourceId zero, thread=%s.", GetThreadId().c_str());
299 return E_OK; // Returns E_OK here to indicate this buffer was accepted though discard immediately
300 }
301 PhyHeaderInfo info{localSourceId_, incFrameId_.fetch_add(1, std::memory_order_seq_cst), inType};
302 int errCode = ProtocolProto::SetPhyHeader(inBuff, info);
303 if (errCode != E_OK) {
304 LOGE("[CommAggr][Create] Set phyHeader fail, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
305 return errCode;
306 }
307
308 SendTask task{inBuff, dstTarget, onEnd};
309 if (inConfig.nonBlock) {
310 errCode = scheduler_.AddSendTaskIntoSchedule(task, inConfig.prio);
311 } else {
312 errCode = RetryUntilTimeout(task, inConfig.timeout, inConfig.prio);
313 }
314 if (errCode != E_OK) {
315 LOGW("[CommAggr][Create] Exit failed, thread=%s, errCode=%d", GetThreadId().c_str(), errCode);
316 return errCode;
317 }
318
319 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
320 wakingSignal_ = true;
321 wakingCv_.notify_one();
322 LOGI("[CommAggr][Create] Exit ok, thread=%s, frameId=%u", GetThreadId().c_str(), info.frameId); // Delete In Future
323 return E_OK;
324 }
325
EnableCommunicatorNotFoundFeedback(bool isEnable)326 void CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(bool isEnable)
327 {
328 isCommunicatorNotFoundFeedbackEnable_ = isEnable;
329 }
330
GetRemoteCommunicatorVersion(const std::string & target,uint16_t & outVersion) const331 int CommunicatorAggregator::GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const
332 {
333 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
334 auto pair = versionMap_.find(target);
335 if (pair == versionMap_.end()) {
336 return -E_NOT_FOUND;
337 }
338 outVersion = pair->second;
339 return E_OK;
340 }
341
SendDataRoutine()342 void CommunicatorAggregator::SendDataRoutine()
343 {
344 while (!shutdown_) {
345 if (scheduler_.GetNoDelayTaskCount() == 0) {
346 std::unique_lock<std::mutex> wakingUniqueLock(wakingMutex_);
347 LOGI("[CommAggr][Routine] Send done and sleep."); // Delete In Future
348 wakingCv_.wait(wakingUniqueLock, [this] { return this->wakingSignal_; });
349 LOGI("[CommAggr][Routine] Send continue."); // Delete In Future
350 wakingSignal_ = false;
351 continue;
352 }
353
354 SendTask taskToSend;
355 int errCode = scheduler_.ScheduleOutSendTask(taskToSend);
356 if (errCode != E_OK) {
357 continue; // Not possible to happen
358 }
359 // <vector, extendHeadSize>
360 std::vector<std::pair<std::vector<uint8_t>, uint32_t>> piecePackets;
361 errCode = ProtocolProto::SplitFrameIntoPacketsIfNeed(taskToSend.buffer,
362 adapterHandle_->GetMtuSize(taskToSend.dstTarget), piecePackets);
363 if (errCode != E_OK) {
364 LOGE("[CommAggr][Routine] Split frame fail, errCode=%d.", errCode);
365 TaskFinalizer(taskToSend, errCode);
366 continue;
367 }
368 // <addr, <extendHeadSize, totalLen>>
369 std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> eachPacket;
370 if (piecePackets.size() == 0) {
371 // Case that no need to split a frame, just use original buffer as a packet
372 std::pair<const uint8_t *, uint32_t> tmpEntry = taskToSend.buffer->GetReadOnlyBytesForEntireBuffer();
373 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> entry;
374 entry.first = tmpEntry.first - taskToSend.buffer->GetExtendHeadLength();
375 entry.second.first = taskToSend.buffer->GetExtendHeadLength();
376 entry.second.second = tmpEntry.second + entry.second.first;
377 eachPacket.push_back(entry);
378 } else {
379 for (auto &entry : piecePackets) {
380 std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>> tmpEntry = {&(entry.first[0]),
381 {entry.second, entry.first.size()}};
382 eachPacket.push_back(tmpEntry);
383 }
384 }
385
386 SendPacketsAndDisposeTask(taskToSend, eachPacket);
387 }
388 }
389
SendPacketsAndDisposeTask(const SendTask & inTask,const std::vector<std::pair<const uint8_t *,std::pair<uint32_t,uint32_t>>> & eachPacket)390 void CommunicatorAggregator::SendPacketsAndDisposeTask(const SendTask &inTask,
391 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket)
392 {
393 bool taskNeedFinalize = true;
394 int errCode = E_OK;
395 for (auto &entry : eachPacket) {
396 LOGI("[CommAggr][SendPackets] DoSendBytes, dstTarget=%s{private}, extendHeadLength=%u, totalLength=%u.",
397 inTask.dstTarget.c_str(), entry.second.first, entry.second.second);
398 ProtocolProto::DisplayPacketInformation(entry.first + entry.second.first, entry.second.second);
399 errCode = adapterHandle_->SendBytes(inTask.dstTarget, entry.first, entry.second.second);
400 if (errCode == -E_WAIT_RETRY) {
401 LOGE("[CommAggr][SendPackets] SendBytes temporally fail.");
402 scheduler_.DelayTaskByTarget(inTask.dstTarget);
403 taskNeedFinalize = false;
404 break;
405 } else if (errCode != E_OK) {
406 LOGE("[CommAggr][SendPackets] SendBytes totally fail, errCode=%d.", errCode);
407 break;
408 }
409 }
410 if (taskNeedFinalize) {
411 TaskFinalizer(inTask, errCode);
412 }
413 }
414
RetryUntilTimeout(SendTask & inTask,uint32_t timeout,Priority inPrio)415 int CommunicatorAggregator::RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio)
416 {
417 int errCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
418 if (errCode != E_OK) {
419 bool notTimeout = true;
420 auto retryFunc = [this, inPrio, &inTask]()->bool {
421 if (this->shutdown_) {
422 delete inTask.buffer;
423 inTask.buffer = nullptr;
424 return true;
425 }
426 int retCode = scheduler_.AddSendTaskIntoSchedule(inTask, inPrio);
427 if (retCode != E_OK) {
428 return false;
429 }
430 return true;
431 };
432
433 if (timeout == 0) { // Unlimited retry
434 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
435 retryCv_.wait(retryUniqueLock, retryFunc);
436 } else {
437 std::unique_lock<std::mutex> retryUniqueLock(retryMutex_);
438 notTimeout = retryCv_.wait_for(retryUniqueLock, std::chrono::milliseconds(timeout), retryFunc);
439 }
440
441 if (shutdown_) {
442 return E_OK;
443 }
444 if (!notTimeout) {
445 return -E_TIMEOUT;
446 }
447 }
448 return E_OK;
449 }
450
TaskFinalizer(const SendTask & inTask,int result)451 void CommunicatorAggregator::TaskFinalizer(const SendTask &inTask, int result)
452 {
453 // Call the OnSendEnd if need
454 if (inTask.onEnd) {
455 LOGD("[CommAggr][TaskFinal] On Send End.");
456 inTask.onEnd(result);
457 }
458 // Finalize the task that just scheduled
459 int errCode = scheduler_.FinalizeLastScheduleTask();
460 // Notify Sendable To All Communicator If Need
461 if (errCode == -E_CONTAINER_FULL_TO_NOTFULL) {
462 retryCv_.notify_all();
463 }
464 if (errCode == -E_CONTAINER_NOTEMPTY_TO_EMPTY) {
465 NotifySendableToAllCommunicator();
466 }
467 }
468
NotifySendableToAllCommunicator()469 void CommunicatorAggregator::NotifySendableToAllCommunicator()
470 {
471 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
472 for (auto &entry : commMap_) {
473 // Ignore nonactivated communicator
474 if (entry.second.second) {
475 entry.second.first->OnSendAvailable();
476 }
477 }
478 }
479
OnBytesReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const std::string & userId)480 void CommunicatorAggregator::OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
481 const std::string &userId)
482 {
483 ProtocolProto::DisplayPacketInformation(bytes, length); // For debug, delete in the future
484 ParseResult packetResult;
485 int errCode = ProtocolProto::CheckAndParsePacket(srcTarget, bytes, length, packetResult);
486 if (errCode != E_OK) {
487 LOGE("[CommAggr][Receive] Parse packet fail, errCode=%d.", errCode);
488 if (errCode == -E_VERSION_NOT_SUPPORT) {
489 TriggerVersionNegotiation(srcTarget);
490 }
491 return;
492 }
493
494 // Update version of remote target
495 SetRemoteCommunicatorVersion(srcTarget, packetResult.GetDbVersion());
496 if (packetResult.GetFrameTypeInfo() == FrameType::EMPTY) { // Empty frame will never be fragmented
497 LOGI("[CommAggr][Receive] Empty frame, just ignore in this version of distributeddb.");
498 return;
499 }
500
501 if (packetResult.IsFragment()) {
502 OnFragmentReceive(srcTarget, bytes, length, packetResult, userId);
503 } else if (packetResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
504 errCode = OnCommLayerFrameReceive(srcTarget, packetResult);
505 if (errCode != E_OK) {
506 LOGE("[CommAggr][Receive] CommLayer receive fail, errCode=%d.", errCode);
507 }
508 } else {
509 errCode = OnAppLayerFrameReceive(srcTarget, bytes, length, packetResult, userId);
510 if (errCode != E_OK) {
511 LOGE("[CommAggr][Receive] AppLayer receive fail, errCode=%d.", errCode);
512 }
513 }
514 }
515
OnTargetChange(const std::string & target,bool isConnect)516 void CommunicatorAggregator::OnTargetChange(const std::string &target, bool isConnect)
517 {
518 if (target.empty()) {
519 LOGE("[CommAggr][OnTarget] Target empty string.");
520 return;
521 }
522 // For process level target change
523 {
524 std::lock_guard<std::mutex> onConnectLockGuard(onConnectMutex_);
525 if (onConnectHandle_) {
526 onConnectHandle_(target, isConnect);
527 LOGI("[CommAggr][OnTarget] On Connect End."); // Log in case callback block this thread
528 } else {
529 LOGI("[CommAggr][OnTarget] ConnectHandle invalid currently.");
530 }
531 }
532 std::set<LabelType> relatedLabels;
533 // For communicator level target change
534 if (isConnect) {
535 int errCode = commLinker_->TargetOnline(target, relatedLabels);
536 if (errCode != E_OK) {
537 LOGE("[CommAggr][OnTarget] TargetOnline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
538 }
539 } else {
540 int errCode = commLinker_->TargetOffline(target, relatedLabels);
541 if (errCode != E_OK) {
542 LOGE("[CommAggr][OnTarget] TargetOffline fail, target=%s{private}, errCode=%d.", target.c_str(), errCode);
543 }
544 }
545 // All related communicator online or offline this target, no matter TargetOnline or TargetOffline fail or not
546 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
547 for (auto &entry : commMap_) {
548 // Ignore nonactivated communicator
549 if (entry.second.second && (!isConnect || (relatedLabels.count(entry.first) != 0))) {
550 entry.second.first->OnConnectChange(target, isConnect);
551 }
552 }
553 }
554
OnSendable(const std::string & target)555 void CommunicatorAggregator::OnSendable(const std::string &target)
556 {
557 int errCode = scheduler_.NoDelayTaskByTarget(target);
558 if (errCode != E_OK) {
559 LOGE("[CommAggr][Sendable] NoDelay target=%s{private} fail, errCode=%d.", target.c_str(), errCode);
560 return;
561 }
562 std::lock_guard<std::mutex> wakingLockGuard(wakingMutex_);
563 wakingSignal_ = true;
564 wakingCv_.notify_one();
565 }
566
OnFragmentReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)567 void CommunicatorAggregator::OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
568 const ParseResult &inResult, const std::string &userId)
569 {
570 int errorNo = E_OK;
571 ParseResult frameResult;
572 SerialBuffer *frameBuffer = combiner_.AssembleFrameFragment(bytes, length, inResult, frameResult, errorNo);
573 if (errorNo != E_OK) {
574 LOGE("[CommAggr][Receive] Combine fail, errCode=%d.", errorNo);
575 return;
576 }
577 if (frameBuffer == nullptr) {
578 LOGW("[CommAggr][Receive] Combine undone.");
579 return;
580 }
581
582 int errCode = ProtocolProto::CheckAndParseFrame(frameBuffer, frameResult);
583 if (errCode != E_OK) {
584 LOGE("[CommAggr][Receive] Parse frame fail, errCode=%d.", errCode);
585 delete frameBuffer;
586 frameBuffer = nullptr;
587 if (errCode == -E_VERSION_NOT_SUPPORT) {
588 TriggerVersionNegotiation(srcTarget);
589 }
590 return;
591 }
592
593 if (frameResult.GetFrameTypeInfo() != FrameType::APPLICATION_MESSAGE) {
594 errCode = OnCommLayerFrameReceive(srcTarget, frameResult);
595 if (errCode != E_OK) {
596 LOGE("[CommAggr][Receive] CommLayer receive fail after combination, errCode=%d.", errCode);
597 }
598 delete frameBuffer;
599 frameBuffer = nullptr;
600 } else {
601 errCode = OnAppLayerFrameReceive(srcTarget, frameBuffer, frameResult, userId);
602 if (errCode != E_OK) {
603 LOGE("[CommAggr][Receive] AppLayer receive fail after combination, errCode=%d.", errCode);
604 }
605 }
606 }
607
OnCommLayerFrameReceive(const std::string & srcTarget,const ParseResult & inResult)608 int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult)
609 {
610 if (inResult.GetFrameTypeInfo() == FrameType::COMMUNICATION_LABEL_EXCHANGE_ACK) {
611 int errCode = commLinker_->ReceiveLabelExchangeAck(srcTarget, inResult.GetLabelExchangeDistinctValue(),
612 inResult.GetLabelExchangeSequenceId());
613 if (errCode != E_OK) {
614 LOGE("[CommAggr][CommReceive] Receive LabelExchangeAck Fail.");
615 return errCode;
616 }
617 } else {
618 std::map<LabelType, bool> changedLabels;
619 int errCode = commLinker_->ReceiveLabelExchange(srcTarget, inResult.GetLatestCommLabels(),
620 inResult.GetLabelExchangeDistinctValue(), inResult.GetLabelExchangeSequenceId(), changedLabels);
621 if (errCode != E_OK) {
622 LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail.");
623 return errCode;
624 }
625 if (!commLinker_->IsRemoteTargetOnline(srcTarget)) {
626 LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str());
627 for (const auto &entry : changedLabels) {
628 LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second);
629 }
630 return E_OK;
631 }
632 // Do target change notify
633 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
634 for (auto &entry : changedLabels) {
635 // Ignore nonactivated communicator
636 if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) {
637 LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.",
638 VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second);
639 commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second);
640 }
641 }
642 }
643 return E_OK;
644 }
645
OnAppLayerFrameReceive(const std::string & srcTarget,const uint8_t * bytes,uint32_t length,const ParseResult & inResult,const std::string & userId)646 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
647 uint32_t length, const ParseResult &inResult, const std::string &userId)
648 {
649 SerialBuffer *buffer = new (std::nothrow) SerialBuffer();
650 if (buffer == nullptr) {
651 LOGE("[CommAggr][AppReceive] New SerialBuffer fail.");
652 return -E_OUT_OF_MEMORY;
653 }
654 int errCode = buffer->SetExternalBuff(bytes, length - inResult.GetPaddingLen(),
655 ProtocolProto::GetAppLayerFrameHeaderLength());
656 if (errCode != E_OK) {
657 LOGE("[CommAggr][AppReceive] SetExternalBuff fail, errCode=%d.", errCode);
658 delete buffer;
659 buffer = nullptr;
660 return -E_INTERNAL_ERROR;
661 }
662 return OnAppLayerFrameReceive(srcTarget, buffer, inResult, userId);
663 }
664
665 // In early time, we cover "OnAppLayerFrameReceive" totally by commMapMutex_, then search communicator, if not found,
666 // we call onCommLackHandle_ if exist to ask whether to retain this frame or not, if the answer is yes we retain this
667 // frame, otherwise we discard this frame and send out CommunicatorNotFound feedback.
668 // We design so(especially cover this function totally by commMapMutex_) to avoid current situation described below
669 // 1:This func find that target communicator not allocated or activated, so decide to retain this frame.
670 // 2:Thread switch out, the target communicator is allocated and activated, previous retained frame is fetched out.
671 // 3:Thread switch back, this frame is then retained into the retainer, no chance to be fetched out.
672 // In conclusion: the decision to retain a frame and the action to retain a frame should not be separated.
673 // Otherwise, at the action time, the retain decision may be obsolete and wrong.
674 // #### BUT #### since onCommLackHandle_ callback is go beyond DistributedDB and there is the risk that the final upper
675 // user may do something such as GetKvStore(we can prevent them to so) which could result in calling AllocCommunicator
676 // in the same callback thread finally causing DeadLock on commMapMutex_.
677 // #### SO #### we have to make a change described below
678 // 1:Search communicator under commMapMutex_, if found then deliver frame to that communicator and end.
679 // 2:Call onCommLackHandle_ if exist to ask whether to retain this frame or not, without commMapMutex_.
680 // Note: during this period, commMap_ maybe changed, and communicator not found before may exist now.
681 // 3:Search communicator under commMapMutex_ again, if found then deliver frame to that communicator and end.
682 // 4:If still not found, retain this frame if need or otherwise send CommunicatorNotFound feedback.
OnAppLayerFrameReceive(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const ParseResult & inResult,const std::string & userId)683 int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
684 const ParseResult &inResult, const std::string &userId)
685 {
686 LabelType toLabel = inResult.GetCommLabel();
687 {
688 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
689 int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
690 if (errCode == E_OK) { // Attention: Here is equal to E_OK
691 return E_OK;
692 }
693 }
694 LOGI("[CommAggr][AppReceive] Communicator of %s not found or nonactivated.", VEC_TO_STR(toLabel));
695 int errCode = -E_NOT_FOUND;
696 {
697 std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
698 if (onCommLackHandle_) {
699 errCode = onCommLackHandle_(toLabel, userId);
700 LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
701 } else {
702 LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
703 }
704 }
705 // Here we have to lock commMapMutex_ and search communicator again.
706 std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
707 int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel);
708 if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
709 LOGI("[CommAggr][AppReceive] Communicator of %s found after try again(rare case).", VEC_TO_STR(toLabel));
710 return E_OK;
711 }
712 // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_
713 if (errCode != E_OK) {
714 TryToFeedbackWhenCommunicatorNotFound(srcTarget, toLabel, inFrameBuffer);
715 delete inFrameBuffer;
716 inFrameBuffer = nullptr;
717 return errCode; // The caller will display errCode in log
718 }
719 // Do Retention, the retainer is responsible to deal with the frame
720 retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
721 inFrameBuffer = nullptr;
722 return E_OK;
723 }
724
TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string & srcTarget,SerialBuffer * & inFrameBuffer,const LabelType & toLabel)725 int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
726 SerialBuffer *&inFrameBuffer, const LabelType &toLabel)
727 {
728 // Ignore nonactivated communicator, which is regarded as inexistent
729 if (commMap_.count(toLabel) != 0 && commMap_.at(toLabel).second) {
730 commMap_.at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
731 // Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
732 inFrameBuffer = nullptr;
733 return E_OK;
734 }
735 return -E_NOT_FOUND;
736 }
737
RegCallbackToAdapter()738 int CommunicatorAggregator::RegCallbackToAdapter()
739 {
740 RefObject::IncObjRef(this); // Reference to be hold by adapter
741 int errCode = adapterHandle_->RegBytesReceiveCallback(
742 std::bind(&CommunicatorAggregator::OnBytesReceive, this, std::placeholders::_1, std::placeholders::_2,
743 std::placeholders::_3, std::placeholders::_4),
744 [this]() { RefObject::DecObjRef(this); });
745 if (errCode != E_OK) {
746 RefObject::DecObjRef(this); // Rollback in case reg failed
747 return errCode;
748 }
749
750 RefObject::IncObjRef(this); // Reference to be hold by adapter
751 errCode = adapterHandle_->RegTargetChangeCallback(
752 std::bind(&CommunicatorAggregator::OnTargetChange, this, std::placeholders::_1, std::placeholders::_2),
753 [this]() { RefObject::DecObjRef(this); });
754 if (errCode != E_OK) {
755 RefObject::DecObjRef(this); // Rollback in case reg failed
756 return errCode;
757 }
758
759 RefObject::IncObjRef(this); // Reference to be hold by adapter
760 errCode = adapterHandle_->RegSendableCallback(
761 std::bind(&CommunicatorAggregator::OnSendable, this, std::placeholders::_1),
762 [this]() { RefObject::DecObjRef(this); });
763 if (errCode != E_OK) {
764 RefObject::DecObjRef(this); // Rollback in case reg failed
765 return errCode;
766 }
767
768 return E_OK;
769 }
770
UnRegCallbackFromAdapter()771 void CommunicatorAggregator::UnRegCallbackFromAdapter()
772 {
773 adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr);
774 adapterHandle_->RegTargetChangeCallback(nullptr, nullptr);
775 adapterHandle_->RegSendableCallback(nullptr, nullptr);
776 }
777
GenerateLocalSourceId()778 void CommunicatorAggregator::GenerateLocalSourceId()
779 {
780 std::string identity;
781 adapterHandle_->GetLocalIdentity(identity);
782 // When GetLocalIdentity fail, the identity be an empty string, the localSourceId be zero, need regenerate
783 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk
784 uint64_t identityHash = Hash::HashFunc(identity);
785 if (identityHash != localSourceId_) {
786 LOGI("[CommAggr][GenSrcId] identity=%s{private}, localSourceId=%llu.", identity.c_str(), ULL(identityHash));
787 }
788 localSourceId_ = identityHash;
789 }
790
ReGenerateLocalSourceIdIfNeed()791 bool CommunicatorAggregator::ReGenerateLocalSourceIdIfNeed()
792 {
793 // The deviceId will change when switch user from A to B
794 // We can't listen to the user change, because it's hard to ensure the timing is correct.
795 // So we regenerate to make sure the deviceId and localSourceId is correct when we create send task.
796 // The localSourceId is std::atomic<uint64_t>, so there is no concurrency risk, no need lockguard here.
797 GenerateLocalSourceId();
798 return (localSourceId_ != 0);
799 }
800
TriggerVersionNegotiation(const std::string & dstTarget)801 void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTarget)
802 {
803 LOGI("[CommAggr][TrigVer] Do version negotiate with target=%s{private}.", dstTarget.c_str());
804 int errCode = E_OK;
805 SerialBuffer *buffer = ProtocolProto::BuildEmptyFrameForVersionNegotiate(errCode);
806 if (errCode != E_OK) {
807 LOGE("[CommAggr][TrigVer] Build empty frame fail, errCode=%d", errCode);
808 return;
809 }
810
811 TaskConfig config{true, 0, Priority::HIGH};
812 errCode = CreateSendTask(dstTarget, buffer, FrameType::EMPTY, config);
813 if (errCode != E_OK) {
814 LOGE("[CommAggr][TrigVer] Send empty frame fail, errCode=%d", errCode);
815 // if send fails, free buffer, otherwise buffer will be taked over by SendTaskScheduler
816 delete buffer;
817 buffer = nullptr;
818 }
819 }
820
TryToFeedbackWhenCommunicatorNotFound(const std::string & dstTarget,const LabelType & dstLabel,const SerialBuffer * inOriFrame)821 void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget,
822 const LabelType &dstLabel, const SerialBuffer *inOriFrame)
823 {
824 if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) {
825 return;
826 }
827 int errCode = E_OK;
828 Message *message = ProtocolProto::ToMessage(inOriFrame, errCode, true);
829 if (message == nullptr) {
830 if (errCode == -E_VERSION_NOT_SUPPORT) {
831 TriggerVersionNegotiation(dstTarget);
832 }
833 return;
834 }
835 // Message is release in TriggerCommunicatorNotFoundFeedback
836 TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message);
837 }
838
TriggerCommunicatorNotFoundFeedback(const std::string & dstTarget,const LabelType & dstLabel,Message * & oriMsg)839 void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget,
840 const LabelType &dstLabel, Message* &oriMsg)
841 {
842 if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) {
843 LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request.");
844 // Do not have to do feedback if the message is not a request type message
845 delete oriMsg;
846 oriMsg = nullptr;
847 return;
848 }
849
850 LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str());
851 oriMsg->SetMessageType(TYPE_RESPONSE);
852 oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
853
854 int errCode = E_OK;
855 SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode);
856 delete oriMsg;
857 oriMsg = nullptr;
858 if (errCode != E_OK) {
859 LOGE("[CommAggr][TrigNotFound] Build communicator not found feedback frame fail, errCode=%d", errCode);
860 return;
861 }
862
863 TaskConfig config{true, 0, Priority::HIGH};
864 errCode = CreateSendTask(dstTarget, buffer, FrameType::APPLICATION_MESSAGE, config);
865 if (errCode != E_OK) {
866 LOGE("[CommAggr][TrigNotFound] Send communicator not found feedback frame fail, errCode=%d", errCode);
867 // if send fails, free buffer, otherwise buffer will be taked over by CreateSendTask
868 delete buffer;
869 buffer = nullptr;
870 }
871 }
872
SetRemoteCommunicatorVersion(const std::string & target,uint16_t version)873 void CommunicatorAggregator::SetRemoteCommunicatorVersion(const std::string &target, uint16_t version)
874 {
875 std::lock_guard<std::mutex> versionMapLockGuard(versionMapMutex_);
876 versionMap_[target] = version;
877 }
878
GetExtendHeaderHandle(const ExtendInfo & paramInfo)879 std::shared_ptr<ExtendHeaderHandle> CommunicatorAggregator::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
880 {
881 if (adapterHandle_ == nullptr) {
882 return nullptr;
883 }
884 return adapterHandle_->GetExtendHeaderHandle(paramInfo);
885 }
886
// NOTE(review): presumably expands to the object-tag helper definitions for CommunicatorAggregator;
// the macro is defined outside this file - confirm its expansion there.
887 DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator)
888 } // namespace DistributedDB
889