1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "watcher_manager.h"
16 #include <fcntl.h>
17 #include <sys/socket.h>
18 #include <sys/stat.h>
19 #include <sys/time.h>
20 #include <sys/types.h>
21 #include <thread>
22
23 #include "param_message.h"
24 #include "sys_param.h"
25 #include "system_ability_definition.h"
26 #include "watcher_utils.h"
27
28 namespace OHOS {
29 namespace init_param {
30 REGISTER_SYSTEM_ABILITY_BY_ID(WatcherManager, PARAM_WATCHER_DISTRIBUTED_SERVICE_ID, true)
31
32 const static int32_t INVALID_SOCKET = -1;
~WatcherManager()33 WatcherManager::~WatcherManager()
34 {
35 watchers_.clear();
36 watcherGroups_.clear();
37 groupMap_.clear();
38 }
39
AddWatcher(const std::string & keyPrefix,const sptr<IWatcher> & watcher)40 uint32_t WatcherManager::AddWatcher(const std::string &keyPrefix, const sptr<IWatcher> &watcher)
41 {
42 WATCHER_CHECK(watcher != nullptr && deathRecipient_ != nullptr,
43 return 0, "Invalid remove watcher for %s", keyPrefix.c_str());
44 sptr<IRemoteObject> object = watcher->AsObject();
45 if ((object != nullptr) && (object->IsProxyObject())) {
46 WATCHER_CHECK(object->AddDeathRecipient(deathRecipient_),
47 return 0, "Failed to add death recipient %s", keyPrefix.c_str());
48 }
49
50 // check watcher id
51 uint32_t watcherId = 0;
52 int ret = GetWatcherId(watcherId);
53 WATCHER_CHECK(ret == 0, return 0, "Failed to get watcher id for %s", keyPrefix.c_str());
54
55 WATCHER_LOGV("AddWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcherId);
56 bool newGroup = false;
57 WatcherGroupPtr group = GetWatcherGroup(keyPrefix);
58 if (group == nullptr) {
59 newGroup = true;
60 uint32_t groupId = 0;
61 ret = GetGroupId(groupId);
62 WATCHER_CHECK(ret == 0, return 0, "Failed to get group id for %s", keyPrefix.c_str());
63 group = std::make_shared<WatcherGroup>(groupId, keyPrefix);
64 WATCHER_CHECK(group != nullptr, return 0, "Failed to create group for %s", keyPrefix.c_str());
65 } else {
66 newGroup = group->Emptry();
67 }
68 ParamWatcherPtr paramWather = std::make_shared<ParamWatcher>(watcherId, watcher, group);
69 WATCHER_CHECK(paramWather != nullptr, return 0, "Failed to create watcher for %s", keyPrefix.c_str());
70 AddParamWatcher(keyPrefix, group, paramWather);
71 if (newGroup) {
72 StartLoop();
73 SendMessage(group, MSG_ADD_WATCHER);
74 }
75 SendLocalChange(keyPrefix, paramWather);
76 WATCHER_LOGI("AddWatcher %s watcherId: %u success", keyPrefix.c_str(), watcherId);
77 return watcherId;
78 }
79
DelWatcher(const std::string & keyPrefix,uint32_t watcherId)80 int32_t WatcherManager::DelWatcher(const std::string &keyPrefix, uint32_t watcherId)
81 {
82 auto group = GetWatcherGroup(keyPrefix);
83 WATCHER_CHECK(group != nullptr, return 0, "Can not find group %s", keyPrefix.c_str());
84 auto paramWatcher = GetWatcher(watcherId);
85 WATCHER_CHECK(group != nullptr, return 0, "Can not find watcher %s %d", keyPrefix.c_str(), watcherId);
86 WATCHER_LOGV("DelWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcherId);
87 return DelWatcher(group, paramWatcher);
88 }
89
DelWatcher(WatcherGroupPtr group,ParamWatcherPtr watcher)90 int32_t WatcherManager::DelWatcher(WatcherGroupPtr group, ParamWatcherPtr watcher)
91 {
92 WATCHER_CHECK(watcher != nullptr && group != nullptr, return 0, "Invalid watcher");
93 sptr<IRemoteObject> object = watcher->GetRemoteWatcher()->AsObject();
94 if (object != nullptr) {
95 object->RemoveDeathRecipient(deathRecipient_);
96 }
97 WATCHER_LOGI("DelWatcher watcherId %u prefix %s", watcher->GetWatcherId(), group->GetKeyPrefix().c_str());
98 DelParamWatcher(watcher);
99 if (group->Emptry()) {
100 SendMessage(group, MSG_DEL_WATCHER);
101 DelWatcherGroup(group);
102 }
103 return 0;
104 }
105
SendMessage(WatcherGroupPtr group,int type)106 int WatcherManager::SendMessage(WatcherGroupPtr group, int type)
107 {
108 ParamMessage *request = nullptr;
109 request = (ParamMessage *)CreateParamMessage(type, group->GetKeyPrefix().c_str(), sizeof(ParamMessage));
110 WATCHER_CHECK(request != NULL, return PARAM_CODE_ERROR, "Failed to malloc for watch");
111 request->id.watcherId = group->GetGroupId();
112 request->msgSize = sizeof(ParamMessage);
113
114 WATCHER_LOGV("SendMessage %s", group->GetKeyPrefix().c_str());
115 int ret = PARAM_CODE_ERROR;
116 int fd = GetServerFd(false);
117 if (fd != INVALID_SOCKET) {
118 ssize_t sendLen = send(serverFd_, (char *)request, request->msgSize, 0);
119 ret = (sendLen > 0) ? 0 : PARAM_CODE_ERROR;
120 }
121 free(request);
122 WATCHER_CHECK(ret == 0, return ret, "SendMessage key: %s %d fail", group->GetKeyPrefix().c_str(), type);
123 return 0;
124 }
125
ProcessWatcherMessage(const std::vector<char> & buffer,uint32_t dataSize)126 void WatcherManager::ProcessWatcherMessage(const std::vector<char> &buffer, uint32_t dataSize)
127 {
128 ParamMessage *msg = (ParamMessage *)buffer.data();
129 WATCHER_CHECK(msg != NULL, return, "Invalid msg");
130 WATCHER_LOGV("ProcessWatcherMessage %d", msg->type);
131 uint32_t offset = 0;
132 if (msg->type != MSG_NOTIFY_PARAM) {
133 return;
134 }
135 WATCHER_CHECK(msg->msgSize <= dataSize, return, "Invalid msg size %d", msg->msgSize);
136 ParamMsgContent *valueContent = GetNextContent((const ParamMessage *)msg, &offset);
137 WATCHER_CHECK(valueContent != NULL, return, "Invalid msg ");
138 WATCHER_LOGV("ProcessWatcherMessage name %s watcherId %u ", msg->key, msg->id.watcherId);
139 WatcherGroupPtr group = GetWatcherGroup(msg->id.watcherId);
140 if (group != nullptr) {
141 std::lock_guard<std::mutex> lock(watcherMutex_);
142 group->ProcessParameterChange(msg->key, valueContent->content);
143 }
144 }
145
GetWatcherGroup(uint32_t groupId)146 WatcherManager::WatcherGroupPtr WatcherManager::GetWatcherGroup(uint32_t groupId)
147 {
148 std::lock_guard<std::mutex> lock(watcherMutex_);
149 if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
150 return watcherGroups_[groupId];
151 }
152 return nullptr;
153 }
154
GetWatcherGroup(const std::string & keyPrefix)155 WatcherManager::WatcherGroupPtr WatcherManager::GetWatcherGroup(const std::string &keyPrefix)
156 {
157 std::lock_guard<std::mutex> lock(watcherMutex_);
158 if (groupMap_.find(keyPrefix) == groupMap_.end()) {
159 return nullptr;
160 }
161 uint32_t groupId = groupMap_[keyPrefix];
162 if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
163 return watcherGroups_[groupId];
164 }
165 return nullptr;
166 }
167
AddParamWatcher(const std::string & keyPrefix,WatcherGroupPtr group,ParamWatcherPtr watcher)168 void WatcherManager::AddParamWatcher(const std::string &keyPrefix, WatcherGroupPtr group, ParamWatcherPtr watcher)
169 {
170 WATCHER_LOGV("AddParamWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcher->GetWatcherId());
171 uint32_t groupId = group->GetGroupId();
172 std::lock_guard<std::mutex> lock(watcherMutex_);
173 groupMap_[keyPrefix] = groupId;
174 watchers_[watcher->GetWatcherId()] = watcher;
175 ListAddTail(group->GetWatchers(), watcher->GetGroupNode());
176
177 if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
178 return;
179 }
180 watcherGroups_[groupId] = group;
181 }
182
DelParamWatcher(ParamWatcherPtr watcher)183 void WatcherManager::DelParamWatcher(ParamWatcherPtr watcher)
184 {
185 std::lock_guard<std::mutex> lock(watcherMutex_);
186 ListRemove(watcher->GetGroupNode());
187 ListInit(watcher->GetGroupNode());
188 watchers_.erase(watcher->GetWatcherId());
189 WATCHER_LOGV("DelParamWatcher watcherId %u", watcher->GetWatcherId());
190 }
191
DelWatcherGroup(WatcherGroupPtr group)192 void WatcherManager::DelWatcherGroup(WatcherGroupPtr group)
193 {
194 std::lock_guard<std::mutex> lock(watcherMutex_);
195 groupMap_.erase(group->GetKeyPrefix());
196 watcherGroups_.erase(group->GetGroupId());
197 }
198
ProcessParameterChange(const std::string & name,const std::string & value)199 void WatcherManager::WatcherGroup::ProcessParameterChange(const std::string &name, const std::string &value)
200 {
201 // walk watcher
202 ListNode *node = nullptr;
203 ForEachListEntry(&watchers_, node) {
204 ParamWatcher *watcher = (ParamWatcher *)node;
205 watcher->ProcessParameterChange(name, value);
206 }
207 }
208
FilterParam(const char * name,const std::string & keyPrefix)209 static int FilterParam(const char *name, const std::string &keyPrefix)
210 {
211 if (keyPrefix.rfind("*") == keyPrefix.length() - 1) {
212 return strncmp(name, keyPrefix.c_str(), keyPrefix.length() - 1) == 0;
213 }
214 return strcmp(name, keyPrefix.c_str()) == 0;
215 }
216
SendLocalChange(const std::string & keyPrefix,ParamWatcherPtr watcher)217 void WatcherManager::SendLocalChange(const std::string &keyPrefix, ParamWatcherPtr watcher)
218 {
219 struct Context {
220 char *buffer;
221 ParamWatcherPtr watcher;
222 std::string keyPrefix;
223 };
224 WATCHER_LOGV("SendLocalChange key %s ", keyPrefix.c_str());
225 std::vector<char> buffer(PARAM_NAME_LEN_MAX + PARAM_CONST_VALUE_LEN_MAX);
226 struct Context context = {buffer.data(), watcher, keyPrefix};
227 // walk watcher
228 SystemTraversalParameter("", [](ParamHandle handle, void *cookie) {
229 struct Context *context = (struct Context *)(cookie);
230 SystemGetParameterName(handle, context->buffer, PARAM_NAME_LEN_MAX);
231 if (!FilterParam(context->buffer, context->keyPrefix)) {
232 return;
233 }
234 uint32_t size = PARAM_CONST_VALUE_LEN_MAX;
235 SystemGetParameterValue(handle, context->buffer + PARAM_NAME_LEN_MAX, &size);
236 WATCHER_LOGV("SendLocalChange key %s value: %s ", context->buffer, context->buffer + PARAM_NAME_LEN_MAX);
237 context->watcher->ProcessParameterChange(context->buffer, context->buffer + PARAM_NAME_LEN_MAX);
238 }, (void *)&context);
239 }
240
RunLoop()241 void WatcherManager::RunLoop()
242 {
243 const int32_t RECV_BUFFER_MAX = 5 * 1024;
244 std::vector<char> buffer(RECV_BUFFER_MAX, 0);
245 bool retry = false;
246 ssize_t recvLen = 0;
247 while (!stop_) {
248 int fd = GetServerFd(retry);
249 if (fd >= 0) {
250 recvLen = recv(fd, buffer.data(), RECV_BUFFER_MAX, 0);
251 }
252 if (stop_) {
253 break;
254 }
255 if (recvLen <= 0) {
256 if (errno == EAGAIN) { // timeout
257 continue;
258 }
259 PARAM_LOGE("Failed to recv msg from server errno %d", errno);
260 retry = true; // re connect
261 } else if (recvLen >= (ssize_t)sizeof(ParamMessage)) {
262 ProcessWatcherMessage(buffer, recvLen);
263 }
264 }
265 if (serverFd_ > 0) {
266 close(serverFd_);
267 serverFd_ = INVALID_SOCKET;
268 }
269 WATCHER_LOGV("Exit runLoop");
270 }
271
StartLoop()272 void WatcherManager::StartLoop()
273 {
274 if (pRecvThread_ == nullptr) {
275 pRecvThread_ = new (std::nothrow)std::thread(&WatcherManager::RunLoop, this);
276 WATCHER_CHECK(pRecvThread_ != nullptr, return, "Failed to create thread");
277 }
278 }
279
GetServerFd(bool retry)280 int WatcherManager::GetServerFd(bool retry)
281 {
282 const int32_t sleepTime = 200;
283 const int32_t maxRetry = 10;
284 std::lock_guard<std::mutex> lock(mutex_);
285 if (retry && serverFd_ != INVALID_SOCKET) {
286 close(serverFd_);
287 serverFd_ = INVALID_SOCKET;
288 }
289 if (serverFd_ != INVALID_SOCKET) {
290 return serverFd_;
291 }
292 int ret = 0;
293 int32_t retryCount = 0;
294 do {
295 serverFd_ = socket(PF_UNIX, SOCK_STREAM, 0);
296 int flags = fcntl(serverFd_, F_GETFL, 0);
297 (void)fcntl(serverFd_, F_SETFL, flags & ~O_NONBLOCK);
298 ret = ConntectServer(serverFd_, CLIENT_PIPE_NAME);
299 if (ret == 0) {
300 break;
301 }
302 close(serverFd_);
303 serverFd_ = INVALID_SOCKET;
304 usleep(sleepTime);
305 retryCount++;
306 } while (retryCount < maxRetry);
307 WATCHER_LOGV("GetServerFd serverFd_ %d retryCount %d ", serverFd_, retryCount);
308 return serverFd_;
309 }
310
OnStart()311 void WatcherManager::OnStart()
312 {
313 WATCHER_LOGI("WatcherManager OnStart");
314 bool res = Publish(this);
315 if (!res) {
316 WATCHER_LOGE("WatcherManager Publish failed");
317 }
318 if (deathRecipient_ == nullptr) {
319 deathRecipient_ = new DeathRecipient(this);
320 }
321 return;
322 }
323
StopLoop()324 void WatcherManager::StopLoop()
325 {
326 WATCHER_LOGV("WatcherManager StopLoop serverFd_ %d", serverFd_);
327 stop_ = true;
328 if (serverFd_ > 0) {
329 shutdown(serverFd_, SHUT_RDWR);
330 close(serverFd_);
331 serverFd_ = INVALID_SOCKET;
332 }
333 if (pRecvThread_ != nullptr) {
334 pRecvThread_->join();
335 delete pRecvThread_;
336 pRecvThread_ = nullptr;
337 }
338 }
339
OnStop()340 void WatcherManager::OnStop()
341 {
342 {
343 std::lock_guard<std::mutex> lock(watcherMutex_);
344 for (auto iter = watchers_.begin(); iter != watchers_.end(); ++iter) {
345 if (iter->second == nullptr) {
346 continue;
347 }
348 sptr<IRemoteObject> object = iter->second->GetRemoteWatcher()->AsObject();
349 if (object != nullptr) {
350 object->RemoveDeathRecipient(deathRecipient_);
351 }
352 }
353 }
354 watchers_.clear();
355 watcherGroups_.clear();
356 groupMap_.clear();
357 StopLoop();
358 }
359
OnRemoteDied(const wptr<IRemoteObject> & remote)360 void WatcherManager::DeathRecipient::OnRemoteDied(const wptr<IRemoteObject> &remote)
361 {
362 WATCHER_CHECK(remote != nullptr, return, "Invalid remote obj");
363 auto paramWatcher = manager_->GetWatcher(remote);
364 WATCHER_CHECK(paramWatcher != nullptr, return, "Failed to get remote watcher info ");
365 WATCHER_LOGV("OnRemoteDied watcherId %u", paramWatcher->GetWatcherId());
366 manager_->DelWatcher(paramWatcher->GetWatcherGroup(), paramWatcher);
367 }
368
GetWatcher(uint32_t watcherId)369 WatcherManager::ParamWatcherPtr WatcherManager::GetWatcher(uint32_t watcherId)
370 {
371 std::lock_guard<std::mutex> lock(watcherMutex_);
372 auto iter = watchers_.find(watcherId);
373 if (iter != watchers_.end()) {
374 return iter->second;
375 }
376 return nullptr;
377 }
378
GetWatcher(const wptr<IRemoteObject> & remote)379 WatcherManager::ParamWatcherPtr WatcherManager::GetWatcher(const wptr<IRemoteObject> &remote)
380 {
381 std::lock_guard<std::mutex> lock(watcherMutex_);
382 for (auto iter = watchers_.begin(); iter != watchers_.end(); ++iter) {
383 if (iter->second == nullptr) {
384 continue;
385 }
386 if (remote == iter->second->GetRemoteWatcher()->AsObject()) {
387 return iter->second;
388 }
389 }
390 return nullptr;
391 }
392
GetWatcherId(uint32_t & watcherId)393 int WatcherManager::GetWatcherId(uint32_t &watcherId)
394 {
395 std::lock_guard<std::mutex> lock(watcherMutex_);
396 watcherId = watcherId_;
397 do {
398 watcherId_++;
399 if (watcherId_ == 0) {
400 watcherId_++;
401 }
402 if (watchers_.find(watcherId_) == watchers_.end()) {
403 break;
404 }
405 WATCHER_CHECK(watcherId_ != watcherId, return -1, "No enough watcherId %u", watcherId);
406 } while (1);
407 watcherId = watcherId_;
408 return 0;
409 }
410
GetGroupId(uint32_t & groupId)411 int WatcherManager::GetGroupId(uint32_t &groupId)
412 {
413 std::lock_guard<std::mutex> lock(watcherMutex_);
414 groupId = groupId_;
415 do {
416 groupId_++;
417 if (watcherGroups_.find(groupId_) == watcherGroups_.end()) {
418 break;
419 }
420 WATCHER_CHECK(groupId_ == groupId, return -1, "No enough groupId %u", groupId);
421 } while (1);
422 groupId = groupId_;
423 return 0;
424 }
425 } // namespace init_param
426 } // namespace OHOS
427