/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false
#include "Log.h"

#include "StatsPullerManager.h"

#include <com_android_os_statsd_flags.h>
#include <cutils/log.h>
#include <math.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <iostream>

#include "../StatsService.h"
#include "../logd/LogEvent.h"
#include "../stats_log_util.h"
#include "../statscompanion_util.h"
#include "StatsCallbackPuller.h"
#include "TrainInfoPuller.h"
#include "statslog_statsd.h"

using std::shared_ptr;
using std::vector;

namespace flags = com::android::os::statsd::flags;

namespace android {
namespace os {
namespace statsd {

// Values smaller than this may require updating the alarm.
const int64_t NO_ALARM_UPDATE = INT64_MAX;
// Use 3 threads to avoid overwhelming system server binder threads.
const int32_t PULLER_THREAD_COUNT = 3;

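// Pulls the atom identified by |key| through |puller| at |eventTimeNs| and notes a pull
// failure in StatsdStats when the pull does not succeed.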
static PullErrorCode pullImpl(const PullerKey& key, const sp<StatsPuller>& puller,
                              const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
    VLOG("Initiating pulling %d", key.atomTag);
    PullErrorCode status = puller->Pull(eventTimeNs, data);
    VLOG("pulled %zu items", data->size());
    if (status != PULL_SUCCESS) {
        StatsdStats::getInstance().notePullFailed(key.atomTag);
    }
    return status;
}

StatsPullerManager::StatsPullerManager()
    : mAllPullAtomInfo({
              // TrainInfo.
              {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()},
      }),
      mNextPullTimeNs(NO_ALARM_UPDATE) {
}

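// Pulls |tagId| on behalf of |configKey|, resolving the candidate uids through the
// PullUidProvider registered for that config.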
bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
                              vector<shared_ptr<LogEvent>>* data) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    return PullLocked(tagId, configKey, eventTimeNs, data);
}

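// Pulls |tagId| directly for the given candidate |uids|, bypassing the PullUidProvider lookup.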
bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
                              vector<std::shared_ptr<LogEvent>>* data) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    return PullLocked(tagId, uids, eventTimeNs, data);
}

bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
                                    const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
    vector<int32_t> uids;
    if (!getPullerUidsLocked(tagId, configKey, uids)) {
        return false;
    }
    return PullLocked(tagId, uids, eventTimeNs, data);
}

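// Tries the candidate uids in order and pulls from the first one that has a registered puller.
// A puller that reports PULL_DEAD_OBJECT is unregistered, since its client process has died.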
bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
                                    const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
    VLOG("Initiating pulling %d", tagId);
    for (int32_t uid : uids) {
        PullerKey key = {.uid = uid, .atomTag = tagId};
        auto pullerIt = mAllPullAtomInfo.find(key);
        if (pullerIt != mAllPullAtomInfo.end()) {
            PullErrorCode status = PULL_SUCCESS;
            if (flags::parallel_pulls()) {
                status = pullImpl(key, pullerIt->second, eventTimeNs, data);
            } else {
                status = pullerIt->second->Pull(eventTimeNs, data);
                VLOG("pulled %zu items", data->size());
                if (status != PULL_SUCCESS) {
                    StatsdStats::getInstance().notePullFailed(tagId);
                }
            }
            // If we received a dead object exception, it means the client process has died.
            // We can remove the puller from the map.
            if (status == PULL_DEAD_OBJECT) {
                StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
                        tagId,
                        /*registered=*/false);
                mAllPullAtomInfo.erase(pullerIt);
            }
            return status == PULL_SUCCESS;
        }
    }
    StatsdStats::getInstance().notePullerNotFound(tagId);
    ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
    return false;  // Return early since we don't know what to pull.
}

bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
    // Pulled atoms might be registered after we parse the config, so just make sure the id is in
    // an appropriate range.
    return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
}

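// Schedules the next pulling alarm through StatsCompanionService, converting mNextPullTimeNs
// to milliseconds. Does nothing when no alarm is needed or the service is not yet available.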
void StatsPullerManager::updateAlarmLocked() {
    if (mNextPullTimeNs == NO_ALARM_UPDATE) {
        VLOG("No need to set alarms. Skipping");
        return;
    }

    // TODO(b/151045771): do not hold a lock while making a binder call
    if (mStatsCompanionService != nullptr) {
        mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
    } else {
        VLOG("StatsCompanionService not available. Alarm not set.");
    }
    return;
}

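// Stores the (possibly null) companion service handle, propagates it to every registered puller,
// and re-arms the pulling alarm once the service becomes available.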
void StatsPullerManager::SetStatsCompanionService(
        const shared_ptr<IStatsCompanionService>& statsCompanionService) {
    std::lock_guard<std::mutex> _l(mLock);
    shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
    mStatsCompanionService = statsCompanionService;
    for (const auto& pulledAtom : mAllPullAtomInfo) {
        pulledAtom.second->SetStatsCompanionService(statsCompanionService);
    }
    if (mStatsCompanionService != nullptr) {
        updateAlarmLocked();
    }
}

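// Registers |receiver| for periodic pulls of |tagId| under |configKey|. The pull interval is
// rounded down to a whole minute (with a one-minute floor), and the shared pulling alarm is
// moved earlier if this receiver needs the next pull sooner than currently scheduled.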
void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
                                          const wp<PullDataReceiver>& receiver,
                                          int64_t nextPullTimeNs, int64_t intervalNs) {
    std::lock_guard<std::mutex> _l(mLock);
    auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
    for (auto it = receivers.begin(); it != receivers.end(); it++) {
        if (it->receiver == receiver) {
            VLOG("Receiver already registered of %d", (int)receivers.size());
            return;
        }
    }
    ReceiverInfo receiverInfo;
    receiverInfo.receiver = receiver;

    // Round the interval down to a whole minute. This is the limit of alarm manager.
    // In practice, we should always have larger buckets.
    int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
    // Scheduled pulls should be at least 1 min apart.
    // The requested interval can be lower in CTS tests, in which case we round it up to 1 min.
    if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
        roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
    }

    receiverInfo.intervalNs = roundedIntervalNs;
    receiverInfo.nextPullTimeNs = nextPullTimeNs;
    receivers.push_back(receiverInfo);

    // There is only one alarm for all pulled events, so keep it set to the earliest next pull.
    if (nextPullTimeNs < mNextPullTimeNs) {
        VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
        mNextPullTimeNs = nextPullTimeNs;
        updateAlarmLocked();
    }
    VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
}

void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
                                            const wp<PullDataReceiver>& receiver) {
    std::lock_guard<std::mutex> _l(mLock);
    auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
    if (receiversIt == mReceivers.end()) {
        VLOG("Unknown pull code or no receivers: %d", tagId);
        return;
    }
    std::list<ReceiverInfo>& receivers = receiversIt->second;
    for (auto it = receivers.begin(); it != receivers.end(); it++) {
        if (receiver == it->receiver) {
            receivers.erase(it);
            VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
            return;
        }
    }
}

void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
                                                 const wp<PullUidProvider>& provider) {
    std::lock_guard<std::mutex> _l(mLock);
    mPullUidProviders[configKey] = provider;
}

void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
                                                   const wp<PullUidProvider>& provider) {
    std::lock_guard<std::mutex> _l(mLock);
    const auto& it = mPullUidProviders.find(configKey);
    if (it != mPullUidProviders.end() && it->second == provider) {
        mPullUidProviders.erase(it);
    }
}

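// Worker-thread entry point for the parallel pull path: drains |pullerQueue|, performs each
// pull, stamps the pulled events with the request timestamps, and hands the results to the
// main thread through |pulledData|. Notifies |mainThreadCondition| after every result and
// again when the thread runs out of work and exits.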
static void processPullerQueue(ThreadSafeQueue<StatsPullerManager::PullerParams>& pullerQueue,
                               std::queue<StatsPullerManager::PulledInfo>& pulledData,
                               const int64_t wallClockNs, const int64_t elapsedTimeNs,
                               std::atomic_int& pendingThreads,
                               std::condition_variable& mainThreadCondition,
                               std::mutex& mainThreadConditionLock) {
    std::optional<StatsPullerManager::PullerParams> queueResult = pullerQueue.pop();
    while (queueResult.has_value()) {
        const StatsPullerManager::PullerParams pullerParams = queueResult.value();
        vector<shared_ptr<LogEvent>> data;
        PullErrorCode pullErrorCode =
                pullImpl(pullerParams.key, pullerParams.puller, elapsedTimeNs, &data);

        if (pullErrorCode != PULL_SUCCESS) {
            VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
        }

        // The convention is to stamp pulled atoms with the request time.
        // If we request a pull at t0, the puller starts at t1, finishes at t2, and sends the
        // data back at t3, we mark t0 as the timestamp, since it corresponds to the triggering
        // event, such as a condition change at t0.
        // Here the triggering event is the alarm fired by AlarmManager.
        // ValueMetricProducer and GaugeMetricProducer do the same thing when pulling on a
        // condition change, etc.
        for (auto& event : data) {
            event->setElapsedTimestampNs(elapsedTimeNs);
            event->setLogdWallClockTimestampNs(wallClockNs);
        }

        StatsPullerManager::PulledInfo pulledInfo;
        pulledInfo.pullErrorCode = pullErrorCode;
        pulledInfo.pullerKey = pullerParams.key;
        pulledInfo.receiverInfo = std::move(pullerParams.receivers);
        pulledInfo.data = std::move(data);
        mainThreadConditionLock.lock();
        pulledData.push(pulledInfo);
        mainThreadConditionLock.unlock();
        mainThreadCondition.notify_one();

        queueResult = pullerQueue.pop();
    }
    pendingThreads--;
    mainThreadCondition.notify_one();
}

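// Called when the shared pulling alarm fires. With the parallel_pulls flag enabled, the due
// pulls are spread across PULLER_THREAD_COUNT worker threads and the results are consumed on
// this thread as they arrive; otherwise all pulls run synchronously on this thread. In either
// case the alarm is then re-armed for the earliest upcoming pull.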
void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    int64_t wallClockNs = getWallClockNs();

    int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
    if (flags::parallel_pulls()) {
        ThreadSafeQueue<PullerParams> pullerQueue;
        std::queue<PulledInfo> pulledData;
        initPullerQueue(pullerQueue, pulledData, elapsedTimeNs, minNextPullTimeNs);
        std::mutex mainThreadConditionLock;
        std::condition_variable waitForPullerThreadsCondition;
        vector<thread> pullerThreads;
        std::atomic_int pendingThreads = PULLER_THREAD_COUNT;
        pullerThreads.reserve(PULLER_THREAD_COUNT);
        // Spawn multiple threads to simultaneously pull all necessary pullers. These pullers push
        // the pulled data to a queue for the main thread to process.
        for (int i = 0; i < PULLER_THREAD_COUNT; ++i) {
            pullerThreads.emplace_back(
                    processPullerQueue, std::ref(pullerQueue), std::ref(pulledData), wallClockNs,
                    elapsedTimeNs, std::ref(pendingThreads),
                    std::ref(waitForPullerThreadsCondition), std::ref(mainThreadConditionLock));
        }

        // Process all pull results on the main thread without waiting for the puller threads
        // to finish.
        while (true) {
            std::unique_lock<std::mutex> lock(mainThreadConditionLock);
            waitForPullerThreadsCondition.wait(lock, [&pulledData, &pendingThreads]() -> bool {
                return pendingThreads == 0 || !pulledData.empty();
            });
            if (!pulledData.empty()) {
                const PulledInfo pullResultInfo = std::move(pulledData.front());
                pulledData.pop();
                const PullErrorCode pullErrorCode = pullResultInfo.pullErrorCode;
                const vector<ReceiverInfo*>& receiverInfos = pullResultInfo.receiverInfo;
                const vector<shared_ptr<LogEvent>>& data = pullResultInfo.data;
                for (const auto& receiverInfo : receiverInfos) {
                    sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
                    if (receiverPtr != nullptr) {
                        PullResult pullResult = pullErrorCode == PULL_SUCCESS
                                                        ? PullResult::PULL_RESULT_SUCCESS
                                                        : PullResult::PULL_RESULT_FAIL;
                        receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
                        // We may have just come out of a coma, compute next pull time.
                        int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) /
                                              receiverInfo->intervalNs;
                        receiverInfo->nextPullTimeNs +=
                                (numBucketsAhead + 1) * receiverInfo->intervalNs;
                        minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
                    } else {
                        VLOG("receiver already gone.");
                    }
                }
                if (pullErrorCode == PULL_DEAD_OBJECT) {
                    mAllPullAtomInfo.erase(pullResultInfo.pullerKey);
                }
                // "else if" is used here so that, when all worker threads have finished, any
                // results still remaining in the queue are processed before exiting the loop.
            } else if (pendingThreads == 0) {
                break;
            }
        }

        for (thread& pullerThread : pullerThreads) {
            pullerThread.join();
        }

    } else {
        onAlarmFiredSynchronous(elapsedTimeNs, wallClockNs, minNextPullTimeNs);
    }
    VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
         (long long)minNextPullTimeNs);
    mNextPullTimeNs = minNextPullTimeNs;
    updateAlarmLocked();
}

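// Legacy single-threaded alarm path: gathers the receivers whose next pull is due, performs one
// pull per (atom, config) pair, fans the data out to those receivers, and tracks the earliest
// next pull time in |minNextPullTimeNs|.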
void StatsPullerManager::onAlarmFiredSynchronous(const int64_t elapsedTimeNs,
                                                 const int64_t wallClockNs,
                                                 int64_t& minNextPullTimeNs) {
    vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
    for (auto& pair : mReceivers) {
        vector<ReceiverInfo*> receivers;
        if (pair.second.size() != 0) {
            for (ReceiverInfo& receiverInfo : pair.second) {
                // If pullNecessary and enough time has passed for the next bucket, then add
                // receiver to the list that will pull on this alarm.
                // If pullNecessary is false, check if next pull time needs to be updated.
                sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
                if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
                    receiverPtr->isPullNeeded()) {
                    receivers.push_back(&receiverInfo);
                } else {
                    if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
                        receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
                        int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
                                              receiverInfo.intervalNs;
                        receiverInfo.nextPullTimeNs +=
                                (numBucketsAhead + 1) * receiverInfo.intervalNs;
                    }
                    minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
                }
            }
            if (receivers.size() > 0) {
                needToPull.push_back(make_pair(&pair.first, receivers));
            }
        }
    }
    for (const auto& pullInfo : needToPull) {
        vector<shared_ptr<LogEvent>> data;
        PullResult pullResult =
                PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
                        ? PullResult::PULL_RESULT_SUCCESS
                        : PullResult::PULL_RESULT_FAIL;
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
        }

        // The convention is to stamp pulled atoms with the request time.
        // If we request a pull at t0, the puller starts at t1, finishes at t2, and sends the
        // data back at t3, we mark t0 as the timestamp, since it corresponds to the triggering
        // event, such as a condition change at t0.
        // Here the triggering event is the alarm fired by AlarmManager.
        // ValueMetricProducer and GaugeMetricProducer do the same thing when pulling on a
        // condition change, etc.
        for (auto& event : data) {
            event->setElapsedTimestampNs(elapsedTimeNs);
            event->setLogdWallClockTimestampNs(wallClockNs);
        }

        for (const auto& receiverInfo : pullInfo.second) {
            sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
            if (receiverPtr != nullptr) {
                receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
                // We may have just come out of a coma, compute next pull time.
                int numBucketsAhead =
                        (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
                receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
                minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
            } else {
                VLOG("receiver already gone.");
            }
        }
    }
}

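// Looks up the PullUidProvider registered for |configKey| and fills |uids| with the uids allowed
// to satisfy a pull of |tagId|. Returns false, and notes the error in StatsdStats, when no
// provider is registered or the provider has already been destroyed.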
bool StatsPullerManager::getPullerUidsLocked(const int tagId, const ConfigKey& configKey,
                                             vector<int32_t>& uids) {
    const auto& uidProviderIt = mPullUidProviders.find(configKey);
    if (uidProviderIt == mPullUidProviders.end()) {
        ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
              configKey.ToString().c_str());
        StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
        return false;
    }
    sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
    if (pullUidProvider == nullptr) {
        ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
              configKey.ToString().c_str());
        StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
        return false;
    }
    uids = pullUidProvider->getPullAtomUids(tagId);
    return true;
}

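// Builds the work queue for the parallel pull path. For every receiver whose pull is due, the
// matching puller (first registered uid wins) is enqueued together with its receivers. Receivers
// whose pull is not needed are notified with PULL_NOT_NEEDED right away, and receivers without a
// matching puller are queued as failed results so the main thread still notifies them.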
void StatsPullerManager::initPullerQueue(ThreadSafeQueue<PullerParams>& pullerQueue,
                                         std::queue<PulledInfo>& pulledData,
                                         const int64_t elapsedTimeNs, int64_t& minNextPullTimeNs) {
    for (auto& pair : mReceivers) {
        vector<ReceiverInfo*> receivers;
        if (pair.second.size() != 0) {
            for (ReceiverInfo& receiverInfo : pair.second) {
                // If pullNecessary and enough time has passed for the next bucket, then add
                // receiver to the list that will pull on this alarm.
                // If pullNecessary is false, check if next pull time needs to be updated.
                sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
                if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
                    receiverPtr->isPullNeeded()) {
                    receivers.push_back(&receiverInfo);
                } else {
                    if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
                        receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
                        int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
                                              receiverInfo.intervalNs;
                        receiverInfo.nextPullTimeNs +=
                                (numBucketsAhead + 1) * receiverInfo.intervalNs;
                    }
                    minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
                }
            }
            if (receivers.size() > 0) {
                bool foundPuller = false;
                int tagId = pair.first.atomTag;
                vector<int32_t> uids;
                if (getPullerUidsLocked(tagId, pair.first.configKey, uids)) {
                    for (int32_t uid : uids) {
                        PullerKey key = {.uid = uid, .atomTag = tagId};
                        auto pullerIt = mAllPullAtomInfo.find(key);
                        if (pullerIt != mAllPullAtomInfo.end()) {
                            PullerParams params;
                            params.key = key;
                            params.puller = pullerIt->second;
                            params.receivers = std::move(receivers);
                            pullerQueue.push(params);
                            foundPuller = true;
                            break;
                        }
                    }
                    if (!foundPuller) {
                        StatsdStats::getInstance().notePullerNotFound(tagId);
                        ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
                    }
                }
                if (!foundPuller) {
                    PulledInfo pulledInfo;
                    pulledInfo.pullErrorCode = PullErrorCode::PULL_FAIL;
                    pulledInfo.receiverInfo = std::move(receivers);
                    pulledData.push(pulledInfo);
                }
            }
        }
    }
}

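// Unconditionally clears every registered puller's cached data and returns the total count
// reported by the pullers.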
int StatsPullerManager::ForceClearPullerCache() {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    int totalCleared = 0;
    for (const auto& pulledAtom : mAllPullAtomInfo) {
        totalCleared += pulledAtom.second->ForceClearCache();
    }
    return totalCleared;
}

int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    int totalCleared = 0;
    for (const auto& pulledAtom : mAllPullAtomInfo) {
        totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
    }
    return totalCleared;
}

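// Registers (or replaces) the callback-based puller for (uid, atomTag). The cool-down is clamped
// to at least kMinCoolDownNs and the timeout to at most kMaxTimeoutNs before the puller is
// created.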
void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
                                                  const int64_t coolDownNs, const int64_t timeoutNs,
                                                  const vector<int32_t>& additiveFields,
                                                  const shared_ptr<IPullAtomCallback>& callback) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);

    if (callback == nullptr) {
        ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
        return;
    }

    int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
    int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;

    sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
                                                             actualTimeoutNs, additiveFields);
    PullerKey key = {.uid = uid, .atomTag = atomTag};
    auto it = mAllPullAtomInfo.find(key);
    if (it != mAllPullAtomInfo.end()) {
        StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
                                                                         /*registered=*/false);
    }
    mAllPullAtomInfo[key] = puller;
    StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
}

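// Removes the callback puller registered for (uid, atomTag), if any, and records the
// unregistration in StatsdStats.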
void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
    ATRACE_CALL();
    std::lock_guard<std::mutex> _l(mLock);
    PullerKey key = {.uid = uid, .atomTag = atomTag};
    if (mAllPullAtomInfo.find(key) != mAllPullAtomInfo.end()) {
        StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
                                                                         /*registered=*/false);
        mAllPullAtomInfo.erase(key);
    }
}

}  // namespace statsd
}  // namespace os
}  // namespace android