• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "KvAdaptor"
16 #include "kv_delegate.h"
17 
18 #include "datashare_errno.h"
19 #include "directory/directory_manager.h"
20 #include "grd_base/grd_error.h"
21 #include "grd_document/grd_document_api.h"
22 #include "ipc_skeleton.h"
23 #include "log_print.h"
24 
25 namespace OHOS::DataShare {
26 constexpr int WAIT_TIME = 30;
Upsert(const std::string & collectionName,const std::string & filter,const std::string & value)27 int64_t KvDelegate::Upsert(const std::string &collectionName, const std::string &filter, const std::string &value)
28 {
29     std::lock_guard<decltype(mutex_)> lock(mutex_);
30     if (!Init()) {
31         ZLOGE("init failed, %{public}s", collectionName.c_str());
32         return E_ERROR;
33     }
34     int count = GRD_UpsertDoc(db_, collectionName.c_str(), filter.c_str(), value.c_str(), 0);
35     if (count <= 0) {
36         ZLOGE("GRD_UpSertDoc failed,status %{public}d", count);
37         return count;
38     }
39     Flush();
40     return E_OK;
41 }
42 
Delete(const std::string & collectionName,const std::string & filter)43 int32_t KvDelegate::Delete(const std::string &collectionName, const std::string &filter)
44 {
45     std::lock_guard<decltype(mutex_)> lock(mutex_);
46     if (!Init()) {
47         ZLOGE("init failed, %{public}s", collectionName.c_str());
48         return E_ERROR;
49     }
50     std::vector<std::string> queryResults;
51 
52     int32_t status = GetBatch(collectionName, filter, "{\"id_\": true}", queryResults);
53     if (status != E_OK) {
54         ZLOGE("db GetBatch failed, %{public}s %{public}d", filter.c_str(), status);
55         return status;
56     }
57     for (auto &result : queryResults) {
58         auto count = GRD_DeleteDoc(db_, collectionName.c_str(), result.c_str(), 0);
59         if (count < 0) {
60             ZLOGE("GRD_UpSertDoc failed,status %{public}d %{public}s", count, result.c_str());
61             continue;
62         }
63     }
64     Flush();
65     if (queryResults.size() > 0) {
66         ZLOGI("Delete, %{public}s, count %{public}zu", collectionName.c_str(), queryResults.size());
67     }
68     return E_OK;
69 }
70 
Init()71 bool KvDelegate::Init()
72 {
73     if (isInitDone_) {
74         if (executors_ != nullptr) {
75             executors_->Reset(taskId_, std::chrono::seconds(WAIT_TIME));
76         }
77         return true;
78     }
79     int status = GRD_DBOpen(
80         (path_ + "/dataShare.db").c_str(), nullptr, GRD_DB_OPEN_CREATE | GRD_DB_OPEN_CHECK_FOR_ABNORMAL, &db_);
81     if (status != GRD_OK || db_ == nullptr) {
82         ZLOGE("GRD_DBOpen failed,status %{public}d", status);
83         return false;
84     }
85     if (executors_ != nullptr) {
86         taskId_ = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this]() {
87             std::lock_guard<decltype(mutex_)> lock(mutex_);
88             GRD_DBClose(db_, GRD_DB_CLOSE);
89             db_ = nullptr;
90             isInitDone_ = false;
91             taskId_ = ExecutorPool::INVALID_TASK_ID;
92         });
93     }
94     status = GRD_CreateCollection(db_, TEMPLATE_TABLE, nullptr, 0);
95     if (status != GRD_OK) {
96         ZLOGE("GRD_CreateCollection template table failed,status %{public}d", status);
97         return false;
98     }
99 
100     status = GRD_CreateCollection(db_, DATA_TABLE, nullptr, 0);
101     if (status != GRD_OK) {
102         ZLOGE("GRD_CreateCollection data table failed,status %{public}d", status);
103         return false;
104     }
105     isInitDone_ = true;
106     return true;
107 }
108 
~KvDelegate()109 KvDelegate::~KvDelegate()
110 {
111     std::lock_guard<decltype(mutex_)> lock(mutex_);
112     if (isInitDone_) {
113         int status = GRD_DBClose(db_, 0);
114         if (status != GRD_OK) {
115             ZLOGE("GRD_DBClose failed,status %{public}d", status);
116         }
117     }
118 }
119 
Upsert(const std::string & collectionName,const KvData & value)120 int32_t KvDelegate::Upsert(const std::string &collectionName, const KvData &value)
121 {
122     std::string id = value.GetId();
123     if (value.HasVersion() && value.GetVersion() != 0) {
124         int version = -1;
125         if (GetVersion(collectionName, id, version)) {
126             if (value.GetVersion() <= version) {
127                 ZLOGE("GetVersion failed,%{public}s id %{private}s %{public}d %{public}d", collectionName.c_str(),
128                       id.c_str(), value.GetVersion(), version);
129                 return E_VERSION_NOT_NEWER;
130             }
131         }
132     }
133     return Upsert(collectionName, id, value.GetValue());
134 }
135 
Get(const std::string & collectionName,const Id & id,std::string & value)136 int32_t KvDelegate::Get(const std::string &collectionName, const Id &id, std::string &value)
137 {
138     std::string filter = DistributedData::Serializable::Marshall(id);
139     if (Get(collectionName, filter, "{}", value) != E_OK) {
140         ZLOGE("Get failed, %{public}s %{public}s", collectionName.c_str(), filter.c_str());
141         return E_ERROR;
142     }
143     return E_OK;
144 }
145 
GetVersion(const std::string & collectionName,const std::string & filter,int & version)146 bool KvDelegate::GetVersion(const std::string &collectionName, const std::string &filter, int &version)
147 {
148     std::string value;
149     if (Get(collectionName, filter, "{}", value) != E_OK) {
150         ZLOGE("Get failed, %{public}s %{public}s", collectionName.c_str(), filter.c_str());
151         return false;
152     }
153     VersionData data(-1);
154     if (!DistributedData::Serializable::Unmarshall(value, data)) {
155         ZLOGE("Unmarshall failed,data %{public}s", value.c_str());
156         return false;
157     }
158     version = data.GetVersion();
159     return true;
160 }
161 
Get(const std::string & collectionName,const std::string & filter,const std::string & projection,std::string & result)162 int32_t KvDelegate::Get(
163     const std::string &collectionName, const std::string &filter, const std::string &projection, std::string &result)
164 {
165     std::lock_guard<decltype(mutex_)> lock(mutex_);
166     if (!Init()) {
167         ZLOGE("init failed, %{public}s", collectionName.c_str());
168         return E_ERROR;
169     }
170     Query query;
171     query.filter = filter.c_str();
172     query.projection = projection.c_str();
173     GRD_ResultSet *resultSet = nullptr;
174     int status = GRD_FindDoc(db_, collectionName.c_str(), query, 0, &resultSet);
175     if (status != GRD_OK || resultSet == nullptr) {
176         ZLOGE("GRD_FindDoc failed,status %{public}d", status);
177         return status;
178     }
179     status = GRD_Next(resultSet);
180     if (status != GRD_OK) {
181         GRD_FreeResultSet(resultSet);
182         ZLOGE("GRD_Next failed,status %{public}d", status);
183         return status;
184     }
185     char *value = nullptr;
186     status = GRD_GetValue(resultSet, &value);
187     if (status != GRD_OK || value == nullptr) {
188         GRD_FreeResultSet(resultSet);
189         ZLOGE("GRD_GetValue failed,status %{public}d", status);
190         return status;
191     }
192     result = value;
193     GRD_FreeValue(value);
194     GRD_FreeResultSet(resultSet);
195     return E_OK;
196 }
197 
Flush()198 void KvDelegate::Flush()
199 {
200     int status = GRD_Flush(db_, GRD_DB_FLUSH_ASYNC);
201     if (status != GRD_OK) {
202         ZLOGE("GRD_Flush failed,status %{public}d", status);
203     }
204 }
205 
GetBatch(const std::string & collectionName,const std::string & filter,const std::string & projection,std::vector<std::string> & result)206 int32_t KvDelegate::GetBatch(const std::string &collectionName, const std::string &filter,
207     const std::string &projection, std::vector<std::string> &result)
208 {
209     std::lock_guard<decltype(mutex_)> lock(mutex_);
210     if (!Init()) {
211         ZLOGE("init failed, %{public}s", collectionName.c_str());
212         return E_ERROR;
213     }
214     Query query;
215     query.filter = filter.c_str();
216     query.projection = projection.c_str();
217     GRD_ResultSet *resultSet;
218     int status = GRD_FindDoc(db_, collectionName.c_str(), query, GRD_DOC_ID_DISPLAY, &resultSet);
219     if (status != GRD_OK || resultSet == nullptr) {
220         ZLOGE("GRD_UpSertDoc failed,status %{public}d", status);
221         return status;
222     }
223     char *value = nullptr;
224     while (GRD_Next(resultSet) == GRD_OK) {
225         status = GRD_GetValue(resultSet, &value);
226         if (status != GRD_OK || value == nullptr) {
227             GRD_FreeResultSet(resultSet);
228             ZLOGE("GRD_GetValue failed,status %{public}d", status);
229             return status;
230         }
231         result.emplace_back(value);
232         GRD_FreeValue(value);
233     }
234     GRD_FreeResultSet(resultSet);
235     return E_OK;
236 }
237 
KvDelegate(const std::string & path,const std::shared_ptr<ExecutorPool> & executors)238 KvDelegate::KvDelegate(const std::string &path, const std::shared_ptr<ExecutorPool> &executors)
239     : path_(path), executors_(executors)
240 {
241 }
242 } // namespace OHOS::DataShare