• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CONTACTSDATAABILITY_ASYNC_TASK_H
17 #define CONTACTSDATAABILITY_ASYNC_TASK_H
18 
19 #include <atomic>
20 #include <chrono>
21 #include <ctime>
22 #include <exception>
23 #include <iostream>
24 #include <memory>
25 #include <mutex>
26 #include <queue>
27 #include <string>
28 #include <thread>
29 #include <vector>
30 
31 #include "common.h"
32 #include "contacts_database.h"
33 #include "contacts_update_helper.h"
34 #include "hilog_wrapper.h"
35 #include "match_candidate.h"
36 
37 namespace OHOS {
38 namespace Contacts {
/**
 * Abstract unit of work.
 *
 * Concrete tasks derive from this class and implement Run(). The destructor
 * is virtual so a task can be destroyed through an AsyncItem pointer.
 */
class AsyncItem {
public:
    virtual ~AsyncItem() = default;

    // Performs the work represented by this item.
    virtual void Run() = 0;
};
47 
/**
 * Minimal spin lock built on std::atomic_flag.
 *
 * Exposes lock()/unlock() so it can be used with std::lock_guard.
 * The lock is not recursive: calling lock() twice from the same thread
 * without an intervening unlock() never returns.
 */
class AsyncTaskMutex {
public:
    // Blocks until the flag is acquired by this thread.
    void lock()
    {
        while (flag.test_and_set(std::memory_order_acquire)) {
            // Hand the time slice back instead of burning the core: on an
            // oversubscribed (or single) core a pure busy-spin can keep the
            // current holder from ever running to release the lock.
            std::this_thread::yield();
        }
    }

    // Releases the flag; pairs with the acquire in lock().
    void unlock()
    {
        flag.clear(std::memory_order_release);
    }

private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
};
64 
65 class AsyncTaskQueue {
66 public:
67     // single instance
Instance()68     static AsyncTaskQueue *Instance()
69     {
70         static AsyncTaskQueue obj;
71         return &obj;
72     }
73 
74 public:
75     // clear task
Clear()76     void Clear()
77     {
78         std::lock_guard<AsyncTaskMutex> lk(mtx);
79         while (que.size() > 0)
80             que.pop();
81     }
82 
83     // que empty
Empty()84     bool Empty() const
85     {
86         std::lock_guard<AsyncTaskMutex> lk(mtx);
87         return que.empty();
88     }
89 
Size()90     size_t Size() const
91     {
92         std::lock_guard<AsyncTaskMutex> lk(mtx);
93         return que.size();
94     }
95 
GetThreads()96     size_t GetThreads() const
97     {
98         return threads;
99     }
100 
Push(std::unique_ptr<AsyncItem> & task)101     bool Push(std::unique_ptr<AsyncItem> &task)
102     {
103         std::lock_guard<AsyncTaskMutex> lk(mtx);
104         if (maxSize > 0 && que.size() >= maxSize) {
105             HILOG_ERROR("AsyncTask maxSize error");
106             return false;
107         }
108         que.push(task.release());
109         return true;
110     }
111 
112     // startTask
113     void Start(size_t threads = 1, size_t maxSize = 1000000)
114     {
115         if (this->threads > 0) {
116             return;
117         }
118         this->threads = threads;
119         this->maxSize = maxSize;
120         for (size_t i = 0; i < this->threads; i++) {
121             std::thread(std::bind(&AsyncTaskQueue::Run, this)).detach();
122         }
123     }
124 
125 public:
Run()126     void Run()
127     {
128         AsyncItem *item = nullptr;
129         while (this->threads > 0) {
130             if (Pop(&item)) {
131                 if (item != nullptr) {
132                     item->Run();
133                     delete item;
134                     item = nullptr;
135                 }
136             } else {
137                 std::chrono::milliseconds dura(1);
138                 std::this_thread::sleep_for(dura);
139             }
140         }
141     }
142 
143 private:
144     size_t maxSize;
145     size_t threads;
146     mutable AsyncTaskMutex mtx;
147     std::queue<AsyncItem *> que;
AsyncTaskQueue()148     AsyncTaskQueue()
149     {
150         this->maxSize = 0;
151         this->threads = 0;
152     }
153 
Pop(AsyncItem ** item)154     bool Pop(AsyncItem **item)
155     {
156         std::lock_guard<AsyncTaskMutex> lk(mtx);
157         if (que.empty()) {
158             return false;
159         }
160         *item = que.front();
161         que.pop();
162         return true;
163     }
164 };
165 
166 // impl run
167 class AsyncTask : public AsyncItem {
168     std::shared_ptr<OHOS::NativeRdb::RdbStore> store;
169     std::vector<int> rawContactIdVector;
170     bool isDeleted;
171 
172 public:
Run()173     void Run()
174     {
175         ContactsUpdateHelper contactsUpdateHelper;
176         contactsUpdateHelper.UpdateCallLogByPhoneNum(rawContactIdVector, store, isDeleted);
177         std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance();
178         contactsDataBase->InsertMergeData(store, rawContactIdVector);
179         contactsDataBase->MarkMerge(store);
180     }
181 
182 public:
AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> & store,std::vector<int> & rawContactIdVector,bool isDeleted)183     AsyncTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store, std::vector<int> &rawContactIdVector, bool isDeleted)
184     {
185         this->store = store;
186         this->rawContactIdVector = rawContactIdVector;
187         this->isDeleted = isDeleted;
188     }
189 
190 public:
AsyncTask()191     AsyncTask()
192     {
193     }
194 };
195 
196 class AsyncDeleteContactsTask : public AsyncItem {
197     std::vector<OHOS::NativeRdb::ValuesBucket> queryValuesBucket;
198     std::shared_ptr<OHOS::NativeRdb::RdbStore> store;
199 
200 public:
Run()201     void Run()
202     {
203         std::shared_ptr<ContactsDataBase> contactsDataBase = ContactsDataBase::GetInstance();
204         contactsDataBase->DeleteRecordInsert(store, queryValuesBucket);
205     }
206 
207 public:
AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> & store,std::vector<OHOS::NativeRdb::ValuesBucket> & queryValuesBucket)208     AsyncDeleteContactsTask(std::shared_ptr<OHOS::NativeRdb::RdbStore> &store,
209         std::vector<OHOS::NativeRdb::ValuesBucket> &queryValuesBucket)
210     {
211         this->queryValuesBucket = queryValuesBucket;
212         this->store = store;
213     }
214 
215 public:
AsyncDeleteContactsTask()216     AsyncDeleteContactsTask()
217     {
218     }
219 };
220 } // namespace Contacts
221 } // namespace OHOS
222 
223 #endif // CONTACTSDATAABILITY_ASYNC_TASK_H
224