1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "runtime_context_impl.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "db_dfx_adapter.h"
20 #include "log_print.h"
21 #include "communicator_aggregator.h"
22 #include "network_adapter.h"
23
24 namespace DistributedDB {
// Counter used to tag scheduled tasks in the log output of ScheduleTask() and
// ScheduleQueuedTask(); only compiled when RUNNING_ON_TESTCASE is defined.
#ifdef RUNNING_ON_TESTCASE
static std::atomic_uint taskID = 0;
#endif
28
RuntimeContextImpl()29 RuntimeContextImpl::RuntimeContextImpl()
30 : adapter_(nullptr),
31 communicatorAggregator_(nullptr),
32 mainLoop_(nullptr),
33 currentTimerId_(0),
34 taskPool_(nullptr),
35 taskPoolReportsTimerId_(0),
36 timeTickMonitor_(nullptr),
37 systemApiAdapter_(nullptr),
38 lockStatusObserver_(nullptr),
39 currentSessionId_(1),
40 dbStatusAdapter_(nullptr),
41 subscribeRecorder_(nullptr)
42 {
43 }
44
45 // Destruct the object.
~RuntimeContextImpl()46 RuntimeContextImpl::~RuntimeContextImpl()
47 {
48 if (taskPoolReportsTimerId_ > 0) {
49 RemoveTimer(taskPoolReportsTimerId_, true);
50 taskPoolReportsTimerId_ = 0;
51 }
52 if (taskPool_ != nullptr) {
53 taskPool_->Stop();
54 taskPool_->Release(taskPool_);
55 taskPool_ = nullptr;
56 }
57 if (mainLoop_ != nullptr) {
58 mainLoop_->Stop();
59 mainLoop_->KillAndDecObjRef(mainLoop_);
60 mainLoop_ = nullptr;
61 }
62 SetCommunicatorAggregator(nullptr);
63 (void)SetCommunicatorAdapter(nullptr);
64 systemApiAdapter_ = nullptr;
65 delete lockStatusObserver_;
66 lockStatusObserver_ = nullptr;
67 userChangeMonitor_ = nullptr;
68 dbStatusAdapter_ = nullptr;
69 subscribeRecorder_ = nullptr;
70 SetThreadPool(nullptr);
71 }
72
73 // Set the label of this process.
SetProcessLabel(const std::string & label)74 void RuntimeContextImpl::SetProcessLabel(const std::string &label)
75 {
76 std::lock_guard<std::mutex> labelLock(labelMutex_);
77 processLabel_ = label;
78 }
79
GetProcessLabel() const80 std::string RuntimeContextImpl::GetProcessLabel() const
81 {
82 std::lock_guard<std::mutex> labelLock(labelMutex_);
83 return processLabel_;
84 }
85
SetCommunicatorAdapter(IAdapter * adapter)86 int RuntimeContextImpl::SetCommunicatorAdapter(IAdapter *adapter)
87 {
88 {
89 std::lock_guard<std::mutex> autoLock(communicatorLock_);
90 if (adapter_ != nullptr) {
91 if (communicatorAggregator_ != nullptr) {
92 return -E_NOT_SUPPORT;
93 }
94 delete adapter_;
95 }
96 adapter_ = adapter;
97 }
98 ICommunicatorAggregator *communicatorAggregator = nullptr;
99 GetCommunicatorAggregator(communicatorAggregator);
100 autoLaunch_.SetCommunicatorAggregator(communicatorAggregator);
101 return E_OK;
102 }
103
GetCommunicatorAggregator(ICommunicatorAggregator * & outAggregator)104 int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outAggregator)
105 {
106 outAggregator = nullptr;
107 const std::shared_ptr<DBStatusAdapter> statusAdapter = GetDBStatusAdapter();
108 std::lock_guard<std::mutex> lock(communicatorLock_);
109 if (communicatorAggregator_ != nullptr) {
110 outAggregator = communicatorAggregator_;
111 return E_OK;
112 }
113
114 if (adapter_ == nullptr) {
115 LOGE("Adapter has not set!");
116 return -E_NOT_INIT;
117 }
118
119 communicatorAggregator_ = new (std::nothrow) CommunicatorAggregator;
120 if (communicatorAggregator_ == nullptr) {
121 LOGE("CommunicatorAggregator create failed, may be no available memory!");
122 return -E_OUT_OF_MEMORY;
123 }
124
125 int errCode = communicatorAggregator_->Initialize(adapter_, statusAdapter);
126 if (errCode != E_OK) {
127 LOGE("CommunicatorAggregator init failed, err = %d!", errCode);
128 RefObject::KillAndDecObjRef(communicatorAggregator_);
129 communicatorAggregator_ = nullptr;
130 }
131 outAggregator = communicatorAggregator_;
132 return errCode;
133 }
134
SetCommunicatorAggregator(ICommunicatorAggregator * inAggregator)135 void RuntimeContextImpl::SetCommunicatorAggregator(ICommunicatorAggregator *inAggregator)
136 {
137 std::lock_guard<std::mutex> autoLock(communicatorLock_);
138 if (communicatorAggregator_ != nullptr) {
139 autoLaunch_.SetCommunicatorAggregator(nullptr);
140 communicatorAggregator_->Finalize();
141 RefObject::KillAndDecObjRef(communicatorAggregator_);
142 }
143 communicatorAggregator_ = inAggregator;
144 autoLaunch_.SetCommunicatorAggregator(communicatorAggregator_);
145 }
146
GetLocalIdentity(std::string & outTarget)147 int RuntimeContextImpl::GetLocalIdentity(std::string &outTarget)
148 {
149 std::lock_guard<std::mutex> autoLock(communicatorLock_);
150 if (communicatorAggregator_ != nullptr) {
151 return communicatorAggregator_->GetLocalIdentity(outTarget);
152 }
153 return -E_NOT_INIT;
154 }
155
156 // Add and start a timer.
SetTimer(int milliSeconds,const TimerAction & action,const TimerFinalizer & finalizer,TimerId & timerId)157 int RuntimeContextImpl::SetTimer(int milliSeconds, const TimerAction &action,
158 const TimerFinalizer &finalizer, TimerId &timerId)
159 {
160 timerId = 0;
161 if ((milliSeconds < 0) || !action) {
162 return -E_INVALID_ARGS;
163 }
164 int errCode = SetTimerByThreadPool(milliSeconds, action, finalizer, true, timerId);
165 if (errCode != -E_NOT_SUPPORT) {
166 return errCode;
167 }
168 IEventLoop *loop = nullptr;
169 errCode = PrepareLoop(loop);
170 if (errCode != E_OK) {
171 LOGE("SetTimer(), prepare loop failed.");
172 return errCode;
173 }
174
175 IEvent *evTimer = IEvent::CreateEvent(milliSeconds, errCode);
176 if (evTimer == nullptr) {
177 loop->DecObjRef(loop);
178 loop = nullptr;
179 return errCode;
180 }
181
182 errCode = AllocTimerId(evTimer, timerId);
183 if (errCode != E_OK) {
184 evTimer->DecObjRef(evTimer);
185 evTimer = nullptr;
186 loop->DecObjRef(loop);
187 loop = nullptr;
188 return errCode;
189 }
190
191 evTimer->SetAction([this, timerId, action](EventsMask revents) -> int {
192 int errCodeInner = action(timerId);
193 if (errCodeInner != E_OK) {
194 RemoveTimer(timerId, false);
195 }
196 return errCodeInner;
197 },
198 finalizer);
199
200 errCode = loop->Add(evTimer);
201 if (errCode != E_OK) {
202 evTimer->IgnoreFinalizer();
203 RemoveTimer(timerId, false);
204 timerId = 0;
205 }
206
207 loop->DecObjRef(loop);
208 loop = nullptr;
209 return errCode;
210 }
211
212 // Modify the interval of the timer.
ModifyTimer(TimerId timerId,int milliSeconds)213 int RuntimeContextImpl::ModifyTimer(TimerId timerId, int milliSeconds)
214 {
215 if (milliSeconds < 0) {
216 return -E_INVALID_ARGS;
217 }
218 int errCode = ModifyTimerByThreadPool(timerId, milliSeconds);
219 if (errCode != -E_NOT_SUPPORT) {
220 return errCode;
221 }
222 std::lock_guard<std::mutex> autoLock(timersLock_);
223 auto iter = timers_.find(timerId);
224 if (iter == timers_.end()) {
225 return -E_NO_SUCH_ENTRY;
226 }
227
228 IEvent *evTimer = iter->second;
229 if (evTimer == nullptr) {
230 return -E_INTERNAL_ERROR;
231 }
232 return evTimer->SetTimeout(milliSeconds);
233 }
234
235 // Remove the timer.
RemoveTimer(TimerId timerId,bool wait)236 void RuntimeContextImpl::RemoveTimer(TimerId timerId, bool wait)
237 {
238 RemoveTimerByThreadPool(timerId, wait);
239 IEvent *evTimer = nullptr;
240 {
241 std::lock_guard<std::mutex> autoLock(timersLock_);
242 auto iter = timers_.find(timerId);
243 if (iter == timers_.end()) {
244 return;
245 }
246 evTimer = iter->second;
247 timers_.erase(iter);
248 }
249
250 if (evTimer != nullptr) {
251 evTimer->Detach(wait);
252 evTimer->DecObjRef(evTimer);
253 evTimer = nullptr;
254 }
255 }
256
257 // Task interfaces.
ScheduleTask(const TaskAction & task)258 int RuntimeContextImpl::ScheduleTask(const TaskAction &task)
259 {
260 if (ScheduleTaskByThreadPool(task) == E_OK) {
261 return E_OK;
262 }
263 std::lock_guard<std::mutex> autoLock(taskLock_);
264 int errCode = PrepareTaskPool();
265 if (errCode != E_OK) {
266 LOGE("Schedule task failed, fail to prepare task pool.");
267 return errCode;
268 }
269 #ifdef RUNNING_ON_TESTCASE
270 auto id = taskID++;
271 LOGI("Schedule task succeed, ID:%u", id);
272 return taskPool_->Schedule([task, id] {
273 LOGI("Execute task, ID:%u", id);
274 task();
275 });
276 #else
277 return taskPool_->Schedule(task);
278 #endif
279 }
280
ScheduleQueuedTask(const std::string & queueTag,const TaskAction & task)281 int RuntimeContextImpl::ScheduleQueuedTask(const std::string &queueTag,
282 const TaskAction &task)
283 {
284 if (ScheduleTaskByThreadPool(task) == E_OK) {
285 return E_OK;
286 }
287 std::lock_guard<std::mutex> autoLock(taskLock_);
288 int errCode = PrepareTaskPool();
289 if (errCode != E_OK) {
290 LOGE("Schedule queued task failed, fail to prepare task pool.");
291 return errCode;
292 }
293 #ifdef RUNNING_ON_TESTCASE
294 auto id = taskID++;
295 LOGI("Schedule queued task succeed, ID:%u", id);
296 return taskPool_->Schedule(queueTag, [task, id] {
297 LOGI("Execute queued task, ID:%u", id);
298 task();
299 });
300 #else
301 return taskPool_->Schedule(queueTag, task);
302 #endif
303 }
304
ShrinkMemory(const std::string & description)305 void RuntimeContextImpl::ShrinkMemory(const std::string &description)
306 {
307 std::lock_guard<std::mutex> autoLock(taskLock_);
308 if (taskPool_ != nullptr) {
309 taskPool_->ShrinkMemory(description);
310 }
311 }
312
RegisterTimeChangedLister(const TimeChangedAction & action,const TimeFinalizeAction & finalize,int & errCode)313 NotificationChain::Listener *RuntimeContextImpl::RegisterTimeChangedLister(const TimeChangedAction &action,
314 const TimeFinalizeAction &finalize, int &errCode)
315 {
316 std::lock_guard<std::mutex> autoLock(timeTickMonitorLock_);
317 if (timeTickMonitor_ == nullptr) {
318 timeTickMonitor_ = std::make_unique<TimeTickMonitor>();
319 errCode = timeTickMonitor_->StartTimeTickMonitor();
320 if (errCode != E_OK) {
321 LOGE("TimeTickMonitor start failed!");
322 timeTickMonitor_ = nullptr;
323 return nullptr;
324 }
325 LOGD("[RuntimeContext] TimeTickMonitor start success");
326 }
327 LOGD("[RuntimeContext] call RegisterTimeChangedLister");
328 return timeTickMonitor_->RegisterTimeChangedLister(action, finalize, errCode);
329 }
330
PrepareLoop(IEventLoop * & loop)331 int RuntimeContextImpl::PrepareLoop(IEventLoop *&loop)
332 {
333 std::lock_guard<std::mutex> autoLock(loopLock_);
334 if (mainLoop_ != nullptr) {
335 loop = mainLoop_;
336 loop->IncObjRef(loop); // ref 1 returned to caller.
337 return E_OK;
338 }
339
340 int errCode = E_OK;
341 loop = IEventLoop::CreateEventLoop(errCode);
342 if (loop == nullptr) {
343 return errCode;
344 }
345
346 loop->IncObjRef(loop); // ref 1 owned by thread.
347 std::thread loopThread([loop]() {
348 loop->Run();
349 loop->DecObjRef(loop); // ref 1 dropped by thread.
350 });
351 loopThread.detach();
352
353 mainLoop_ = loop;
354 loop->IncObjRef(loop); // ref 1 returned to caller.
355 return E_OK;
356 }
357
PrepareTaskPool()358 int RuntimeContextImpl::PrepareTaskPool()
359 {
360 if (taskPool_ != nullptr) {
361 return E_OK;
362 }
363
364 int errCode = E_OK;
365 TaskPool *taskPool = TaskPool::Create(MAX_TP_THREADS, MIN_TP_THREADS, errCode);
366 if (taskPool == nullptr) {
367 return errCode;
368 }
369
370 errCode = taskPool->Start();
371 if (errCode != E_OK) {
372 taskPool->Release(taskPool);
373 return errCode;
374 }
375
376 taskPool_ = taskPool;
377 return E_OK;
378 }
379
AllocTimerId(IEvent * evTimer,TimerId & timerId)380 int RuntimeContextImpl::AllocTimerId(IEvent *evTimer, TimerId &timerId)
381 {
382 std::lock_guard<std::mutex> autoLock(timersLock_);
383 TimerId startId = currentTimerId_;
384 while (++currentTimerId_ != startId) {
385 if (currentTimerId_ == 0) {
386 continue;
387 }
388 if (timers_.find(currentTimerId_) == timers_.end()) {
389 timerId = currentTimerId_;
390 timers_[timerId] = evTimer;
391 return E_OK;
392 }
393 }
394 return -E_OUT_OF_IDS;
395 }
396
SetPermissionCheckCallback(const PermissionCheckCallback & callback)397 int RuntimeContextImpl::SetPermissionCheckCallback(const PermissionCheckCallback &callback)
398 {
399 std::unique_lock<std::shared_mutex> writeLock(permissionCheckCallbackMutex_);
400 permissionCheckCallback_ = callback;
401 LOGI("SetPermissionCheckCallback ok");
402 return E_OK;
403 }
404
SetPermissionCheckCallback(const PermissionCheckCallbackV2 & callback)405 int RuntimeContextImpl::SetPermissionCheckCallback(const PermissionCheckCallbackV2 &callback)
406 {
407 std::unique_lock<std::shared_mutex> writeLock(permissionCheckCallbackMutex_);
408 permissionCheckCallbackV2_ = callback;
409 LOGI("SetPermissionCheckCallback V2 ok");
410 return E_OK;
411 }
412
SetPermissionCheckCallback(const PermissionCheckCallbackV3 & callback)413 int RuntimeContextImpl::SetPermissionCheckCallback(const PermissionCheckCallbackV3 &callback)
414 {
415 std::unique_lock<std::shared_mutex> writeLock(permissionCheckCallbackMutex_);
416 permissionCheckCallbackV3_ = callback;
417 LOGI("SetPermissionCheckCallback V3 ok");
418 return E_OK;
419 }
420
RunPermissionCheck(const PermissionCheckParam & param,uint8_t flag) const421 int RuntimeContextImpl::RunPermissionCheck(const PermissionCheckParam ¶m, uint8_t flag) const
422 {
423 bool checkResult = false;
424 std::shared_lock<std::shared_mutex> autoLock(permissionCheckCallbackMutex_);
425 if (permissionCheckCallbackV3_) {
426 checkResult = permissionCheckCallbackV3_(param, flag);
427 } else if (permissionCheckCallbackV2_) {
428 checkResult = permissionCheckCallbackV2_(param.userId, param.appId, param.storeId, param.deviceId, flag);
429 } else if (permissionCheckCallback_) {
430 checkResult = permissionCheckCallback_(param.userId, param.appId, param.storeId, flag);
431 } else {
432 return E_OK;
433 }
434 if (checkResult) {
435 return E_OK;
436 }
437 return -E_NOT_PERMIT;
438 }
439
EnableKvStoreAutoLaunch(const KvDBProperties & properties,AutoLaunchNotifier notifier,const AutoLaunchOption & option)440 int RuntimeContextImpl::EnableKvStoreAutoLaunch(const KvDBProperties &properties, AutoLaunchNotifier notifier,
441 const AutoLaunchOption &option)
442 {
443 return autoLaunch_.EnableKvStoreAutoLaunch(properties, notifier, option);
444 }
445
DisableKvStoreAutoLaunch(const std::string & normalIdentifier,const std::string & dualTupleIdentifier,const std::string & userId)446 int RuntimeContextImpl::DisableKvStoreAutoLaunch(const std::string &normalIdentifier,
447 const std::string &dualTupleIdentifier, const std::string &userId)
448 {
449 return autoLaunch_.DisableKvStoreAutoLaunch(normalIdentifier, dualTupleIdentifier, userId);
450 }
451
GetAutoLaunchSyncDevices(const std::string & identifier,std::vector<std::string> & devices) const452 void RuntimeContextImpl::GetAutoLaunchSyncDevices(const std::string &identifier,
453 std::vector<std::string> &devices) const
454 {
455 return autoLaunch_.GetAutoLaunchSyncDevices(identifier, devices);
456 }
457
SetAutoLaunchRequestCallback(const AutoLaunchRequestCallback & callback,DBTypeInner type)458 void RuntimeContextImpl::SetAutoLaunchRequestCallback(const AutoLaunchRequestCallback &callback, DBTypeInner type)
459 {
460 autoLaunch_.SetAutoLaunchRequestCallback(callback, type);
461 }
462
RegisterLockStatusLister(const LockStatusNotifier & action,int & errCode)463 NotificationChain::Listener *RuntimeContextImpl::RegisterLockStatusLister(const LockStatusNotifier &action,
464 int &errCode)
465 {
466 std::lock(lockStatusLock_, systemApiAdapterLock_);
467 std::lock_guard<std::mutex> lockStatusLock(lockStatusLock_, std::adopt_lock);
468 std::lock_guard<std::recursive_mutex> systemApiAdapterLock(systemApiAdapterLock_, std::adopt_lock);
469 if (lockStatusObserver_ == nullptr) {
470 lockStatusObserver_ = new (std::nothrow) LockStatusObserver();
471 if (lockStatusObserver_ == nullptr) {
472 LOGE("lockStatusObserver_ is nullptr");
473 errCode = -E_OUT_OF_MEMORY;
474 return nullptr;
475 }
476 }
477
478 if (!lockStatusObserver_->IsStarted()) {
479 errCode = lockStatusObserver_->Start();
480 if (errCode != E_OK) {
481 LOGE("lockStatusObserver start failed, err = %d", errCode);
482 delete lockStatusObserver_;
483 lockStatusObserver_ = nullptr;
484 return nullptr;
485 }
486
487 if (systemApiAdapter_ != nullptr) {
488 auto callback = std::bind(&LockStatusObserver::OnStatusChange,
489 lockStatusObserver_, std::placeholders::_1);
490 errCode = systemApiAdapter_->RegOnAccessControlledEvent(callback);
491 if (errCode != OK) {
492 delete lockStatusObserver_;
493 lockStatusObserver_ = nullptr;
494 return nullptr;
495 }
496 }
497 }
498
499 NotificationChain::Listener *listener = lockStatusObserver_->RegisterLockStatusChangedLister(action, errCode);
500 if ((listener == nullptr) || (errCode != E_OK)) {
501 LOGE("Register lock status changed listener failed, err = %d", errCode);
502 delete lockStatusObserver_;
503 lockStatusObserver_ = nullptr;
504 return nullptr;
505 }
506 return listener;
507 }
508
IsAccessControlled() const509 bool RuntimeContextImpl::IsAccessControlled() const
510 {
511 std::lock_guard<std::recursive_mutex> autoLock(systemApiAdapterLock_);
512 if (systemApiAdapter_ == nullptr) {
513 return false;
514 }
515 return systemApiAdapter_->IsAccessControlled();
516 }
517
SetSecurityOption(const std::string & filePath,const SecurityOption & option) const518 int RuntimeContextImpl::SetSecurityOption(const std::string &filePath, const SecurityOption &option) const
519 {
520 std::lock_guard<std::recursive_mutex> autoLock(systemApiAdapterLock_);
521 if (systemApiAdapter_ == nullptr || !OS::CheckPathExistence(filePath)) {
522 LOGI("Adapter is not set, or path not existed, not support set security option!");
523 return -E_NOT_SUPPORT;
524 }
525
526 if (option == SecurityOption()) {
527 LOGD("SecurityOption is NOT_SET,Not need to set security option!");
528 return E_OK;
529 }
530
531 std::string fileRealPath;
532 int errCode = OS::GetRealPath(filePath, fileRealPath);
533 if (errCode != E_OK) {
534 LOGE("Get real path failed when set security option!");
535 return errCode;
536 }
537
538 errCode = systemApiAdapter_->SetSecurityOption(fileRealPath, option);
539 if (errCode != OK) {
540 if (errCode == NOT_SUPPORT) {
541 return -E_NOT_SUPPORT;
542 }
543 LOGE("SetSecurityOption failed, errCode = %d", errCode);
544 return -E_SYSTEM_API_ADAPTER_CALL_FAILED;
545 }
546 return E_OK;
547 }
548
GetSecurityOption(const std::string & filePath,SecurityOption & option) const549 int RuntimeContextImpl::GetSecurityOption(const std::string &filePath, SecurityOption &option) const
550 {
551 std::lock_guard<std::recursive_mutex> autoLock(systemApiAdapterLock_);
552 if (systemApiAdapter_ == nullptr) {
553 LOGI("Get Security option, but not set system api adapter!");
554 return -E_NOT_SUPPORT;
555 }
556 int errCode = systemApiAdapter_->GetSecurityOption(filePath, option);
557 if (errCode != OK) {
558 if (errCode == NOT_SUPPORT) {
559 return -E_NOT_SUPPORT;
560 }
561 LOGE("GetSecurityOption failed, errCode = %d", errCode);
562 return -E_SYSTEM_API_ADAPTER_CALL_FAILED;
563 }
564
565 LOGD("Get security option from system adapter [%d, %d]", option.securityLabel, option.securityFlag);
566 // This interface may return success but failed to obtain the flag and modified it to -1
567 if (option.securityFlag == INVALID_SEC_FLAG) {
568 // Currently ignoring the failure to obtain flags -1 other than S3, modify the flag to the default value
569 if (option.securityLabel == S3) {
570 LOGE("GetSecurityOption failed, SecurityOption is invalid [3, -1]!");
571 return -E_SYSTEM_API_ADAPTER_CALL_FAILED;
572 }
573 option.securityFlag = 0; // 0 is default value
574 }
575 return E_OK;
576 }
577
CheckDeviceSecurityAbility(const std::string & devId,const SecurityOption & option) const578 bool RuntimeContextImpl::CheckDeviceSecurityAbility(const std::string &devId, const SecurityOption &option) const
579 {
580 std::shared_ptr<IProcessSystemApiAdapter> tempSystemApiAdapter = nullptr;
581 {
582 std::lock_guard<std::recursive_mutex> autoLock(systemApiAdapterLock_);
583 if (systemApiAdapter_ == nullptr) {
584 LOGI("[CheckDeviceSecurityAbility] security not set");
585 return true;
586 }
587 tempSystemApiAdapter = systemApiAdapter_;
588 }
589
590 return tempSystemApiAdapter->CheckDeviceSecurityAbility(devId, option);
591 }
592
SetProcessSystemApiAdapter(const std::shared_ptr<IProcessSystemApiAdapter> & adapter)593 int RuntimeContextImpl::SetProcessSystemApiAdapter(const std::shared_ptr<IProcessSystemApiAdapter> &adapter)
594 {
595 std::lock(lockStatusLock_, systemApiAdapterLock_);
596 std::lock_guard<std::mutex> lockStatusLock(lockStatusLock_, std::adopt_lock);
597 std::lock_guard<std::recursive_mutex> systemApiAdapterLock(systemApiAdapterLock_, std::adopt_lock);
598 systemApiAdapter_ = adapter;
599 if (systemApiAdapter_ != nullptr && lockStatusObserver_ != nullptr && lockStatusObserver_->IsStarted()) {
600 auto callback = std::bind(&LockStatusObserver::OnStatusChange,
601 lockStatusObserver_, std::placeholders::_1);
602 int errCode = systemApiAdapter_->RegOnAccessControlledEvent(callback);
603 if (errCode != OK) {
604 LOGE("Register access controlled event failed while setting adapter, err = %d", errCode);
605 delete lockStatusObserver_;
606 lockStatusObserver_ = nullptr;
607 return -E_SYSTEM_API_ADAPTER_CALL_FAILED;
608 }
609 }
610 return E_OK;
611 }
612
IsProcessSystemApiAdapterValid() const613 bool RuntimeContextImpl::IsProcessSystemApiAdapterValid() const
614 {
615 std::lock_guard<std::recursive_mutex> autoLock(systemApiAdapterLock_);
616 return (systemApiAdapter_ != nullptr);
617 }
618
NotifyTimestampChanged(TimeOffset offset) const619 void RuntimeContextImpl::NotifyTimestampChanged(TimeOffset offset) const
620 {
621 std::lock_guard<std::mutex> autoLock(timeTickMonitorLock_);
622 if (timeTickMonitor_ == nullptr) {
623 LOGD("NotifyTimestampChanged fail, timeTickMonitor_ is null.");
624 return;
625 }
626 timeTickMonitor_->NotifyTimeChange(offset);
627 }
628
IsCommunicatorAggregatorValid() const629 bool RuntimeContextImpl::IsCommunicatorAggregatorValid() const
630 {
631 std::lock_guard<std::mutex> autoLock(communicatorLock_);
632 if (communicatorAggregator_ == nullptr && adapter_ == nullptr) {
633 return false;
634 }
635 return true;
636 }
637
SetStoreStatusNotifier(const StoreStatusNotifier & notifier)638 void RuntimeContextImpl::SetStoreStatusNotifier(const StoreStatusNotifier ¬ifier)
639 {
640 std::unique_lock<std::shared_mutex> writeLock(databaseStatusCallbackMutex_);
641 databaseStatusNotifyCallback_ = notifier;
642 LOGI("SetStoreStatusNotifier ok");
643 }
644
NotifyDatabaseStatusChange(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & deviceId,bool onlineStatus)645 void RuntimeContextImpl::NotifyDatabaseStatusChange(const std::string &userId, const std::string &appId,
646 const std::string &storeId, const std::string &deviceId, bool onlineStatus)
647 {
648 ScheduleTask([this, userId, appId, storeId, deviceId, onlineStatus] {
649 std::shared_lock<std::shared_mutex> autoLock(databaseStatusCallbackMutex_);
650 if (databaseStatusNotifyCallback_) {
651 LOGI("start notify database status:%d", onlineStatus);
652 databaseStatusNotifyCallback_(userId, appId, storeId, deviceId, onlineStatus);
653 }
654 });
655 }
656
SetSyncActivationCheckCallback(const SyncActivationCheckCallback & callback)657 int RuntimeContextImpl::SetSyncActivationCheckCallback(const SyncActivationCheckCallback &callback)
658 {
659 std::unique_lock<std::shared_mutex> writeLock(syncActivationCheckCallbackMutex_);
660 syncActivationCheckCallback_ = callback;
661 LOGI("SetSyncActivationCheckCallback ok");
662 return E_OK;
663 }
664
SetSyncActivationCheckCallback(const SyncActivationCheckCallbackV2 & callback)665 int RuntimeContextImpl::SetSyncActivationCheckCallback(const SyncActivationCheckCallbackV2 &callback)
666 {
667 std::unique_lock<std::shared_mutex> writeLock(syncActivationCheckCallbackMutex_);
668 syncActivationCheckCallbackV2_ = callback;
669 LOGI("SetSyncActivationCheckCallbackV2 ok");
670 return E_OK;
671 }
672
IsSyncerNeedActive(const DBProperties & properties) const673 bool RuntimeContextImpl::IsSyncerNeedActive(const DBProperties &properties) const
674 {
675 ActivationCheckParam param = {
676 properties.GetStringProp(DBProperties::USER_ID, ""),
677 properties.GetStringProp(DBProperties::APP_ID, ""),
678 properties.GetStringProp(DBProperties::STORE_ID, ""),
679 properties.GetIntProp(DBProperties::INSTANCE_ID, 0)
680 };
681 std::shared_lock<std::shared_mutex> autoLock(syncActivationCheckCallbackMutex_);
682 if (syncActivationCheckCallbackV2_) {
683 return syncActivationCheckCallbackV2_(param);
684 } else if (syncActivationCheckCallback_) {
685 return syncActivationCheckCallback_(param.userId, param.appId, param.storeId);
686 }
687 return true;
688 }
689
RegisterUserChangedListener(const UserChangedAction & action,EventType event)690 NotificationChain::Listener *RuntimeContextImpl::RegisterUserChangedListener(const UserChangedAction &action,
691 EventType event)
692 {
693 int errCode;
694 std::lock_guard<std::mutex> autoLock(userChangeMonitorLock_);
695 if (userChangeMonitor_ == nullptr) {
696 userChangeMonitor_ = std::make_unique<UserChangeMonitor>();
697 errCode = userChangeMonitor_->Start();
698 if (errCode != E_OK) {
699 LOGE("UserChangeMonitor start failed!");
700 userChangeMonitor_ = nullptr;
701 return nullptr;
702 }
703 }
704 NotificationChain::Listener *listener = userChangeMonitor_->RegisterUserChangedListener(action, event, errCode);
705 if ((listener == nullptr) || (errCode != E_OK)) {
706 LOGE("Register user status changed listener failed, err = %d", errCode);
707 return nullptr;
708 }
709 return listener;
710 }
711
NotifyUserChanged() const712 int RuntimeContextImpl::NotifyUserChanged() const
713 {
714 {
715 std::lock_guard<std::mutex> autoLock(userChangeMonitorLock_);
716 if (userChangeMonitor_ == nullptr) {
717 LOGD("userChangeMonitor is null, all db is in normal sync mode");
718 return E_OK;
719 }
720 }
721 userChangeMonitor_->NotifyUserChanged();
722 return E_OK;
723 }
724
GenerateSessionId()725 uint32_t RuntimeContextImpl::GenerateSessionId()
726 {
727 uint32_t sessionId = currentSessionId_++;
728 if (sessionId == 0) {
729 sessionId = currentSessionId_++;
730 }
731 return sessionId;
732 }
733
DumpCommonInfo(int fd)734 void RuntimeContextImpl::DumpCommonInfo(int fd)
735 {
736 autoLaunch_.Dump(fd);
737 }
738
CloseAutoLaunchConnection(DBTypeInner type,const DBProperties & properties)739 void RuntimeContextImpl::CloseAutoLaunchConnection(DBTypeInner type, const DBProperties &properties)
740 {
741 autoLaunch_.CloseConnection(type, properties);
742 }
743
SetPermissionConditionCallback(const PermissionConditionCallback & callback)744 int RuntimeContextImpl::SetPermissionConditionCallback(const PermissionConditionCallback &callback)
745 {
746 std::unique_lock<std::shared_mutex> autoLock(permissionConditionLock_);
747 permissionConditionCallback_ = callback;
748 return E_OK;
749 }
750
GetPermissionCheckParam(const DBProperties & properties)751 std::map<std::string, std::string> RuntimeContextImpl::GetPermissionCheckParam(const DBProperties &properties)
752 {
753 PermissionConditionParam param = {
754 properties.GetStringProp(DBProperties::USER_ID, ""),
755 properties.GetStringProp(DBProperties::APP_ID, ""),
756 properties.GetStringProp(DBProperties::STORE_ID, ""),
757 properties.GetIntProp(DBProperties::INSTANCE_ID, 0)
758 };
759 std::shared_lock<std::shared_mutex> autoLock(permissionConditionLock_);
760 if (permissionConditionCallback_ == nullptr) {
761 return {};
762 }
763 return permissionConditionCallback_(param);
764 }
765
StopTaskPool()766 void RuntimeContextImpl::StopTaskPool()
767 {
768 std::lock_guard<std::mutex> autoLock(taskLock_);
769 if (taskPool_ != nullptr) {
770 taskPool_->Stop();
771 TaskPool::Release(taskPool_);
772 taskPool_ = nullptr;
773 }
774 }
775
StopTimeTickMonitorIfNeed()776 void RuntimeContextImpl::StopTimeTickMonitorIfNeed()
777 {
778 std::lock_guard<std::mutex> autoLock(timeTickMonitorLock_);
779 if (timeTickMonitor_ == nullptr) {
780 return;
781 }
782 if (timeTickMonitor_->EmptyListener()) {
783 LOGD("[RuntimeContext] TimeTickMonitor exist because no listener");
784 timeTickMonitor_ = nullptr;
785 }
786 LOGD("[RuntimeContext] TimeTickMonitor can not stop because listener is not empty");
787 }
788
SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> & handle)789 void RuntimeContextImpl::SetDBInfoHandle(const std::shared_ptr<DBInfoHandle> &handle)
790 {
791 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
792 if (dbStatusAdapter != nullptr) {
793 dbStatusAdapter->SetDBInfoHandle(handle);
794 }
795 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
796 if (subscribeRecorder != nullptr) {
797 subscribeRecorder->RemoveAllSubscribe();
798 }
799 }
800
NotifyDBInfos(const DeviceInfos & devInfos,const std::vector<DBInfo> & dbInfos)801 void RuntimeContextImpl::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector<DBInfo> &dbInfos)
802 {
803 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
804 if (dbStatusAdapter != nullptr) {
805 dbStatusAdapter->NotifyDBInfos(devInfos, dbInfos);
806 }
807 }
808
GetDBStatusAdapter()809 std::shared_ptr<DBStatusAdapter> RuntimeContextImpl::GetDBStatusAdapter()
810 {
811 std::lock_guard<std::mutex> autoLock(statusAdapterMutex_);
812 if (dbStatusAdapter_ == nullptr) {
813 dbStatusAdapter_ = std::make_unique<DBStatusAdapter>();
814 }
815 if (dbStatusAdapter_ == nullptr) {
816 LOGE("[RuntimeContextImpl] DbStatusAdapter create failed!");
817 }
818 return dbStatusAdapter_;
819 }
820
RecordRemoteSubscribe(const DBInfo & dbInfo,const DeviceID & deviceId,const QuerySyncObject & query)821 void RuntimeContextImpl::RecordRemoteSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId,
822 const QuerySyncObject &query)
823 {
824 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
825 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
826 return;
827 }
828 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
829 if (subscribeRecorder != nullptr) {
830 subscribeRecorder->RecordSubscribe(dbInfo, deviceId, query);
831 }
832 }
833
RemoveRemoteSubscribe(const DeviceID & deviceId)834 void RuntimeContextImpl::RemoveRemoteSubscribe(const DeviceID &deviceId)
835 {
836 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
837 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
838 return;
839 }
840 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
841 if (subscribeRecorder != nullptr) {
842 subscribeRecorder->RemoveRemoteSubscribe(deviceId);
843 }
844 }
845
RemoveRemoteSubscribe(const DBInfo & dbInfo)846 void RuntimeContextImpl::RemoveRemoteSubscribe(const DBInfo &dbInfo)
847 {
848 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
849 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
850 return;
851 }
852 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
853 if (subscribeRecorder != nullptr) {
854 subscribeRecorder->RemoveRemoteSubscribe(dbInfo);
855 }
856 }
857
RemoveRemoteSubscribe(const DBInfo & dbInfo,const DeviceID & deviceId)858 void RuntimeContextImpl::RemoveRemoteSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId)
859 {
860 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
861 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
862 return;
863 }
864 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
865 if (subscribeRecorder != nullptr) {
866 subscribeRecorder->RemoveRemoteSubscribe(dbInfo, deviceId);
867 }
868 }
869
RemoveRemoteSubscribe(const DBInfo & dbInfo,const DeviceID & deviceId,const QuerySyncObject & query)870 void RuntimeContextImpl::RemoveRemoteSubscribe(const DBInfo &dbInfo, const DeviceID &deviceId,
871 const QuerySyncObject &query)
872 {
873 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
874 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
875 return;
876 }
877 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
878 if (subscribeRecorder != nullptr) {
879 subscribeRecorder->RemoveRemoteSubscribe(dbInfo, deviceId, query);
880 }
881 }
882
GetSubscribeQuery(const DBInfo & dbInfo,std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery)883 void RuntimeContextImpl::GetSubscribeQuery(const DBInfo &dbInfo,
884 std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery)
885 {
886 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
887 if (dbStatusAdapter != nullptr && dbStatusAdapter->IsSendLabelExchange()) {
888 return;
889 }
890 std::shared_ptr<SubscribeRecorder> subscribeRecorder = GetSubscribeRecorder();
891 if (subscribeRecorder != nullptr) {
892 subscribeRecorder->GetSubscribeQuery(dbInfo, subscribeQuery);
893 }
894 }
895
GetSubscribeRecorder()896 std::shared_ptr<SubscribeRecorder> RuntimeContextImpl::GetSubscribeRecorder()
897 {
898 std::lock_guard<std::mutex> autoLock(subscribeRecorderMutex_);
899 if (subscribeRecorder_ == nullptr) {
900 subscribeRecorder_ = std::make_unique<SubscribeRecorder>();
901 }
902 if (subscribeRecorder_ == nullptr) {
903 LOGE("[RuntimeContextImpl] SubscribeRecorder create failed!");
904 }
905 return subscribeRecorder_;
906 }
907
IsNeedAutoSync(const std::string & userId,const std::string & appId,const std::string & storeId,const std::string & devInfo)908 bool RuntimeContextImpl::IsNeedAutoSync(const std::string &userId, const std::string &appId, const std::string &storeId,
909 const std::string &devInfo)
910 {
911 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
912 if (dbStatusAdapter == nullptr) {
913 return true;
914 }
915 return dbStatusAdapter->IsNeedAutoSync(userId, appId, storeId, devInfo);
916 }
917
SetRemoteOptimizeCommunication(const std::string & dev,bool optimize)918 void RuntimeContextImpl::SetRemoteOptimizeCommunication(const std::string &dev, bool optimize)
919 {
920 std::shared_ptr<DBStatusAdapter> dbStatusAdapter = GetDBStatusAdapter();
921 if (dbStatusAdapter == nullptr) {
922 return;
923 }
924 dbStatusAdapter->SetRemoteOptimizeCommunication(dev, optimize);
925 }
926
SetTranslateToDeviceIdCallback(const TranslateToDeviceIdCallback & callback)927 void RuntimeContextImpl::SetTranslateToDeviceIdCallback(const TranslateToDeviceIdCallback &callback)
928 {
929 std::lock_guard<std::mutex> autoLock(translateToDeviceIdLock_);
930 translateToDeviceIdCallback_ = callback;
931 deviceIdCache_.clear();
932 }
933
TranslateDeviceId(const std::string & deviceId,const StoreInfo & info,std::string & newDeviceId)934 int RuntimeContextImpl::TranslateDeviceId(const std::string &deviceId,
935 const StoreInfo &info, std::string &newDeviceId)
936 {
937 const std::string id = DBCommon::GenerateIdentifierId(info.storeId, info.appId, info.userId);
938 std::lock_guard<std::mutex> autoLock(translateToDeviceIdLock_);
939 if (translateToDeviceIdCallback_ == nullptr) {
940 return -E_NOT_SUPPORT;
941 }
942 if (deviceIdCache_.find(deviceId) == deviceIdCache_.end() ||
943 deviceIdCache_[deviceId].find(id) == deviceIdCache_[deviceId].end()) {
944 deviceIdCache_[deviceId][id] = translateToDeviceIdCallback_(deviceId, info);
945 }
946 newDeviceId = deviceIdCache_[deviceId][id];
947 return E_OK;
948 }
949
ExistTranslateDevIdCallback() const950 bool RuntimeContextImpl::ExistTranslateDevIdCallback() const
951 {
952 std::lock_guard<std::mutex> autoLock(translateToDeviceIdLock_);
953 return translateToDeviceIdCallback_ != nullptr;
954 }
955
SetThreadPool(const std::shared_ptr<IThreadPool> & threadPool)956 void RuntimeContextImpl::SetThreadPool(const std::shared_ptr<IThreadPool> &threadPool)
957 {
958 std::unique_lock<std::shared_mutex> writeLock(threadPoolLock_);
959 threadPool_ = threadPool;
960 LOGD("[RuntimeContext] Set thread pool finished");
961 }
962
GetThreadPool() const963 std::shared_ptr<IThreadPool> RuntimeContextImpl::GetThreadPool() const
964 {
965 std::shared_lock<std::shared_mutex> readLock(threadPoolLock_);
966 return threadPool_;
967 }
968
ScheduleTaskByThreadPool(const DistributedDB::TaskAction & task) const969 int RuntimeContextImpl::ScheduleTaskByThreadPool(const DistributedDB::TaskAction &task) const
970 {
971 std::shared_ptr<IThreadPool> threadPool = GetThreadPool();
972 if (threadPool == nullptr) {
973 return -E_NOT_SUPPORT;
974 }
975 (void)threadPool->Execute(task);
976 return E_OK;
977 }
978
SetTimerByThreadPool(int milliSeconds,const TimerAction & action,const TimerFinalizer & finalizer,bool allocTimerId,TimerId & timerId)979 int RuntimeContextImpl::SetTimerByThreadPool(int milliSeconds, const TimerAction &action,
980 const TimerFinalizer &finalizer, bool allocTimerId, TimerId &timerId)
981 {
982 std::shared_ptr<IThreadPool> threadPool = GetThreadPool();
983 if (threadPool == nullptr) {
984 return -E_NOT_SUPPORT;
985 }
986 int errCode = E_OK;
987 if (allocTimerId) {
988 errCode = AllocTimerId(nullptr, timerId);
989 } else {
990 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
991 if (taskIds_.find(timerId) == taskIds_.end()) {
992 LOGD("[SetTimerByThreadPool] Timer has been remove");
993 return -E_NO_SUCH_ENTRY;
994 }
995 }
996 if (errCode != E_OK) {
997 return errCode;
998 }
999 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1000 if (!allocTimerId && taskIds_.find(timerId) == taskIds_.end()) {
1001 LOGD("[SetTimerByThreadPool] Timer has been remove");
1002 return -E_NO_SUCH_ENTRY;
1003 }
1004 timerFinalizers_[timerId] = finalizer;
1005 Duration duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1006 std::chrono::milliseconds(milliSeconds));
1007 TaskId taskId = threadPool->Execute([milliSeconds, action, timerId, this]() {
1008 ThreadPoolTimerAction(milliSeconds, action, timerId);
1009 }, duration);
1010 taskIds_[timerId] = taskId;
1011 return E_OK;
1012 }
1013
ModifyTimerByThreadPool(TimerId timerId,int milliSeconds)1014 int RuntimeContextImpl::ModifyTimerByThreadPool(TimerId timerId, int milliSeconds)
1015 {
1016 std::shared_ptr<IThreadPool> threadPool = GetThreadPool();
1017 if (threadPool == nullptr) {
1018 return -E_NOT_SUPPORT;
1019 }
1020 TaskId taskId;
1021 {
1022 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1023 if (taskIds_.find(timerId) == taskIds_.end()) {
1024 return -E_NO_SUCH_ENTRY;
1025 }
1026 taskId = taskIds_[timerId];
1027 }
1028 Duration duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
1029 std::chrono::milliseconds(milliSeconds));
1030 TaskId ret = threadPool->Reset(taskId, duration);
1031 if (ret != taskId) {
1032 return -E_NO_SUCH_ENTRY;
1033 }
1034 return E_OK;
1035 }
1036
RemoveTimerByThreadPool(TimerId timerId,bool wait)1037 void RuntimeContextImpl::RemoveTimerByThreadPool(TimerId timerId, bool wait)
1038 {
1039 std::shared_ptr<IThreadPool> threadPool = GetThreadPool();
1040 if (threadPool == nullptr) {
1041 return;
1042 }
1043 TaskId taskId;
1044 {
1045 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1046 if (taskIds_.find(timerId) == taskIds_.end()) {
1047 return;
1048 }
1049 taskId = taskIds_[timerId];
1050 taskIds_.erase(timerId);
1051 }
1052 bool removeBeforeExecute = threadPool->Remove(taskId, wait);
1053 TimerFinalizer timerFinalizer = nullptr;
1054 if (removeBeforeExecute) {
1055 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1056 timerFinalizer = timerFinalizers_[timerId];
1057 timerFinalizers_.erase(timerId);
1058 }
1059 if (timerFinalizer) {
1060 timerFinalizer();
1061 }
1062 }
1063
ThreadPoolTimerAction(int milliSeconds,const TimerAction & action,TimerId timerId)1064 void RuntimeContextImpl::ThreadPoolTimerAction(int milliSeconds, const TimerAction &action, TimerId timerId)
1065 {
1066 TimerFinalizer timerFinalizer = nullptr;
1067 bool timerExist = true;
1068 {
1069 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1070 if (timerFinalizers_.find(timerId) == timerFinalizers_.end()) {
1071 LOGD("[ThreadPoolTimerAction] Timer has been finalize");
1072 return;
1073 }
1074 timerFinalizer = timerFinalizers_[timerId];
1075 timerFinalizers_.erase(timerId);
1076 if (taskIds_.find(timerId) == taskIds_.end()) {
1077 LOGD("[ThreadPoolTimerAction] Timer has been removed");
1078 timerExist = false;
1079 }
1080 }
1081 if (timerExist && action(timerId) == E_OK) {
1082 // schedule task again
1083 int errCode = SetTimerByThreadPool(milliSeconds, action, timerFinalizer, false, timerId);
1084 if (errCode == E_OK) {
1085 return;
1086 }
1087 LOGW("[RuntimeContext] create timer failed %d", errCode);
1088 }
1089 if (timerFinalizer) {
1090 timerFinalizer();
1091 }
1092 {
1093 std::lock_guard<std::mutex> autoLock(timerTaskLock_);
1094 taskIds_.erase(timerId);
1095 }
1096 std::lock_guard<std::mutex> autoLock(timersLock_);
1097 timers_.erase(timerId);
1098 }
1099
SetCloudTranslate(const std::shared_ptr<ICloudDataTranslate> & dataTranslate)1100 void RuntimeContextImpl::SetCloudTranslate(const std::shared_ptr<ICloudDataTranslate> &dataTranslate)
1101 {
1102 std::unique_lock<std::shared_mutex> autoLock(dataTranslateLock_);
1103 dataTranslate_ = dataTranslate;
1104 }
1105
AssetToBlob(const Asset & asset,std::vector<uint8_t> & blob)1106 int RuntimeContextImpl::AssetToBlob(const Asset &asset, std::vector<uint8_t> &blob)
1107 {
1108 std::shared_lock<std::shared_mutex> autoLock(dataTranslateLock_);
1109 if (dataTranslate_ == nullptr) {
1110 return -E_NOT_INIT;
1111 }
1112 blob = dataTranslate_->AssetToBlob(asset);
1113 return E_OK;
1114 }
1115
AssetsToBlob(const Assets & assets,std::vector<uint8_t> & blob)1116 int RuntimeContextImpl::AssetsToBlob(const Assets &assets, std::vector<uint8_t> &blob)
1117 {
1118 std::shared_lock<std::shared_mutex> autoLock(dataTranslateLock_);
1119 if (dataTranslate_ == nullptr) {
1120 return -E_NOT_INIT;
1121 }
1122 blob = dataTranslate_->AssetsToBlob(assets);
1123 return E_OK;
1124 }
1125
BlobToAsset(const std::vector<uint8_t> & blob,Asset & asset)1126 int RuntimeContextImpl::BlobToAsset(const std::vector<uint8_t> &blob, Asset &asset)
1127 {
1128 std::shared_lock<std::shared_mutex> autoLock(dataTranslateLock_);
1129 if (dataTranslate_ == nullptr) {
1130 return -E_NOT_INIT;
1131 }
1132 asset = dataTranslate_->BlobToAsset(blob);
1133 return E_OK;
1134 }
1135
BlobToAssets(const std::vector<uint8_t> & blob,Assets & assets)1136 int RuntimeContextImpl::BlobToAssets(const std::vector<uint8_t> &blob, Assets &assets)
1137 {
1138 std::shared_lock<std::shared_mutex> autoLock(dataTranslateLock_);
1139 if (dataTranslate_ == nullptr) {
1140 return -E_NOT_INIT;
1141 }
1142 assets = dataTranslate_->BlobToAssets(blob);
1143 return E_OK;
1144 }
1145 } // namespace DistributedDB
1146