• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "watcher_manager.h"
16 #include <fcntl.h>
17 #include <sys/socket.h>
18 #include <sys/stat.h>
19 #include <sys/time.h>
20 #include <sys/types.h>
21 #include <thread>
22 
23 #include "param_message.h"
24 #include "sys_param.h"
25 #include "system_ability_definition.h"
26 #include "watcher_utils.h"
27 
28 namespace OHOS {
29 namespace init_param {
30 REGISTER_SYSTEM_ABILITY_BY_ID(WatcherManager, PARAM_WATCHER_DISTRIBUTED_SERVICE_ID, true)
31 
32 const static int32_t INVALID_SOCKET = -1;
~WatcherManager()33 WatcherManager::~WatcherManager()
34 {
35     watchers_.clear();
36     watcherGroups_.clear();
37     groupMap_.clear();
38 }
39 
AddWatcher(const std::string & keyPrefix,const sptr<IWatcher> & watcher)40 uint32_t WatcherManager::AddWatcher(const std::string &keyPrefix, const sptr<IWatcher> &watcher)
41 {
42     WATCHER_CHECK(watcher != nullptr && deathRecipient_ != nullptr,
43         return 0, "Invalid remove watcher for %s", keyPrefix.c_str());
44     sptr<IRemoteObject> object = watcher->AsObject();
45     if ((object != nullptr) && (object->IsProxyObject())) {
46         WATCHER_CHECK(object->AddDeathRecipient(deathRecipient_),
47             return 0, "Failed to add death recipient %s", keyPrefix.c_str());
48     }
49 
50     // check watcher id
51     uint32_t watcherId = 0;
52     int ret = GetWatcherId(watcherId);
53     WATCHER_CHECK(ret == 0, return 0, "Failed to get watcher id for %s", keyPrefix.c_str());
54 
55     WATCHER_LOGV("AddWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcherId);
56     bool newGroup = false;
57     WatcherGroupPtr group = GetWatcherGroup(keyPrefix);
58     if (group == nullptr) {
59         newGroup = true;
60         uint32_t groupId = 0;
61         ret = GetGroupId(groupId);
62         WATCHER_CHECK(ret == 0, return 0, "Failed to get group id for %s", keyPrefix.c_str());
63         group = std::make_shared<WatcherGroup>(groupId, keyPrefix);
64         WATCHER_CHECK(group != nullptr, return 0, "Failed to create group for %s", keyPrefix.c_str());
65     } else {
66         newGroup = group->Emptry();
67     }
68     ParamWatcherPtr paramWather = std::make_shared<ParamWatcher>(watcherId, watcher, group);
69     WATCHER_CHECK(paramWather != nullptr, return 0, "Failed to create watcher for %s", keyPrefix.c_str());
70     AddParamWatcher(keyPrefix, group, paramWather);
71     if (newGroup) {
72         StartLoop();
73         SendMessage(group, MSG_ADD_WATCHER);
74     }
75     SendLocalChange(keyPrefix, paramWather);
76     WATCHER_LOGI("AddWatcher %s watcherId: %u success", keyPrefix.c_str(), watcherId);
77     return watcherId;
78 }
79 
DelWatcher(const std::string & keyPrefix,uint32_t watcherId)80 int32_t WatcherManager::DelWatcher(const std::string &keyPrefix, uint32_t watcherId)
81 {
82     auto group = GetWatcherGroup(keyPrefix);
83     WATCHER_CHECK(group != nullptr, return 0, "Can not find group %s", keyPrefix.c_str());
84     auto paramWatcher = GetWatcher(watcherId);
85     WATCHER_CHECK(group != nullptr, return 0, "Can not find watcher %s %d", keyPrefix.c_str(), watcherId);
86     WATCHER_LOGV("DelWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcherId);
87     return DelWatcher(group, paramWatcher);
88 }
89 
DelWatcher(WatcherGroupPtr group,ParamWatcherPtr watcher)90 int32_t WatcherManager::DelWatcher(WatcherGroupPtr group, ParamWatcherPtr watcher)
91 {
92     WATCHER_CHECK(watcher != nullptr && group != nullptr, return 0, "Invalid watcher");
93     sptr<IRemoteObject> object = watcher->GetRemoteWatcher()->AsObject();
94     if (object != nullptr) {
95         object->RemoveDeathRecipient(deathRecipient_);
96     }
97     WATCHER_LOGI("DelWatcher watcherId %u prefix %s", watcher->GetWatcherId(), group->GetKeyPrefix().c_str());
98     DelParamWatcher(watcher);
99     if (group->Emptry()) {
100         SendMessage(group, MSG_DEL_WATCHER);
101         DelWatcherGroup(group);
102     }
103     return 0;
104 }
105 
SendMessage(WatcherGroupPtr group,int type)106 int WatcherManager::SendMessage(WatcherGroupPtr group, int type)
107 {
108     ParamMessage *request = nullptr;
109     request = (ParamMessage *)CreateParamMessage(type, group->GetKeyPrefix().c_str(), sizeof(ParamMessage));
110     WATCHER_CHECK(request != NULL, return PARAM_CODE_ERROR, "Failed to malloc for watch");
111     request->id.watcherId = group->GetGroupId();
112     request->msgSize = sizeof(ParamMessage);
113 
114     WATCHER_LOGV("SendMessage %s", group->GetKeyPrefix().c_str());
115     int ret = PARAM_CODE_ERROR;
116     int fd = GetServerFd(false);
117     if (fd != INVALID_SOCKET) {
118         ssize_t sendLen = send(serverFd_, (char *)request, request->msgSize, 0);
119         ret = (sendLen > 0) ? 0 : PARAM_CODE_ERROR;
120     }
121     free(request);
122     WATCHER_CHECK(ret == 0, return ret, "SendMessage key: %s %d fail", group->GetKeyPrefix().c_str(), type);
123     return 0;
124 }
125 
ProcessWatcherMessage(const std::vector<char> & buffer,uint32_t dataSize)126 void WatcherManager::ProcessWatcherMessage(const std::vector<char> &buffer, uint32_t dataSize)
127 {
128     ParamMessage *msg = (ParamMessage *)buffer.data();
129     WATCHER_CHECK(msg != NULL, return, "Invalid msg");
130     WATCHER_LOGV("ProcessWatcherMessage %d", msg->type);
131     uint32_t offset = 0;
132     if (msg->type != MSG_NOTIFY_PARAM) {
133         return;
134     }
135     WATCHER_CHECK(msg->msgSize <= dataSize, return, "Invalid msg size %d", msg->msgSize);
136     ParamMsgContent *valueContent = GetNextContent((const ParamMessage *)msg, &offset);
137     WATCHER_CHECK(valueContent != NULL, return, "Invalid msg ");
138     WATCHER_LOGV("ProcessWatcherMessage name %s watcherId %u ", msg->key, msg->id.watcherId);
139     WatcherGroupPtr group = GetWatcherGroup(msg->id.watcherId);
140     if (group != nullptr) {
141         std::lock_guard<std::mutex> lock(watcherMutex_);
142         group->ProcessParameterChange(msg->key, valueContent->content);
143     }
144 }
145 
GetWatcherGroup(uint32_t groupId)146 WatcherManager::WatcherGroupPtr WatcherManager::GetWatcherGroup(uint32_t groupId)
147 {
148     std::lock_guard<std::mutex> lock(watcherMutex_);
149     if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
150         return watcherGroups_[groupId];
151     }
152     return nullptr;
153 }
154 
GetWatcherGroup(const std::string & keyPrefix)155 WatcherManager::WatcherGroupPtr WatcherManager::GetWatcherGroup(const std::string &keyPrefix)
156 {
157     std::lock_guard<std::mutex> lock(watcherMutex_);
158     if (groupMap_.find(keyPrefix) == groupMap_.end()) {
159         return nullptr;
160     }
161     uint32_t groupId = groupMap_[keyPrefix];
162     if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
163         return watcherGroups_[groupId];
164     }
165     return nullptr;
166 }
167 
AddParamWatcher(const std::string & keyPrefix,WatcherGroupPtr group,ParamWatcherPtr watcher)168 void WatcherManager::AddParamWatcher(const std::string &keyPrefix, WatcherGroupPtr group, ParamWatcherPtr watcher)
169 {
170     WATCHER_LOGV("AddParamWatcher prefix %s watcherId %u", keyPrefix.c_str(), watcher->GetWatcherId());
171     uint32_t groupId = group->GetGroupId();
172     std::lock_guard<std::mutex> lock(watcherMutex_);
173     groupMap_[keyPrefix] = groupId;
174     watchers_[watcher->GetWatcherId()] = watcher;
175     ListAddTail(group->GetWatchers(), watcher->GetGroupNode());
176 
177     if (watcherGroups_.find(groupId) != watcherGroups_.end()) {
178         return;
179     }
180     watcherGroups_[groupId] = group;
181 }
182 
DelParamWatcher(ParamWatcherPtr watcher)183 void WatcherManager::DelParamWatcher(ParamWatcherPtr watcher)
184 {
185     std::lock_guard<std::mutex> lock(watcherMutex_);
186     ListRemove(watcher->GetGroupNode());
187     ListInit(watcher->GetGroupNode());
188     watchers_.erase(watcher->GetWatcherId());
189     WATCHER_LOGV("DelParamWatcher watcherId %u", watcher->GetWatcherId());
190 }
191 
DelWatcherGroup(WatcherGroupPtr group)192 void WatcherManager::DelWatcherGroup(WatcherGroupPtr group)
193 {
194     std::lock_guard<std::mutex> lock(watcherMutex_);
195     groupMap_.erase(group->GetKeyPrefix());
196     watcherGroups_.erase(group->GetGroupId());
197 }
198 
ProcessParameterChange(const std::string & name,const std::string & value)199 void WatcherManager::WatcherGroup::ProcessParameterChange(const std::string &name, const std::string &value)
200 {
201     // walk watcher
202     ListNode *node = nullptr;
203     ForEachListEntry(&watchers_, node) {
204         ParamWatcher *watcher = (ParamWatcher *)node;
205         watcher->ProcessParameterChange(name, value);
206     }
207 }
208 
FilterParam(const char * name,const std::string & keyPrefix)209 static int FilterParam(const char *name, const std::string &keyPrefix)
210 {
211     if (keyPrefix.rfind("*") == keyPrefix.length() - 1) {
212         return strncmp(name, keyPrefix.c_str(), keyPrefix.length() - 1) == 0;
213     }
214     return strcmp(name, keyPrefix.c_str()) == 0;
215 }
216 
SendLocalChange(const std::string & keyPrefix,ParamWatcherPtr watcher)217 void WatcherManager::SendLocalChange(const std::string &keyPrefix, ParamWatcherPtr watcher)
218 {
219     struct Context {
220         char *buffer;
221         ParamWatcherPtr watcher;
222         std::string keyPrefix;
223     };
224     WATCHER_LOGV("SendLocalChange key %s  ", keyPrefix.c_str());
225     std::vector<char> buffer(PARAM_NAME_LEN_MAX + PARAM_CONST_VALUE_LEN_MAX);
226     struct Context context = {buffer.data(), watcher, keyPrefix};
227     // walk watcher
228     SystemTraversalParameter("", [](ParamHandle handle, void *cookie) {
229             struct Context *context = (struct Context *)(cookie);
230             SystemGetParameterName(handle, context->buffer, PARAM_NAME_LEN_MAX);
231             if (!FilterParam(context->buffer, context->keyPrefix)) {
232                 return;
233             }
234             uint32_t size = PARAM_CONST_VALUE_LEN_MAX;
235             SystemGetParameterValue(handle, context->buffer + PARAM_NAME_LEN_MAX, &size);
236             WATCHER_LOGV("SendLocalChange key %s value: %s ", context->buffer, context->buffer + PARAM_NAME_LEN_MAX);
237             context->watcher->ProcessParameterChange(context->buffer, context->buffer + PARAM_NAME_LEN_MAX);
238         }, (void *)&context);
239 }
240 
RunLoop()241 void WatcherManager::RunLoop()
242 {
243     const int32_t RECV_BUFFER_MAX = 5 * 1024;
244     std::vector<char> buffer(RECV_BUFFER_MAX, 0);
245     bool retry = false;
246     ssize_t recvLen = 0;
247     while (!stop_) {
248         int fd = GetServerFd(retry);
249         if (fd >= 0) {
250             recvLen = recv(fd, buffer.data(), RECV_BUFFER_MAX, 0);
251         }
252         if (stop_) {
253             break;
254         }
255         if (recvLen <= 0) {
256             if (errno == EAGAIN) { // timeout
257                 continue;
258             }
259             PARAM_LOGE("Failed to recv msg from server errno %d", errno);
260             retry = true;  // re connect
261         } else if (recvLen >= (ssize_t)sizeof(ParamMessage)) {
262             ProcessWatcherMessage(buffer, recvLen);
263         }
264     }
265     if (serverFd_ > 0) {
266         close(serverFd_);
267         serverFd_ = INVALID_SOCKET;
268     }
269     WATCHER_LOGV("Exit runLoop");
270 }
271 
StartLoop()272 void WatcherManager::StartLoop()
273 {
274     if (pRecvThread_ == nullptr) {
275         pRecvThread_ = new (std::nothrow)std::thread(&WatcherManager::RunLoop, this);
276         WATCHER_CHECK(pRecvThread_ != nullptr, return, "Failed to create thread");
277     }
278 }
279 
GetServerFd(bool retry)280 int WatcherManager::GetServerFd(bool retry)
281 {
282     const int32_t sleepTime = 200;
283     const int32_t maxRetry = 10;
284     std::lock_guard<std::mutex> lock(mutex_);
285     if (retry && serverFd_ != INVALID_SOCKET) {
286         close(serverFd_);
287         serverFd_ = INVALID_SOCKET;
288     }
289     if (serverFd_ != INVALID_SOCKET) {
290         return serverFd_;
291     }
292     int ret = 0;
293     int32_t retryCount = 0;
294     do {
295         serverFd_ = socket(PF_UNIX, SOCK_STREAM, 0);
296         int flags = fcntl(serverFd_, F_GETFL, 0);
297         (void)fcntl(serverFd_, F_SETFL, flags & ~O_NONBLOCK);
298         ret = ConntectServer(serverFd_, CLIENT_PIPE_NAME);
299         if (ret == 0) {
300             break;
301         }
302         close(serverFd_);
303         serverFd_ = INVALID_SOCKET;
304         usleep(sleepTime);
305         retryCount++;
306     } while (retryCount < maxRetry);
307     WATCHER_LOGV("GetServerFd serverFd_ %d retryCount %d ", serverFd_, retryCount);
308     return serverFd_;
309 }
310 
OnStart()311 void WatcherManager::OnStart()
312 {
313     WATCHER_LOGI("WatcherManager OnStart");
314     bool res = Publish(this);
315     if (!res) {
316         WATCHER_LOGE("WatcherManager Publish failed");
317     }
318     if (deathRecipient_ == nullptr) {
319         deathRecipient_ = new DeathRecipient(this);
320     }
321     return;
322 }
323 
StopLoop()324 void WatcherManager::StopLoop()
325 {
326     WATCHER_LOGV("WatcherManager StopLoop serverFd_ %d", serverFd_);
327     stop_ = true;
328     if (serverFd_ > 0) {
329         shutdown(serverFd_, SHUT_RDWR);
330         close(serverFd_);
331         serverFd_ = INVALID_SOCKET;
332     }
333     if (pRecvThread_ != nullptr) {
334         pRecvThread_->join();
335         delete pRecvThread_;
336         pRecvThread_ = nullptr;
337     }
338 }
339 
OnStop()340 void WatcherManager::OnStop()
341 {
342     {
343         std::lock_guard<std::mutex> lock(watcherMutex_);
344         for (auto iter = watchers_.begin(); iter != watchers_.end(); ++iter) {
345             if (iter->second == nullptr) {
346                 continue;
347             }
348             sptr<IRemoteObject> object = iter->second->GetRemoteWatcher()->AsObject();
349             if (object != nullptr) {
350                 object->RemoveDeathRecipient(deathRecipient_);
351             }
352         }
353     }
354     watchers_.clear();
355     watcherGroups_.clear();
356     groupMap_.clear();
357     StopLoop();
358 }
359 
OnRemoteDied(const wptr<IRemoteObject> & remote)360 void WatcherManager::DeathRecipient::OnRemoteDied(const wptr<IRemoteObject> &remote)
361 {
362     WATCHER_CHECK(remote != nullptr, return, "Invalid remote obj");
363     auto paramWatcher = manager_->GetWatcher(remote);
364     WATCHER_CHECK(paramWatcher != nullptr, return, "Failed to get remote watcher info ");
365     WATCHER_LOGV("OnRemoteDied watcherId %u", paramWatcher->GetWatcherId());
366     manager_->DelWatcher(paramWatcher->GetWatcherGroup(), paramWatcher);
367 }
368 
GetWatcher(uint32_t watcherId)369 WatcherManager::ParamWatcherPtr WatcherManager::GetWatcher(uint32_t watcherId)
370 {
371     std::lock_guard<std::mutex> lock(watcherMutex_);
372     auto iter = watchers_.find(watcherId);
373     if (iter != watchers_.end()) {
374         return iter->second;
375     }
376     return nullptr;
377 }
378 
GetWatcher(const wptr<IRemoteObject> & remote)379 WatcherManager::ParamWatcherPtr WatcherManager::GetWatcher(const wptr<IRemoteObject> &remote)
380 {
381     std::lock_guard<std::mutex> lock(watcherMutex_);
382     for (auto iter = watchers_.begin(); iter != watchers_.end(); ++iter) {
383         if (iter->second == nullptr) {
384             continue;
385         }
386         if (remote == iter->second->GetRemoteWatcher()->AsObject()) {
387             return iter->second;
388         }
389     }
390     return nullptr;
391 }
392 
GetWatcherId(uint32_t & watcherId)393 int WatcherManager::GetWatcherId(uint32_t &watcherId)
394 {
395     std::lock_guard<std::mutex> lock(watcherMutex_);
396     watcherId = watcherId_;
397     do {
398         watcherId_++;
399         if (watcherId_ == 0) {
400             watcherId_++;
401         }
402         if (watchers_.find(watcherId_) == watchers_.end()) {
403             break;
404         }
405         WATCHER_CHECK(watcherId_ != watcherId, return -1, "No enough watcherId %u", watcherId);
406     } while (1);
407     watcherId = watcherId_;
408     return 0;
409 }
410 
GetGroupId(uint32_t & groupId)411 int WatcherManager::GetGroupId(uint32_t &groupId)
412 {
413     std::lock_guard<std::mutex> lock(watcherMutex_);
414     groupId = groupId_;
415     do {
416         groupId_++;
417         if (watcherGroups_.find(groupId_) == watcherGroups_.end()) {
418             break;
419         }
420         WATCHER_CHECK(groupId_ == groupId, return -1, "No enough groupId %u", groupId);
421     } while (1);
422     groupId = groupId_;
423     return 0;
424 }
425 } // namespace init_param
426 } // namespace OHOS
427