• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObserverBridge"
16 #include "observer_bridge.h"
17 #include "kvdb_service_client.h"
18 #include "kvstore_observer_client.h"
19 #include "log_print.h"
20 namespace OHOS::DistributedKv {
// Initial value of ObserverClient::realType_: no subscribe-type bit is set.
constexpr uint32_t INVALID_SUBSCRIBE_TYPE{ 0 };
ObserverBridge(AppId appId,StoreId storeId,int32_t subUser,std::shared_ptr<Observer> observer,const Convertor & cvt)22 ObserverBridge::ObserverBridge(AppId appId, StoreId storeId, int32_t subUser, std::shared_ptr<Observer> observer,
23     const Convertor &cvt) : appId_(std::move(appId)), storeId_(std::move(storeId)), subUser_(subUser),
24     observer_(std::move(observer)), convert_(cvt)
25 {
26 }
27 
~ObserverBridge()28 ObserverBridge::~ObserverBridge()
29 {
30     if (remote_ == nullptr) {
31         return;
32     }
33     auto service = KVDBServiceClient::GetInstance();
34     if (service == nullptr) {
35         return;
36     }
37     service->Unsubscribe(appId_, storeId_, subUser_, remote_);
38 }
39 
RegisterRemoteObserver(uint32_t realType)40 Status ObserverBridge::RegisterRemoteObserver(uint32_t realType)
41 {
42     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
43     if (remote_ != nullptr) {
44         remote_->realType_ |= realType;
45         return SUCCESS;
46     }
47 
48     auto service = KVDBServiceClient::GetInstance();
49     if (service == nullptr) {
50         return SERVER_UNAVAILABLE;
51     }
52 
53     remote_ = new (std::nothrow) ObserverClient(observer_, convert_);
54     if (remote_ == nullptr) {
55         ZLOGE("New ObserverClient failed, appId:%{public}s", appId_.appId.c_str());
56         return ERROR;
57     }
58     auto status = service->Subscribe(appId_, storeId_, subUser_, remote_);
59     if (status != SUCCESS) {
60         remote_ = nullptr;
61     } else {
62         remote_->realType_ = realType;
63     }
64     return status;
65 }
66 
UnregisterRemoteObserver(uint32_t realType)67 Status ObserverBridge::UnregisterRemoteObserver(uint32_t realType)
68 {
69     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
70     if (remote_ == nullptr) {
71         return SUCCESS;
72     }
73 
74     auto service = KVDBServiceClient::GetInstance();
75     if (service == nullptr) {
76         return SERVER_UNAVAILABLE;
77     }
78 
79     Status status = Status::SUCCESS;
80     remote_->realType_ &= ~SUBSCRIBE_TYPE_LOCAL;
81     remote_->realType_ &= ~realType;
82     if (remote_->realType_ == 0) {
83         status = service->Unsubscribe(appId_, storeId_, subUser_, remote_);
84         remote_ = nullptr;
85     }
86     return status;
87 }
88 
OnChange(const DBChangedData & data)89 void ObserverBridge::OnChange(const DBChangedData &data)
90 {
91     std::string deviceId;
92     auto inserted = ConvertDB(data.GetEntriesInserted(), deviceId, convert_);
93     auto updated = ConvertDB(data.GetEntriesUpdated(), deviceId, convert_);
94     auto deleted = ConvertDB(data.GetEntriesDeleted(), deviceId, convert_);
95     ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
96     observer_->OnChange(notice);
97 }
98 
ObserverClient(std::shared_ptr<Observer> observer,const Convertor & cvt)99 ObserverBridge::ObserverClient::ObserverClient(std::shared_ptr<Observer> observer, const Convertor &cvt)
100     : KvStoreObserverClient(observer), convert_(cvt), realType_(INVALID_SUBSCRIBE_TYPE)
101 {
102 }
103 
OnChange(const ChangeNotification & data)104 void ObserverBridge::ObserverClient::OnChange(const ChangeNotification &data)
105 {
106     if ((realType_ & SUBSCRIBE_TYPE_REMOTE) != SUBSCRIBE_TYPE_REMOTE) {
107         return;
108     }
109     std::string deviceId;
110     auto inserted = ObserverBridge::ConvertDB(data.GetInsertEntries(), deviceId, convert_);
111     auto updated = ObserverBridge::ConvertDB(data.GetUpdateEntries(), deviceId, convert_);
112     auto deleted = ObserverBridge::ConvertDB(data.GetDeleteEntries(), deviceId, convert_);
113     ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
114     KvStoreObserverClient::OnChange(notice);
115 }
116 
OnChange(const DataOrigin & origin,Keys && keys)117 void ObserverBridge::ObserverClient::OnChange(const DataOrigin &origin, Keys &&keys)
118 {
119     if ((realType_ & SUBSCRIBE_TYPE_CLOUD) != SUBSCRIBE_TYPE_CLOUD) {
120         return;
121     }
122     KvStoreObserverClient::OnChange(origin, std::move(keys));
123 }
124 
125 template<class T>
ConvertDB(const T & dbEntries,std::string & deviceId,const Convertor & convert)126 std::vector<Entry> ObserverBridge::ConvertDB(const T &dbEntries, std::string &deviceId, const Convertor &convert)
127 {
128     std::vector<Entry> entries(dbEntries.size());
129     auto it = entries.begin();
130     for (const auto &dbEntry : dbEntries) {
131         Entry &entry = *it;
132         entry.key = convert.ToKey(DBKey(dbEntry.key), deviceId);
133         entry.value = dbEntry.value;
134         ++it;
135     }
136     return entries;
137 }
138 
OnServiceDeath()139 void ObserverBridge::OnServiceDeath()
140 {
141     std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
142     if (remote_ == nullptr) {
143         return;
144     }
145     remote_ = nullptr;
146 }
147 } // namespace OHOS::DistributedKv
148