• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false
18 #include "Log.h"
19 
20 #include "StatsPullerManager.h"
21 
22 #include <com_android_os_statsd_flags.h>
23 #include <cutils/log.h>
24 #include <math.h>
25 #include <stdint.h>
26 
27 #include <algorithm>
28 #include <atomic>
29 #include <iostream>
30 
31 #include "../StatsService.h"
32 #include "../logd/LogEvent.h"
33 #include "../stats_log_util.h"
34 #include "../statscompanion_util.h"
35 #include "StatsCallbackPuller.h"
36 #include "TrainInfoPuller.h"
37 #include "statslog_statsd.h"
38 
39 using std::shared_ptr;
40 using std::vector;
41 
42 namespace flags = com::android::os::statsd::flags;
43 
44 namespace android {
45 namespace os {
46 namespace statsd {
47 
48 // Values smaller than this may require to update the alarm.
49 const int64_t NO_ALARM_UPDATE = INT64_MAX;
50 // Use 3 threads to avoid overwhelming system server binder threads
51 const int32_t PULLER_THREAD_COUNT = 3;
52 
pullImpl(const PullerKey & key,const sp<StatsPuller> & puller,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)53 static PullErrorCode pullImpl(const PullerKey& key, const sp<StatsPuller>& puller,
54                               const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
55     VLOG("Initiating pulling %d", key.atomTag);
56     PullErrorCode status = puller->Pull(eventTimeNs, data);
57     VLOG("pulled %zu items", data->size());
58     if (status != PULL_SUCCESS) {
59         StatsdStats::getInstance().notePullFailed(key.atomTag);
60     }
61     return status;
62 }
63 
StatsPullerManager()64 StatsPullerManager::StatsPullerManager()
65     : mAllPullAtomInfo({
66               // TrainInfo.
67               {{.uid = AID_STATSD, .atomTag = util::TRAIN_INFO}, new TrainInfoPuller()},
68       }),
69       mNextPullTimeNs(NO_ALARM_UPDATE) {
70 }
71 
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)72 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
73                               vector<shared_ptr<LogEvent>>* data) {
74     ATRACE_CALL();
75     std::lock_guard<std::mutex> _l(mLock);
76     return PullLocked(tagId, configKey, eventTimeNs, data);
77 }
78 
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)79 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
80                               vector<std::shared_ptr<LogEvent>>* data) {
81     ATRACE_CALL();
82     std::lock_guard<std::mutex> _l(mLock);
83     return PullLocked(tagId, uids, eventTimeNs, data);
84 }
85 
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)86 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
87                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
88     vector<int32_t> uids;
89     if (!getPullerUidsLocked(tagId, configKey, uids)) {
90         return false;
91     }
92     return PullLocked(tagId, uids, eventTimeNs, data);
93 }
94 
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)95 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
96                                     const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
97     VLOG("Initiating pulling %d", tagId);
98     for (int32_t uid : uids) {
99         PullerKey key = {.uid = uid, .atomTag = tagId};
100         auto pullerIt = mAllPullAtomInfo.find(key);
101         if (pullerIt != mAllPullAtomInfo.end()) {
102             PullErrorCode status = PULL_SUCCESS;
103             if (flags::parallel_pulls()) {
104                 status = pullImpl(key, pullerIt->second, eventTimeNs, data);
105             } else {
106                 status = pullerIt->second->Pull(eventTimeNs, data);
107                 VLOG("pulled %zu items", data->size());
108                 if (status != PULL_SUCCESS) {
109                     StatsdStats::getInstance().notePullFailed(tagId);
110                 }
111             }
112             // If we received a dead object exception, it means the client process has died.
113             // We can remove the puller from the map.
114             if (status == PULL_DEAD_OBJECT) {
115                 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(
116                         tagId,
117                         /*registered=*/false);
118                 mAllPullAtomInfo.erase(pullerIt);
119             }
120             return status == PULL_SUCCESS;
121         }
122     }
123     StatsdStats::getInstance().notePullerNotFound(tagId);
124     ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
125     return false;  // Return early since we don't know what to pull.
126 }
127 
PullerForMatcherExists(int tagId) const128 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
129     // Pulled atoms might be registered after we parse the config, so just make sure the id is in
130     // an appropriate range.
131     return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
132 }
133 
updateAlarmLocked()134 void StatsPullerManager::updateAlarmLocked() {
135     if (mNextPullTimeNs == NO_ALARM_UPDATE) {
136         VLOG("No need to set alarms. Skipping");
137         return;
138     }
139 
140     // TODO(b/151045771): do not hold a lock while making a binder call
141     if (mStatsCompanionService != nullptr) {
142         mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
143     } else {
144         VLOG("StatsCompanionService not available. Alarm not set.");
145     }
146     return;
147 }
148 
SetStatsCompanionService(const shared_ptr<IStatsCompanionService> & statsCompanionService)149 void StatsPullerManager::SetStatsCompanionService(
150         const shared_ptr<IStatsCompanionService>& statsCompanionService) {
151     std::lock_guard<std::mutex> _l(mLock);
152     shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
153     mStatsCompanionService = statsCompanionService;
154     for (const auto& pulledAtom : mAllPullAtomInfo) {
155         pulledAtom.second->SetStatsCompanionService(statsCompanionService);
156     }
157     if (mStatsCompanionService != nullptr) {
158         updateAlarmLocked();
159     }
160 }
161 
RegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver,int64_t nextPullTimeNs,int64_t intervalNs)162 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
163                                           const wp<PullDataReceiver>& receiver,
164                                           int64_t nextPullTimeNs, int64_t intervalNs) {
165     std::lock_guard<std::mutex> _l(mLock);
166     auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
167     for (auto it = receivers.begin(); it != receivers.end(); it++) {
168         if (it->receiver == receiver) {
169             VLOG("Receiver already registered of %d", (int)receivers.size());
170             return;
171         }
172     }
173     ReceiverInfo receiverInfo;
174     receiverInfo.receiver = receiver;
175 
176     // Round it to the nearest minutes. This is the limit of alarm manager.
177     // In practice, we should always have larger buckets.
178     int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
179     // Scheduled pulling should be at least 1 min apart.
180     // This can be lower in cts tests, in which case we round it to 1 min.
181     if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
182         roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
183     }
184 
185     receiverInfo.intervalNs = roundedIntervalNs;
186     receiverInfo.nextPullTimeNs = nextPullTimeNs;
187     receivers.push_back(receiverInfo);
188 
189     // There is only one alarm for all pulled events. So only set it to the smallest denom.
190     if (nextPullTimeNs < mNextPullTimeNs) {
191         VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
192         mNextPullTimeNs = nextPullTimeNs;
193         updateAlarmLocked();
194     }
195     VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
196 }
197 
UnRegisterReceiver(int tagId,const ConfigKey & configKey,const wp<PullDataReceiver> & receiver)198 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
199                                             const wp<PullDataReceiver>& receiver) {
200     std::lock_guard<std::mutex> _l(mLock);
201     auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
202     if (receiversIt == mReceivers.end()) {
203         VLOG("Unknown pull code or no receivers: %d", tagId);
204         return;
205     }
206     std::list<ReceiverInfo>& receivers = receiversIt->second;
207     for (auto it = receivers.begin(); it != receivers.end(); it++) {
208         if (receiver == it->receiver) {
209             receivers.erase(it);
210             VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
211             return;
212         }
213     }
214 }
215 
RegisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)216 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
217                                                  const wp<PullUidProvider>& provider) {
218     std::lock_guard<std::mutex> _l(mLock);
219     mPullUidProviders[configKey] = provider;
220 }
221 
UnregisterPullUidProvider(const ConfigKey & configKey,const wp<PullUidProvider> & provider)222 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
223                                                    const wp<PullUidProvider>& provider) {
224     std::lock_guard<std::mutex> _l(mLock);
225     const auto& it = mPullUidProviders.find(configKey);
226     if (it != mPullUidProviders.end() && it->second == provider) {
227         mPullUidProviders.erase(it);
228     }
229 }
230 
processPullerQueue(ThreadSafeQueue<StatsPullerManager::PullerParams> & pullerQueue,std::queue<StatsPullerManager::PulledInfo> & pulledData,const int64_t wallClockNs,const int64_t elapsedTimeNs,std::atomic_int & pendingThreads,std::condition_variable & mainThreadCondition,std::mutex & mainThreadConditionLock)231 static void processPullerQueue(ThreadSafeQueue<StatsPullerManager::PullerParams>& pullerQueue,
232                                std::queue<StatsPullerManager::PulledInfo>& pulledData,
233                                const int64_t wallClockNs, const int64_t elapsedTimeNs,
234                                std::atomic_int& pendingThreads,
235                                std::condition_variable& mainThreadCondition,
236                                std::mutex& mainThreadConditionLock) {
237     std::optional<StatsPullerManager::PullerParams> queueResult = pullerQueue.pop();
238     while (queueResult.has_value()) {
239         const StatsPullerManager::PullerParams pullerParams = queueResult.value();
240         vector<shared_ptr<LogEvent>> data;
241         PullErrorCode pullErrorCode =
242                 pullImpl(pullerParams.key, pullerParams.puller, elapsedTimeNs, &data);
243 
244         if (pullErrorCode != PULL_SUCCESS) {
245             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
246         }
247 
248         // Convention is to mark pull atom timestamp at request time.
249         // If we pull at t0, puller starts at t1, finishes at t2, and send back
250         // at t3, we mark t0 as its timestamp, which should correspond to its
251         // triggering event, such as condition change at t0.
252         // Here the triggering event is alarm fired from AlarmManager.
253         // In ValueMetricProducer and GaugeMetricProducer we do same thing
254         // when pull on condition change, etc.
255         for (auto& event : data) {
256             event->setElapsedTimestampNs(elapsedTimeNs);
257             event->setLogdWallClockTimestampNs(wallClockNs);
258         }
259 
260         StatsPullerManager::PulledInfo pulledInfo;
261         pulledInfo.pullErrorCode = pullErrorCode;
262         pulledInfo.pullerKey = pullerParams.key;
263         pulledInfo.receiverInfo = std::move(pullerParams.receivers);
264         pulledInfo.data = std::move(data);
265         mainThreadConditionLock.lock();
266         pulledData.push(pulledInfo);
267         mainThreadConditionLock.unlock();
268         mainThreadCondition.notify_one();
269 
270         queueResult = pullerQueue.pop();
271     }
272     pendingThreads--;
273     mainThreadCondition.notify_one();
274 }
275 
OnAlarmFired(int64_t elapsedTimeNs)276 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
277     ATRACE_CALL();
278     std::lock_guard<std::mutex> _l(mLock);
279     int64_t wallClockNs = getWallClockNs();
280 
281     int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
282     if (flags::parallel_pulls()) {
283         ThreadSafeQueue<PullerParams> pullerQueue;
284         std::queue<PulledInfo> pulledData;
285         initPullerQueue(pullerQueue, pulledData, elapsedTimeNs, minNextPullTimeNs);
286         std::mutex mainThreadConditionLock;
287         std::condition_variable waitForPullerThreadsCondition;
288         vector<thread> pullerThreads;
289         std::atomic_int pendingThreads = PULLER_THREAD_COUNT;
290         pullerThreads.reserve(PULLER_THREAD_COUNT);
291         // Spawn multiple threads to simultaneously pull all necessary pullers. These pullers push
292         // the pulled data to a queue for the main thread to process.
293         for (int i = 0; i < PULLER_THREAD_COUNT; ++i) {
294             pullerThreads.emplace_back(
295                     processPullerQueue, std::ref(pullerQueue), std::ref(pulledData), wallClockNs,
296                     elapsedTimeNs, std::ref(pendingThreads),
297                     std::ref(waitForPullerThreadsCondition), std::ref(mainThreadConditionLock));
298         }
299 
300         // Process all pull results on the main thread without waiting for the puller threads
301         // to finish.
302         while (true) {
303             std::unique_lock<std::mutex> lock(mainThreadConditionLock);
304             waitForPullerThreadsCondition.wait(lock, [&pulledData, &pendingThreads]() -> bool {
305                 return pendingThreads == 0 || !pulledData.empty();
306             });
307             if (!pulledData.empty()) {
308                 const PulledInfo pullResultInfo = std::move(pulledData.front());
309                 pulledData.pop();
310                 const PullErrorCode pullErrorCode = pullResultInfo.pullErrorCode;
311                 const vector<ReceiverInfo*>& receiverInfos = pullResultInfo.receiverInfo;
312                 const vector<shared_ptr<LogEvent>>& data = pullResultInfo.data;
313                 for (const auto& receiverInfo : receiverInfos) {
314                     sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
315                     if (receiverPtr != nullptr) {
316                         PullResult pullResult = pullErrorCode == PULL_SUCCESS
317                                                         ? PullResult::PULL_RESULT_SUCCESS
318                                                         : PullResult::PULL_RESULT_FAIL;
319                         receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
320                         // We may have just come out of a coma, compute next pull time.
321                         int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) /
322                                               receiverInfo->intervalNs;
323                         receiverInfo->nextPullTimeNs +=
324                                 (numBucketsAhead + 1) * receiverInfo->intervalNs;
325                         minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
326                     } else {
327                         VLOG("receiver already gone.");
328                     }
329                 }
330                 if (pullErrorCode == PULL_DEAD_OBJECT) {
331                     mAllPullAtomInfo.erase(pullResultInfo.pullerKey);
332                 }
333                 // else if is used here for the edge case of all threads being completed but
334                 // there are remaining pulled results in the queue to process.
335             } else if (pendingThreads == 0) {
336                 break;
337             }
338         }
339 
340         for (thread& pullerThread : pullerThreads) {
341             pullerThread.join();
342         }
343 
344     } else {
345         onAlarmFiredSynchronous(elapsedTimeNs, wallClockNs, minNextPullTimeNs);
346     }
347     VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
348          (long long)minNextPullTimeNs);
349     mNextPullTimeNs = minNextPullTimeNs;
350     updateAlarmLocked();
351 }
352 
onAlarmFiredSynchronous(const int64_t elapsedTimeNs,const int64_t wallClockNs,int64_t & minNextPullTimeNs)353 void StatsPullerManager::onAlarmFiredSynchronous(const int64_t elapsedTimeNs,
354                                                  const int64_t wallClockNs,
355                                                  int64_t& minNextPullTimeNs) {
356     vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
357     for (auto& pair : mReceivers) {
358         vector<ReceiverInfo*> receivers;
359         if (pair.second.size() != 0) {
360             for (ReceiverInfo& receiverInfo : pair.second) {
361                 // If pullNecessary and enough time has passed for the next bucket, then add
362                 // receiver to the list that will pull on this alarm.
363                 // If pullNecessary is false, check if next pull time needs to be updated.
364                 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
365                 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
366                     receiverPtr->isPullNeeded()) {
367                     receivers.push_back(&receiverInfo);
368                 } else {
369                     if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
370                         receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
371                         int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
372                                               receiverInfo.intervalNs;
373                         receiverInfo.nextPullTimeNs +=
374                                 (numBucketsAhead + 1) * receiverInfo.intervalNs;
375                     }
376                     minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
377                 }
378             }
379             if (receivers.size() > 0) {
380                 needToPull.push_back(make_pair(&pair.first, receivers));
381             }
382         }
383     }
384     for (const auto& pullInfo : needToPull) {
385         vector<shared_ptr<LogEvent>> data;
386         PullResult pullResult =
387                 PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey, elapsedTimeNs, &data)
388                         ? PullResult::PULL_RESULT_SUCCESS
389                         : PullResult::PULL_RESULT_FAIL;
390         if (pullResult == PullResult::PULL_RESULT_FAIL) {
391             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
392         }
393 
394         // Convention is to mark pull atom timestamp at request time.
395         // If we pull at t0, puller starts at t1, finishes at t2, and send back
396         // at t3, we mark t0 as its timestamp, which should correspond to its
397         // triggering event, such as condition change at t0.
398         // Here the triggering event is alarm fired from AlarmManager.
399         // In ValueMetricProducer and GaugeMetricProducer we do same thing
400         // when pull on condition change, etc.
401         for (auto& event : data) {
402             event->setElapsedTimestampNs(elapsedTimeNs);
403             event->setLogdWallClockTimestampNs(wallClockNs);
404         }
405 
406         for (const auto& receiverInfo : pullInfo.second) {
407             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
408             if (receiverPtr != nullptr) {
409                 receiverPtr->onDataPulled(data, pullResult, elapsedTimeNs);
410                 // We may have just come out of a coma, compute next pull time.
411                 int numBucketsAhead =
412                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
413                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
414                 minNextPullTimeNs = min(receiverInfo->nextPullTimeNs, minNextPullTimeNs);
415             } else {
416                 VLOG("receiver already gone.");
417             }
418         }
419     }
420 }
421 
getPullerUidsLocked(const int tagId,const ConfigKey & configKey,vector<int32_t> & uids)422 bool StatsPullerManager::getPullerUidsLocked(const int tagId, const ConfigKey& configKey,
423                                              vector<int32_t>& uids) {
424     const auto& uidProviderIt = mPullUidProviders.find(configKey);
425     if (uidProviderIt == mPullUidProviders.end()) {
426         ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
427               configKey.ToString().c_str());
428         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
429         return false;
430     }
431     sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
432     if (pullUidProvider == nullptr) {
433         ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
434               configKey.ToString().c_str());
435         StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
436         return false;
437     }
438     uids = pullUidProvider->getPullAtomUids(tagId);
439     return true;
440 }
441 
initPullerQueue(ThreadSafeQueue<PullerParams> & pullerQueue,std::queue<PulledInfo> & pulledData,const int64_t elapsedTimeNs,int64_t & minNextPullTimeNs)442 void StatsPullerManager::initPullerQueue(ThreadSafeQueue<PullerParams>& pullerQueue,
443                                          std::queue<PulledInfo>& pulledData,
444                                          const int64_t elapsedTimeNs, int64_t& minNextPullTimeNs) {
445     for (auto& pair : mReceivers) {
446         vector<ReceiverInfo*> receivers;
447         if (pair.second.size() != 0) {
448             for (ReceiverInfo& receiverInfo : pair.second) {
449                 // If pullNecessary and enough time has passed for the next bucket, then add
450                 // receiver to the list that will pull on this alarm.
451                 // If pullNecessary is false, check if next pull time needs to be updated.
452                 sp<PullDataReceiver> receiverPtr = receiverInfo.receiver.promote();
453                 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs && receiverPtr != nullptr &&
454                     receiverPtr->isPullNeeded()) {
455                     receivers.push_back(&receiverInfo);
456                 } else {
457                     if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
458                         receiverPtr->onDataPulled({}, PullResult::PULL_NOT_NEEDED, elapsedTimeNs);
459                         int numBucketsAhead = (elapsedTimeNs - receiverInfo.nextPullTimeNs) /
460                                               receiverInfo.intervalNs;
461                         receiverInfo.nextPullTimeNs +=
462                                 (numBucketsAhead + 1) * receiverInfo.intervalNs;
463                     }
464                     minNextPullTimeNs = min(receiverInfo.nextPullTimeNs, minNextPullTimeNs);
465                 }
466             }
467             if (receivers.size() > 0) {
468                 bool foundPuller = false;
469                 int tagId = pair.first.atomTag;
470                 vector<int32_t> uids;
471                 if (getPullerUidsLocked(tagId, pair.first.configKey, uids)) {
472                     for (int32_t uid : uids) {
473                         PullerKey key = {.uid = uid, .atomTag = tagId};
474                         auto pullerIt = mAllPullAtomInfo.find(key);
475                         if (pullerIt != mAllPullAtomInfo.end()) {
476                             PullerParams params;
477                             params.key = key;
478                             params.puller = pullerIt->second;
479                             params.receivers = std::move(receivers);
480                             pullerQueue.push(params);
481                             foundPuller = true;
482                             break;
483                         }
484                     }
485                     if (!foundPuller) {
486                         StatsdStats::getInstance().notePullerNotFound(tagId);
487                         ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
488                     }
489                 }
490                 if (!foundPuller) {
491                     PulledInfo pulledInfo;
492                     pulledInfo.pullErrorCode = PullErrorCode::PULL_FAIL;
493                     pulledInfo.receiverInfo = std::move(receivers);
494                     pulledData.push(pulledInfo);
495                 }
496             }
497         }
498     }
499 }
500 
ForceClearPullerCache()501 int StatsPullerManager::ForceClearPullerCache() {
502     ATRACE_CALL();
503     std::lock_guard<std::mutex> _l(mLock);
504     int totalCleared = 0;
505     for (const auto& pulledAtom : mAllPullAtomInfo) {
506         totalCleared += pulledAtom.second->ForceClearCache();
507     }
508     return totalCleared;
509 }
510 
ClearPullerCacheIfNecessary(int64_t timestampNs)511 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
512     ATRACE_CALL();
513     std::lock_guard<std::mutex> _l(mLock);
514     int totalCleared = 0;
515     for (const auto& pulledAtom : mAllPullAtomInfo) {
516         totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
517     }
518     return totalCleared;
519 }
520 
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)521 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
522                                                   const int64_t coolDownNs, const int64_t timeoutNs,
523                                                   const vector<int32_t>& additiveFields,
524                                                   const shared_ptr<IPullAtomCallback>& callback) {
525     ATRACE_CALL();
526     std::lock_guard<std::mutex> _l(mLock);
527     VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
528 
529     if (callback == nullptr) {
530         ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
531         return;
532     }
533 
534     int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
535     int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
536 
537     sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
538                                                              actualTimeoutNs, additiveFields);
539     PullerKey key = {.uid = uid, .atomTag = atomTag};
540     auto it = mAllPullAtomInfo.find(key);
541     if (it != mAllPullAtomInfo.end()) {
542         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
543                                                                          /*registered=*/false);
544     }
545     mAllPullAtomInfo[key] = puller;
546     StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
547 }
548 
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)549 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
550     ATRACE_CALL();
551     std::lock_guard<std::mutex> _l(mLock);
552     PullerKey key = {.uid = uid, .atomTag = atomTag};
553     if (mAllPullAtomInfo.find(key) != mAllPullAtomInfo.end()) {
554         StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
555                                                                          /*registered=*/false);
556         mAllPullAtomInfo.erase(key);
557     }
558 }
559 
560 }  // namespace statsd
561 }  // namespace os
562 }  // namespace android
563