1 /* 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CONTACTSDATAABILITY_ASYNC_TASK_H 17 #define CONTACTSDATAABILITY_ASYNC_TASK_H 18 19 #include <atomic> 20 #include <chrono> 21 #include <ctime> 22 #include <exception> 23 #include <iostream> 24 #include <memory> 25 #include <mutex> 26 #include <queue> 27 #include <string> 28 #include <thread> 29 #include <vector> 30 31 #include "common.h" 32 #include "contacts_database.h" 33 #include "contacts_update_helper.h" 34 #include "hilog_wrapper.h" 35 #include "match_candidate.h" 36 37 namespace OHOS { 38 namespace Contacts { 39 class AsyncItem { 40 public: ~AsyncItem()41 virtual ~AsyncItem() 42 { 43 } 44 45 virtual void Run() = 0; 46 }; 47 48 class AsyncTaskMutex { 49 public: lock()50 void lock() 51 { 52 while (flag.test_and_set(std::memory_order_acquire)) { 53 } 54 } 55 unlock()56 void unlock() 57 { 58 flag.clear(std::memory_order_release); 59 } 60 61 private: 62 std::atomic_flag flag = ATOMIC_FLAG_INIT; 63 }; 64 65 class AsyncTaskQueue { 66 public: 67 // single instance Instance()68 static AsyncTaskQueue *Instance() 69 { 70 static AsyncTaskQueue obj; 71 return &obj; 72 } 73 74 public: 75 // clear task Clear()76 void Clear() 77 { 78 std::lock_guard<AsyncTaskMutex> lk(mtx); 79 while (que.size() > 0) 80 que.pop(); 81 } 82 83 // que empty Empty()84 bool Empty() const 85 { 86 std::lock_guard<AsyncTaskMutex> lk(mtx); 87 return que.empty(); 88 } 89 Size()90 size_t Size() const 91 { 92 std::lock_guard<AsyncTaskMutex> lk(mtx); 93 return que.size(); 94 } 95 GetThreads()96 size_t GetThreads() const 97 { 98 return threads; 99 } 100 Push(std::unique_ptr<AsyncItem> & task)101 bool Push(std::unique_ptr<AsyncItem> &task) 102 { 103 std::lock_guard<AsyncTaskMutex> lk(mtx); 104 if (maxSize > 0 && que.size() >= maxSize) { 105 HILOG_ERROR("AsyncTask maxSize error"); 106 return false; 107 } 108 que.push(task.release()); 109 return true; 110 } 111 112 // startTask 113 void Start(size_t threads = 1, size_t maxSize = 1000000) 114 { 115 if (this->threads > 0) { 116 return; 117 } 118 this->threads = threads; 119 this->maxSize = maxSize; 120 for (size_t i = 0; i < this->threads; i++) { 121 std::thread(std::bind(&AsyncTaskQueue::Run, this)).detach(); 122 } 123 } 124 125 public: Run()126 void Run() 127 { 128 AsyncItem *item = nullptr; 129 while (this->threads > 0) { 130 if (Pop(&item)) { 131 if (item != nullptr) { 132 item->Run(); 133 delete item; 134 item = nullptr; 135 } 136 } else { 137 std::chrono::milliseconds dura(1); 138 std::this_thread::sleep_for(dura); 139 } 140 } 141 } 142 143 private: 144 size_t maxSize; 145 size_t threads; 146 mutable AsyncTaskMutex mtx; 147 std::queue<AsyncItem *> que; AsyncTaskQueue()148 AsyncTaskQueue() 149 { 150 this->maxSize = 0; 151 this->threads = 0; 152 } 153 Pop(AsyncItem ** item)154 bool Pop(AsyncItem **item) 155 { 156 std::lock_guard<AsyncTaskMutex> lk(mtx); 157 if (que.empty()) { 158 return false; 159 } 160 *item = que.front(); 161 que.pop(); 162 return true; 163 } 164 }; 165 166 // impl run 167 class AsyncTask : public AsyncItem { 168 std::shared_ptr<OHOS::NativeRdb::RdbStore> store; 169 std::vector<int> rawContactIdVector; 170 bool isDeleted; 171 172 public: Run()173 void Run() 174 { 175 ContactsUpdateHelper contactsUpdateHelper; 176 contactsUpdateHelper.UpdateCallLogByPhoneNum(rawContactIdVector, store, isDeleted); 177 std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance(); 178 contactsDataBase->InsertMergeData(store, rawContactIdVector); 179 contactsDataBase->MarkMerge(store); 180 } 181 182 public: AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> & store,std::vector<int> & rawContactIdVector,bool isDeleted)183 AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, std::vector<int> &rawContactIdVector, bool isDeleted) 184 { 185 this->store = store; 186 this->rawContactIdVector = rawContactIdVector; 187 this->isDeleted = isDeleted; 188 } 189 190 public: AsyncTask()191 AsyncTask() 192 { 193 } 194 }; 195 196 class AsyncDeleteContactsTask : public AsyncItem { 197 std::vector<OHOS::NativeRdb::ValuesBucket> queryValuesBucket; 198 std::shared_ptr<OHOS::NativeRdb::RdbStore> store; 199 200 public: Run()201 void Run() 202 { 203 std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance(); 204 contactsDataBase->DeleteRecordInsert(store, queryValuesBucket); 205 } 206 207 public: AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> & store,std::vector<OHOS::NativeRdb::ValuesBucket> & queryValuesBucket)208 AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, 209 std::vector<OHOS::NativeRdb::ValuesBucket> &queryValuesBucket) 210 { 211 this->queryValuesBucket = queryValuesBucket; 212 this->store = store; 213 } 214 215 public: AsyncDeleteContactsTask()216 AsyncDeleteContactsTask() 217 { 218 } 219 }; 220 } // namespace Contacts 221 } // namespace OHOS 222 223 #endif // CONTACTSDATAABILITY_ASYNC_TASK_H 224