• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false
18 #include "Log.h"
19 
20 #include "StatsPullerManager.h"
21 
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25 
26 #include <algorithm>
27 #include <iostream>
28 
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36 
37 using std::shared_ptr;
38 using std::vector;
39 
40 namespace android {
41 namespace os {
42 namespace statsd {
43 
44 // Values smaller than this may require to update the alarm.
45 const int64_t NO_ALARM_UPDATE = INT64_MAX;
46 
StatsPullerManager()47 StatsPullerManager::StatsPullerManager()
48     : kAllPullAtomInfo({
49               // TrainInfo.
50               {{.atomTag = util::TRAIN_INFO, .uid = AID_STATSD}, new TrainInfoPuller()},
51       }),
52       mNextPullTimeNs(NO_ALARM_UPDATE) {
53 }
54 
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
56                               vector<shared_ptr<LogEvent>>* data) {
57     std::lock_guard<std::mutex> _l(mLock);
58     return PullLocked(tagId, configKey, eventTimeNs, data);
59 }
60 
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)61 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
62                               vector<std::shared_ptr<LogEvent>>* data) {
63     std::lock_guard<std::mutex> _l(mLock);
64     return PullLocked(tagId, uids, eventTimeNs, data);
65 }
66 
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)67 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
68                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
69     vector<int32_t> uids;
70     const auto& uidProviderIt = mPullUidProviders.find(configKey);
71     if (uidProviderIt == mPullUidProviders.end()) {
72         ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
73               configKey.ToString().c_str());
74         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
75         return false;
76     }
77     sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
78     if (pullUidProvider == nullptr) {
79         ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
80               configKey.ToString().c_str());
81         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
82         return false;
83     }
84     uids = pullUidProvider->getPullAtomUids(tagId);
85     return PullLocked(tagId, uids, eventTimeNs, data);
86 }
87 
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)88 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
89                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
90     VLOG("Initiating pulling %d", tagId);
91     for (int32_t uid : uids) {
92         PullerKey key = {.atomTag = tagId, .uid = uid};
93         auto pullerIt = kAllPullAtomInfo.find(key);
94         if (pullerIt != kAllPullAtomInfo.end()) {
95             PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data);
96             VLOG("pulled %zu items", data->size());
97             if (status != PULL_SUCCESS) {
98                 StatsdStats::getInstance().notePullFailed(tagId);
99             }
100             // If we received a dead object exception, it means the client process has died.
101             // We can remove the puller from the map.
102             if (status == PULL_DEAD_OBJECT) {
103                 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
104                         tagId,
105                         /*registered=*/false);
106                 kAllPullAtomInfo.erase(pullerIt);
107             }
108             return status == PULL_SUCCESS;
109         }
110     }
111     StatsdStats::getInstance().notePullerNotFound(tagId);
112     ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
113     return false;  // Return early since we don't know what to pull.
114 }
115 
PullerForMatcherExists(int tagId) const116 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
117     // Pulled atoms might be registered after we parse the config, so just make sure the id is in
118     // an appropriate range.
119     return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
120 }
121 
updateAlarmLocked()122 void StatsPullerManager::updateAlarmLocked() {
123     if (mNextPullTimeNs == NO_ALARM_UPDATE) {
124         VLOG("No need to set alarms. Skipping");
125         return;
126     }
127 
128     // TODO(b/151045771): do not hold a lock while making a binder call
129     if (mStatsCompanionService != nullptr) {
130         mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
131     } else {
132         VLOG("StatsCompanionService not available. Alarm not set.");
133     }
134     return;
135 }
136 
SetStatsCompanionService(shared_ptr<IStatsCompanionService> statsCompanionService)137 void StatsPullerManager::SetStatsCompanionService(
138         shared_ptr<IStatsCompanionService> statsCompanionService) {
139     std::lock_guard<std::mutex> _l(mLock);
140     shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
141     mStatsCompanionService = statsCompanionService;
142     for (const auto& pulledAtom : kAllPullAtomInfo) {
143         pulledAtom.second->SetStatsCompanionService(statsCompanionService);
144     }
145     if (mStatsCompanionService != nullptr) {
146         updateAlarmLocked();
147     }
148 }
149 
RegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver,int64_t nextPullTimeNs,int64_t intervalNs)150 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
151                                           wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
152                                           int64_t intervalNs) {
153     std::lock_guard<std::mutex> _l(mLock);
154     auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
155     for (auto it = receivers.begin(); it != receivers.end(); it++) {
156         if (it->receiver == receiver) {
157             VLOG("Receiver already registered of %d", (int)receivers.size());
158             return;
159         }
160     }
161     ReceiverInfo receiverInfo;
162     receiverInfo.receiver = receiver;
163 
164     // Round it to the nearest minutes. This is the limit of alarm manager.
165     // In practice, we should always have larger buckets.
166     int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
167     // Scheduled pulling should be at least 1 min apart.
168     // This can be lower in cts tests, in which case we round it to 1 min.
169     if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
170         roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
171     }
172 
173     receiverInfo.intervalNs = roundedIntervalNs;
174     receiverInfo.nextPullTimeNs = nextPullTimeNs;
175     receivers.push_back(receiverInfo);
176 
177     // There is only one alarm for all pulled events. So only set it to the smallest denom.
178     if (nextPullTimeNs < mNextPullTimeNs) {
179         VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
180         mNextPullTimeNs = nextPullTimeNs;
181         updateAlarmLocked();
182     }
183     VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
184 }
185 
UnRegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver)186 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
187                                             wp<PullDataReceiver> receiver) {
188     std::lock_guard<std::mutex> _l(mLock);
189     auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
190     if (receiversIt == mReceivers.end()) {
191         VLOG("Unknown pull code or no receivers: %d", tagId);
192         return;
193     }
194     std::list<ReceiverInfo>& receivers = receiversIt->second;
195     for (auto it = receivers.begin(); it != receivers.end(); it++) {
196         if (receiver == it->receiver) {
197             receivers.erase(it);
198             VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
199             return;
200         }
201     }
202 }
203 
RegisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)204 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
205                                                  wp<PullUidProvider> provider) {
206     std::lock_guard<std::mutex> _l(mLock);
207     mPullUidProviders[configKey] = provider;
208 }
209 
UnregisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)210 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
211                                                    wp<PullUidProvider> provider) {
212     std::lock_guard<std::mutex> _l(mLock);
213     const auto& it = mPullUidProviders.find(configKey);
214     if (it != mPullUidProviders.end() && it->second == provider) {
215         mPullUidProviders.erase(it);
216     }
217 }
218 
OnAlarmFired(int64_t elapsedTimeNs)219 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
220     std::lock_guard<std::mutex> _l(mLock);
221     int64_t wallClockNs = getWallClockNs();
222 
223     int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
224 
225     vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
226     for (auto& pair : mReceivers) {
227         vector<ReceiverInfo*> receivers;
228         if (pair.second.size() != 0) {
229             for (ReceiverInfo& receiverInfo : pair.second) {
230                 // If pullNecessary and enough time has passed for the next bucket, then add
231                 // receiver to the list that will pull on this alarm.
232                 // If pullNecessary is false, check if next pull time needs to be updated.
233                 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
234                 const bool pullNecessary = receiverPtr != nullptr && receiverPtr->isPullNeeded();
235                 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && pullNecessary) {
236                     receivers.push_back(&receiverInfo);
237                 } else {
238                     if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
239                         receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
240                         int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
241                                               receiverInfo.intervalNs;
242                         receiverInfo.nextPullTimeNs +=
243                                 (numBucketsAhead + 1) * receiverInfo.intervalNs;
244                     }
245                     minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
246                 }
247             }
248             if (receivers.size() > 0) {
249                 needToPull.push_back(make_pair(&pair.first, receivers));
250             }
251         }
252     }
253     for (const auto& pullInfo : needToPull) {
254         vector<shared_ptr<LogEvent>> data;
255         PullResult pullResult =
256                 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
257                         ? PullResult::PULL_RESULT_SUCCESS
258                         : PullResult::PULL_RESULT_FAIL;
259         if (pullResult == PullResult::PULL_RESULT_FAIL) {
260             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
261         }
262 
263         // Convention is to mark pull atom timestamp at request time.
264         // If we pull at t0, puller starts at t1, finishes at t2, and send back
265         // at t3, we mark t0 as its timestamp, which should correspond to its
266         // triggering event, such as condition change at t0.
267         // Here the triggering event is alarm fired from AlarmManager.
268         // In ValueMetricProducer and GaugeMetricProducer we do same thing
269         // when pull on condition change, etc.
270         for (auto& event : data) {
271             event->setElapsedTimestampNs(elapsedTimeNs);
272             event->setLogdWallClockTimestampNs(wallClockNs);
273         }
274 
275         for (const auto& receiverInfo : pullInfo.second) {
276             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
277             if (receiverPtr != nullptr) {
278                 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
279                 // We may have just come out of a coma, compute next pull time.
280                 int numBucketsAhead =
281                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
282                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
283                 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
284             } else {
285                 VLOG("receiver already gone.");
286             }
287         }
288     }
289 
290     VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
291          (long long)minNextPullTimeNs);
292     mNextPullTimeNs = minNextPullTimeNs;
293     updateAlarmLocked();
294 }
295 
ForceClearPullerCache()296 int StatsPullerManager::ForceClearPullerCache() {
297     std::lock_guard<std::mutex> _l(mLock);
298     int totalCleared = 0;
299     for (const auto& pulledAtom : kAllPullAtomInfo) {
300         totalCleared += pulledAtom.second->ForceClearCache();
301     }
302     return totalCleared;
303 }
304 
ClearPullerCacheIfNecessary(int64_t timestampNs)305 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
306     std::lock_guard<std::mutex> _l(mLock);
307     int totalCleared = 0;
308     for (const auto& pulledAtom : kAllPullAtomInfo) {
309         totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
310     }
311     return totalCleared;
312 }
313 
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)314 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
315                                                   const int64_t coolDownNs, const int64_t timeoutNs,
316                                                   const vector<int32_t>& additiveFields,
317                                                   const shared_ptr<IPullAtomCallback>& callback) {
318     std::lock_guard<std::mutex> _l(mLock);
319     VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
320 
321     if (callback == nullptr) {
322         ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
323         return;
324     }
325 
326     int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
327     int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
328 
329     sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
330                                                              actualTimeoutNs, additiveFields);
331     PullerKey key = {.atomTag = atomTag, .uid = uid};
332     auto it = kAllPullAtomInfo.find(key);
333     if (it != kAllPullAtomInfo.end()) {
334         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
335                                                                          /*registered=*/false);
336     }
337     kAllPullAtomInfo[key] = puller;
338     StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
339 }
340 
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)341 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
342     std::lock_guard<std::mutex> _l(mLock);
343     PullerKey key = {.atomTag = atomTag, .uid = uid};
344     if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
345         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
346                                                                          /*registered=*/false);
347         kAllPullAtomInfo.erase(key);
348     }
349 }
350 
351 }  // namespace statsd
352 }  // namespace os
353 }  // namespace android
354