1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false
18 #include "Log.h"
19
20 #include "StatsPullerManager.h"
21
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25
26 #include <algorithm>
27 #include <iostream>
28
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36
37 using std::shared_ptr;
38 using std::vector;
39
40 namespace android {
41 namespace os {
42 namespace statsd {
43
44 // Values smaller than this may require to update the alarm.
45 const int64_t NO_ALARM_UPDATE = INT64_MAX;
46
StatsPullerManager()47 StatsPullerManager::StatsPullerManager()
48 : kAllPullAtomInfo({
49 // TrainInfo.
50 {{.atomTag = util::TRAIN_INFO, .uid = AID_STATSD}, new TrainInfoPuller()},
51 }),
52 mNextPullTimeNs(NO_ALARM_UPDATE) {
53 }
54
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)55 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
56 vector<shared_ptr<LogEvent>>* data) {
57 std::lock_guard<std::mutex> _l(mLock);
58 return PullLocked(tagId, configKey, eventTimeNs, data);
59 }
60
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)61 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
62 vector<std::shared_ptr<LogEvent>>* data) {
63 std::lock_guard<std::mutex> _l(mLock);
64 return PullLocked(tagId, uids, eventTimeNs, data);
65 }
66
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)67 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
68 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
69 vector<int32_t> uids;
70 const auto& uidProviderIt = mPullUidProviders.find(configKey);
71 if (uidProviderIt == mPullUidProviders.end()) {
72 ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
73 configKey.ToString().c_str());
74 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
75 return false;
76 }
77 sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
78 if (pullUidProvider == nullptr) {
79 ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
80 configKey.ToString().c_str());
81 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
82 return false;
83 }
84 uids = pullUidProvider->getPullAtomUids(tagId);
85 return PullLocked(tagId, uids, eventTimeNs, data);
86 }
87
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)88 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
89 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
90 VLOG("Initiating pulling %d", tagId);
91 for (int32_t uid : uids) {
92 PullerKey key = {.atomTag = tagId, .uid = uid};
93 auto pullerIt = kAllPullAtomInfo.find(key);
94 if (pullerIt != kAllPullAtomInfo.end()) {
95 PullErrorCode status = pullerIt->second->Pull(eventTimeNs, data);
96 VLOG("pulled %zu items", data->size());
97 if (status != PULL_SUCCESS) {
98 StatsdStats::getInstance().notePullFailed(tagId);
99 }
100 // If we received a dead object exception, it means the client process has died.
101 // We can remove the puller from the map.
102 if (status == PULL_DEAD_OBJECT) {
103 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
104 tagId,
105 /*registered=*/false);
106 kAllPullAtomInfo.erase(pullerIt);
107 }
108 return status == PULL_SUCCESS;
109 }
110 }
111 StatsdStats::getInstance().notePullerNotFound(tagId);
112 ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
113 return false; // Return early since we don't know what to pull.
114 }
115
PullerForMatcherExists(int tagId) const116 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
117 // Pulled atoms might be registered after we parse the config, so just make sure the id is in
118 // an appropriate range.
119 return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
120 }
121
updateAlarmLocked()122 void StatsPullerManager::updateAlarmLocked() {
123 if (mNextPullTimeNs == NO_ALARM_UPDATE) {
124 VLOG("No need to set alarms. Skipping");
125 return;
126 }
127
128 // TODO(b/151045771): do not hold a lock while making a binder call
129 if (mStatsCompanionService != nullptr) {
130 mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
131 } else {
132 VLOG("StatsCompanionService not available. Alarm not set.");
133 }
134 return;
135 }
136
SetStatsCompanionService(shared_ptr<IStatsCompanionService> statsCompanionService)137 void StatsPullerManager::SetStatsCompanionService(
138 shared_ptr<IStatsCompanionService> statsCompanionService) {
139 std::lock_guard<std::mutex> _l(mLock);
140 shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
141 mStatsCompanionService = statsCompanionService;
142 for (const auto& pulledAtom : kAllPullAtomInfo) {
143 pulledAtom.second->SetStatsCompanionService(statsCompanionService);
144 }
145 if (mStatsCompanionService != nullptr) {
146 updateAlarmLocked();
147 }
148 }
149
RegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver,int64_t nextPullTimeNs,int64_t intervalNs)150 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
151 wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
152 int64_t intervalNs) {
153 std::lock_guard<std::mutex> _l(mLock);
154 auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
155 for (auto it = receivers.begin(); it != receivers.end(); it++) {
156 if (it->receiver == receiver) {
157 VLOG("Receiver already registered of %d", (int)receivers.size());
158 return;
159 }
160 }
161 ReceiverInfo receiverInfo;
162 receiverInfo.receiver = receiver;
163
164 // Round it to the nearest minutes. This is the limit of alarm manager.
165 // In practice, we should always have larger buckets.
166 int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
167 // Scheduled pulling should be at least 1 min apart.
168 // This can be lower in cts tests, in which case we round it to 1 min.
169 if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
170 roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
171 }
172
173 receiverInfo.intervalNs = roundedIntervalNs;
174 receiverInfo.nextPullTimeNs = nextPullTimeNs;
175 receivers.push_back(receiverInfo);
176
177 // There is only one alarm for all pulled events. So only set it to the smallest denom.
178 if (nextPullTimeNs < mNextPullTimeNs) {
179 VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
180 mNextPullTimeNs = nextPullTimeNs;
181 updateAlarmLocked();
182 }
183 VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
184 }
185
UnRegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver)186 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
187 wp<PullDataReceiver> receiver) {
188 std::lock_guard<std::mutex> _l(mLock);
189 auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
190 if (receiversIt == mReceivers.end()) {
191 VLOG("Unknown pull code or no receivers: %d", tagId);
192 return;
193 }
194 std::list<ReceiverInfo>& receivers = receiversIt->second;
195 for (auto it = receivers.begin(); it != receivers.end(); it++) {
196 if (receiver == it->receiver) {
197 receivers.erase(it);
198 VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
199 return;
200 }
201 }
202 }
203
RegisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)204 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
205 wp<PullUidProvider> provider) {
206 std::lock_guard<std::mutex> _l(mLock);
207 mPullUidProviders[configKey] = provider;
208 }
209
UnregisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)210 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
211 wp<PullUidProvider> provider) {
212 std::lock_guard<std::mutex> _l(mLock);
213 const auto& it = mPullUidProviders.find(configKey);
214 if (it != mPullUidProviders.end() && it->second == provider) {
215 mPullUidProviders.erase(it);
216 }
217 }
218
OnAlarmFired(int64_t elapsedTimeNs)219 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
220 std::lock_guard<std::mutex> _l(mLock);
221 int64_t wallClockNs = getWallClockNs();
222
223 int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
224
225 vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
226 for (auto& pair : mReceivers) {
227 vector<ReceiverInfo*> receivers;
228 if (pair.second.size() != 0) {
229 for (ReceiverInfo& receiverInfo : pair.second) {
230 // If pullNecessary and enough time has passed for the next bucket, then add
231 // receiver to the list that will pull on this alarm.
232 // If pullNecessary is false, check if next pull time needs to be updated.
233 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
234 const bool pullNecessary = receiverPtr != nullptr && receiverPtr->isPullNeeded();
235 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && pullNecessary) {
236 receivers.push_back(&receiverInfo);
237 } else {
238 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
239 receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
240 int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
241 receiverInfo.intervalNs;
242 receiverInfo.nextPullTimeNs +=
243 (numBucketsAhead + 1) * receiverInfo.intervalNs;
244 }
245 minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
246 }
247 }
248 if (receivers.size() > 0) {
249 needToPull.push_back(make_pair(&pair.first, receivers));
250 }
251 }
252 }
253 for (const auto& pullInfo : needToPull) {
254 vector<shared_ptr<LogEvent>> data;
255 PullResult pullResult =
256 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
257 ? PullResult::PULL_RESULT_SUCCESS
258 : PullResult::PULL_RESULT_FAIL;
259 if (pullResult == PullResult::PULL_RESULT_FAIL) {
260 VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
261 }
262
263 // Convention is to mark pull atom timestamp at request time.
264 // If we pull at t0, puller starts at t1, finishes at t2, and send back
265 // at t3, we mark t0 as its timestamp, which should correspond to its
266 // triggering event, such as condition change at t0.
267 // Here the triggering event is alarm fired from AlarmManager.
268 // In ValueMetricProducer and GaugeMetricProducer we do same thing
269 // when pull on condition change, etc.
270 for (auto& event : data) {
271 event->setElapsedTimestampNs(elapsedTimeNs);
272 event->setLogdWallClockTimestampNs(wallClockNs);
273 }
274
275 for (const auto& receiverInfo : pullInfo.second) {
276 sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
277 if (receiverPtr != nullptr) {
278 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
279 // We may have just come out of a coma, compute next pull time.
280 int numBucketsAhead =
281 (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
282 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
283 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
284 } else {
285 VLOG("receiver already gone.");
286 }
287 }
288 }
289
290 VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
291 (long long)minNextPullTimeNs);
292 mNextPullTimeNs = minNextPullTimeNs;
293 updateAlarmLocked();
294 }
295
ForceClearPullerCache()296 int StatsPullerManager::ForceClearPullerCache() {
297 std::lock_guard<std::mutex> _l(mLock);
298 int totalCleared = 0;
299 for (const auto& pulledAtom : kAllPullAtomInfo) {
300 totalCleared += pulledAtom.second->ForceClearCache();
301 }
302 return totalCleared;
303 }
304
ClearPullerCacheIfNecessary(int64_t timestampNs)305 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
306 std::lock_guard<std::mutex> _l(mLock);
307 int totalCleared = 0;
308 for (const auto& pulledAtom : kAllPullAtomInfo) {
309 totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
310 }
311 return totalCleared;
312 }
313
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)314 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
315 const int64_t coolDownNs, const int64_t timeoutNs,
316 const vector<int32_t>& additiveFields,
317 const shared_ptr<IPullAtomCallback>& callback) {
318 std::lock_guard<std::mutex> _l(mLock);
319 VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
320
321 if (callback == nullptr) {
322 ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
323 return;
324 }
325
326 int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
327 int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
328
329 sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
330 actualTimeoutNs, additiveFields);
331 PullerKey key = {.atomTag = atomTag, .uid = uid};
332 auto it = kAllPullAtomInfo.find(key);
333 if (it != kAllPullAtomInfo.end()) {
334 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
335 /*registered=*/false);
336 }
337 kAllPullAtomInfo[key] = puller;
338 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
339 }
340
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)341 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
342 std::lock_guard<std::mutex> _l(mLock);
343 PullerKey key = {.atomTag = atomTag, .uid = uid};
344 if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
345 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
346 /*registered=*/false);
347 kAllPullAtomInfo.erase(key);
348 }
349 }
350
351 } // namespace statsd
352 } // namespace os
353 } // namespace android
354