1 /*
2 * Copyright (C) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
#include "distributed_service.h"

#include <thread>
#include <utility>

#include "notification_helper.h"
#include "distributed_client.h"
#include "request_box.h"
#include "state_box.h"
#include "in_process_call_wrapper.h"
#include "distributed_observer_service.h"
#include "os_account_manager.h"
#include "distributed_server.h"
#include "distributed_device_data.h"
27
28 namespace OHOS {
29 namespace Notification {
30
namespace {
// Upper bound on the sync attempts SyncConnectedDevice makes for one peer device.
constexpr int32_t MAX_CONNECTED_TYR = 5;
// Time AddDevice sleeps before its first sync attempt, in milliseconds (1s).
constexpr int32_t ADD_DEVICE_SLEEP_TIMES_MS = 1000;
// Delay handed to ffrt::task_attr::delay() when a sync retry is queued.
// NOTE(review): presumably microseconds (i.e. 7s) - confirm against the ffrt API.
constexpr uint64_t SYNC_TASK_DELAY = 7 * 1000 * 1000;
// Minimum string length at which AnonymousProcessing masks its input.
constexpr int32_t MAX_DATA_LENGTH = 7;
// Index of the first character AnonymousProcessing replaces with the mask.
constexpr int32_t START_ANONYMOUS_INDEX = 5;
}
38
GetInstance()39 DistributedService& DistributedService::GetInstance()
40 {
41 static DistributedService distributedService;
42 return distributedService;
43 }
44
DistributedService()45 DistributedService::DistributedService()
46 {
47 serviceQueue_ = std::make_shared<ffrt::queue>("ans_distributed");
48 if (serviceQueue_ == nullptr) {
49 ANS_LOGW("ffrt create failed!");
50 return;
51 }
52 ANS_LOGI("Distributed service init successfully.");
53 }
54
InitService(const std::string & deviceId,uint16_t deviceType)55 int32_t DistributedService::InitService(const std::string &deviceId, uint16_t deviceType)
56 {
57 int32_t userId;
58 localDevice_.deviceId_ = deviceId;
59 localDevice_.deviceType_ = deviceType;
60 if (DistributedServer::GetInstance().InitServer(deviceId, deviceType) != 0) {
61 ANS_LOGI("Distributed service init server failed.");
62 return -1;
63 }
64 OberverService::GetInstance().Init(deviceType);
65 return 0;
66 }
67
DestoryService()68 void DistributedService::DestoryService()
69 {
70 if (serviceQueue_ == nullptr) {
71 ANS_LOGE("Check handler is null.");
72 return;
73 }
74 ffrt::task_handle handler = serviceQueue_->submit_h([&]() {
75 ANS_LOGI("Start destory service.");
76 DistributedClient::GetInstance().ReleaseClient();
77 DistributedServer::GetInstance().ReleaseServer();
78 OberverService::GetInstance().Destory();
79 for (auto& subscriberInfo : subscriberMap_) {
80 int32_t result = NotificationHelper::UnSubscribeNotification(subscriberInfo.second);
81 ANS_LOGI("UnSubscribe %{public}s %{public}d.", subscriberInfo.first.c_str(), result);
82 }
83 });
84 serviceQueue_->wait(handler);
85 }
86
SyncConnectedDevice(DistributedDeviceInfo device)87 void DistributedService::SyncConnectedDevice(DistributedDeviceInfo device)
88 {
89 auto iter = peerDevice_.find(device.deviceId_);
90 if (iter == peerDevice_.end()) {
91 ANS_LOGE("SyncConnectedDevice device is valid.");
92 return;
93 }
94 if (iter->second.connectedTry_ >= MAX_CONNECTED_TYR || iter->second.peerState_ != DeviceState::STATE_SYNC) {
95 ANS_LOGE("SyncConnectedDevice no need try %{public}d.", iter->second.connectedTry_);
96 return;
97 }
98 int32_t result = SyncDeviceMatch(device, MatchType::MATCH_SYN);
99 ANS_LOGI("SyncConnectedDevice try %{public}d %{public}d.", iter->second.connectedTry_, result);
100 iter->second.connectedTry_ = iter->second.connectedTry_ + 1;
101 if (result != 0) {
102 if (serviceQueue_ == nullptr) {
103 ANS_LOGE("Check handler is null.");
104 return;
105 }
106 serviceQueue_->submit_h([&, device]() { SyncConnectedDevice(device); },
107 ffrt::task_attr().name("sync").delay(SYNC_TASK_DELAY));
108 } else {
109 iter->second.connectedTry_ = 0;
110 }
111 }
112
AddDevice(DistributedDeviceInfo device)113 void DistributedService::AddDevice(DistributedDeviceInfo device)
114 {
115 if (serviceQueue_ == nullptr) {
116 ANS_LOGE("Check handler is null.");
117 return;
118 }
119 serviceQueue_->submit_h([&, device]() {
120 ANS_LOGI("Dans AddDevice %{public}s %{public}d %{public}s %{public}d.",
121 StringAnonymous(device.deviceId_).c_str(), device.deviceType_,
122 StringAnonymous(localDevice_.deviceId_).c_str(), localDevice_.deviceType_);
123 DistributedDeviceInfo deviceItem = device;
124 deviceItem.peerState_ = DeviceState::STATE_SYNC;
125 peerDevice_[deviceItem.deviceId_] = deviceItem;
126 // Delay linking to avoid bind failure, There is a delay in reporting the device online
127 auto sleepTime = std::chrono::milliseconds(ADD_DEVICE_SLEEP_TIMES_MS);
128 std::this_thread::sleep_for(sleepTime);
129 SyncConnectedDevice(device);
130 });
131 }
132
OnHandleMsg(std::shared_ptr<TlvBox> & box)133 void DistributedService::OnHandleMsg(std::shared_ptr<TlvBox>& box)
134 {
135 if (serviceQueue_ == nullptr || box == nullptr) {
136 ANS_LOGE("Check handler is null.");
137 return;
138 }
139 std::function<void()> task = std::bind([&, box]() {
140 int32_t type;
141 if (!box->GetMessageType(type)) {
142 ANS_LOGW("Dans invalid message type failed.");
143 return;
144 }
145 ANS_LOGI("Dans handle message type %{public}d.", type);
146 switch (type) {
147 case NotificationEventType::PUBLISH_NOTIFICATION:
148 PublishNotifictaion(box);
149 break;
150 case NotificationEventType::NOTIFICATION_STATE_SYNC:
151 HandleDeviceState(box);
152 break;
153 case NotificationEventType::NOTIFICATION_MATCH_SYNC:
154 HandleMatchSync(box);
155 break;
156 case NotificationEventType::REMOVE_NOTIFICATION:
157 RemoveNotification(box);
158 break;
159 case NotificationEventType::REMOVE_ALL_NOTIFICATIONS:
160 RemoveNotifications(box);
161 break;
162 case NotificationEventType::BUNDLE_ICON_SYNC:
163 HandleBundleIconSync(box);
164 break;
165 case NotificationEventType::SYNC_NOTIFICATION:
166 HandleNotificationSync(box);
167 break;
168 case NotificationEventType::NOTIFICATION_RESPONSE_SYNC:
169 case NotificationEventType::NOTIFICATION_RESPONSE_REPLY_SYNC:
170 HandleResponseSync(box);
171 break;
172 default:
173 ANS_LOGW("Dans receive msg %{public}d %{public}d.", type, box->bytesLength_);
174 break;
175 }
176 });
177 serviceQueue_->submit(task);
178 }
179
OnReceiveMsg(const void * data,uint32_t dataLen)180 void DistributedService::OnReceiveMsg(const void *data, uint32_t dataLen)
181 {
182 if (!TlvBox::CheckMessageCRC((const unsigned char*)data, dataLen)) {
183 ANS_LOGW("Dans check message crc failed.");
184 return;
185 }
186 std::shared_ptr<TlvBox> box = std::make_shared<TlvBox>();
187 if (!box->Parse((const unsigned char*)data, dataLen - sizeof(uint32_t))) {
188 ANS_LOGW("Dans parse message failed.");
189 return;
190 }
191 OnHandleMsg(box);
192 }
193
GetCurrentTime()194 int64_t DistributedService::GetCurrentTime()
195 {
196 auto now = std::chrono::system_clock::now();
197 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
198 return duration.count();
199 }
200
SendEventReport(int32_t messageType,int32_t errCode,const std::string & errorReason)201 void DistributedService::SendEventReport(
202 int32_t messageType, int32_t errCode, const std::string& errorReason)
203 {
204 if (sendReportCallback_ != nullptr ||
205 localDevice_.deviceType_ != DistributedHardware::DmDeviceType::DEVICE_TYPE_PHONE) {
206 sendReportCallback_(messageType, errCode, errorReason);
207 }
208 }
209
InitHACallBack(std::function<void (int32_t,int32_t,uint32_t,std::string)> callback)210 void DistributedService::InitHACallBack(
211 std::function<void(int32_t, int32_t, uint32_t, std::string)> callback)
212 {
213 haCallback_ = callback;
214 }
215
InitSendReportCallBack(std::function<void (int32_t,int32_t,std::string)> callback)216 void DistributedService::InitSendReportCallBack(
217 std::function<void(int32_t, int32_t, std::string)> callback)
218 {
219 sendReportCallback_ = callback;
220 }
221
AnonymousProcessing(std::string data)222 std::string DistributedService::AnonymousProcessing(std::string data)
223 {
224 int32_t length = data.length();
225 if (length >= MAX_DATA_LENGTH) {
226 data.replace(START_ANONYMOUS_INDEX, length - 1, "**");
227 }
228 return data;
229 }
230
SendHaReport(int32_t errorCode,uint32_t branchId,const std::string & errorReason,int32_t code)231 void DistributedService::SendHaReport(
232 int32_t errorCode, uint32_t branchId, const std::string& errorReason, int32_t code)
233 {
234 if (haCallback_ == nullptr || localDevice_.deviceType_ != DistributedHardware::DmDeviceType::DEVICE_TYPE_PHONE) {
235 return;
236 }
237 if (code == -1) {
238 haCallback_(code_, errorCode, branchId, errorReason);
239 } else {
240 haCallback_(code, errorCode, branchId, errorReason);
241 }
242 }
243
244 }
245 }
246