1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define DEBUG false
18 #include "Log.h"
19
20 #include "StatsPullerManager.h"
21
22 #include <cutils/log.h>
23 #include <math.h>
24 #include <stdint.h>
25
26 #include <algorithm>
27 #include <iostream>
28
29 #include "../StatsService.h"
30 #include "../logd/LogEvent.h"
31 #include "../stats_log_util.h"
32 #include "../statscompanion_util.h"
33 #include "StatsCallbackPuller.h"
34 #include "TrainInfoPuller.h"
35 #include "statslog_statsd.h"
36
37 using std::shared_ptr;
38 using std::vector;
39
40 namespace android {
41 namespace os {
42 namespace statsd {
43
44 // Stores the puller as a wp to avoid holding a reference in case it is unregistered and
45 // pullAtomCallbackDied is never called.
46 struct PullAtomCallbackDeathCookie {
PullAtomCallbackDeathCookieandroid::os::statsd::PullAtomCallbackDeathCookie47 PullAtomCallbackDeathCookie(const wp<StatsPullerManager>& pullerManager,
48 const PullerKey& pullerKey, const wp<StatsPuller>& puller) :
49 mPullerManager(pullerManager), mPullerKey(pullerKey), mPuller(puller) {
50 }
51
52 wp<StatsPullerManager> mPullerManager;
53 PullerKey mPullerKey;
54 wp<StatsPuller> mPuller;
55 };
56
pullAtomCallbackDied(void * cookie)57 void StatsPullerManager::pullAtomCallbackDied(void* cookie) {
58 PullAtomCallbackDeathCookie* cookie_ = static_cast<PullAtomCallbackDeathCookie*>(cookie);
59 sp<StatsPullerManager> thiz = cookie_->mPullerManager.promote();
60 if (!thiz) {
61 return;
62 }
63
64 const PullerKey& pullerKey = cookie_->mPullerKey;
65 wp<StatsPuller> puller = cookie_->mPuller;
66
67 // Erase the mapping from the puller key to the puller if the mapping still exists.
68 // Note that we are removing the StatsPuller object, which internally holds the binder
69 // IPullAtomCallback. However, each new registration creates a new StatsPuller, so this works.
70 lock_guard<mutex> lock(thiz->mLock);
71 const auto& it = thiz->kAllPullAtomInfo.find(pullerKey);
72 if (it != thiz->kAllPullAtomInfo.end() && puller != nullptr && puller == it->second) {
73 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(pullerKey.atomTag,
74 /*registered=*/false);
75 thiz->kAllPullAtomInfo.erase(pullerKey);
76 }
77 // The death recipient corresponding to this specific IPullAtomCallback can never
78 // be triggered again, so free up resources.
79 delete cookie_;
80 }
81
// Sentinel meaning "no pull is scheduled". Any next-pull time smaller than this may require
// updating the alarm.
constexpr int64_t NO_ALARM_UPDATE = INT64_MAX;
84
StatsPullerManager()85 StatsPullerManager::StatsPullerManager()
86 : kAllPullAtomInfo({
87 // TrainInfo.
88 {{.atomTag = util::TRAIN_INFO, .uid = AID_STATSD}, new TrainInfoPuller()},
89 }),
90 mNextPullTimeNs(NO_ALARM_UPDATE),
91 mPullAtomCallbackDeathRecipient(AIBinder_DeathRecipient_new(pullAtomCallbackDied)) {
92 }
93
Pull(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)94 bool StatsPullerManager::Pull(int tagId, const ConfigKey& configKey, const int64_t eventTimeNs,
95 vector<shared_ptr<LogEvent>>* data) {
96 std::lock_guard<std::mutex> _l(mLock);
97 return PullLocked(tagId, configKey, eventTimeNs, data);
98 }
99
Pull(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<std::shared_ptr<LogEvent>> * data)100 bool StatsPullerManager::Pull(int tagId, const vector<int32_t>& uids, const int64_t eventTimeNs,
101 vector<std::shared_ptr<LogEvent>>* data) {
102 std::lock_guard<std::mutex> _l(mLock);
103 return PullLocked(tagId, uids, eventTimeNs, data);
104 }
105
PullLocked(int tagId,const ConfigKey & configKey,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)106 bool StatsPullerManager::PullLocked(int tagId, const ConfigKey& configKey,
107 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
108 vector<int32_t> uids;
109 const auto& uidProviderIt = mPullUidProviders.find(configKey);
110 if (uidProviderIt == mPullUidProviders.end()) {
111 ALOGE("Error pulling tag %d. No pull uid provider for config key %s", tagId,
112 configKey.ToString().c_str());
113 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
114 return false;
115 }
116 sp<PullUidProvider> pullUidProvider = uidProviderIt->second.promote();
117 if (pullUidProvider == nullptr) {
118 ALOGE("Error pulling tag %d, pull uid provider for config %s is gone.", tagId,
119 configKey.ToString().c_str());
120 StatsdStats::getInstance().notePullUidProviderNotFound(tagId);
121 return false;
122 }
123 uids = pullUidProvider->getPullAtomUids(tagId);
124 return PullLocked(tagId, uids, eventTimeNs, data);
125 }
126
PullLocked(int tagId,const vector<int32_t> & uids,const int64_t eventTimeNs,vector<shared_ptr<LogEvent>> * data)127 bool StatsPullerManager::PullLocked(int tagId, const vector<int32_t>& uids,
128 const int64_t eventTimeNs, vector<shared_ptr<LogEvent>>* data) {
129 VLOG("Initiating pulling %d", tagId);
130 for (int32_t uid : uids) {
131 PullerKey key = {.atomTag = tagId, .uid = uid};
132 auto pullerIt = kAllPullAtomInfo.find(key);
133 if (pullerIt != kAllPullAtomInfo.end()) {
134 bool ret = pullerIt->second->Pull(eventTimeNs, data);
135 VLOG("pulled %zu items", data->size());
136 if (!ret) {
137 StatsdStats::getInstance().notePullFailed(tagId);
138 }
139 return ret;
140 }
141 }
142 StatsdStats::getInstance().notePullerNotFound(tagId);
143 ALOGW("StatsPullerManager: Unknown tagId %d", tagId);
144 return false; // Return early since we don't know what to pull.
145 }
146
PullerForMatcherExists(int tagId) const147 bool StatsPullerManager::PullerForMatcherExists(int tagId) const {
148 // Pulled atoms might be registered after we parse the config, so just make sure the id is in
149 // an appropriate range.
150 return isVendorPulledAtom(tagId) || isPulledAtom(tagId);
151 }
152
updateAlarmLocked()153 void StatsPullerManager::updateAlarmLocked() {
154 if (mNextPullTimeNs == NO_ALARM_UPDATE) {
155 VLOG("No need to set alarms. Skipping");
156 return;
157 }
158
159 // TODO(b/151045771): do not hold a lock while making a binder call
160 if (mStatsCompanionService != nullptr) {
161 mStatsCompanionService->setPullingAlarm(mNextPullTimeNs / 1000000);
162 } else {
163 VLOG("StatsCompanionService not available. Alarm not set.");
164 }
165 return;
166 }
167
SetStatsCompanionService(shared_ptr<IStatsCompanionService> statsCompanionService)168 void StatsPullerManager::SetStatsCompanionService(
169 shared_ptr<IStatsCompanionService> statsCompanionService) {
170 std::lock_guard<std::mutex> _l(mLock);
171 shared_ptr<IStatsCompanionService> tmpForLock = mStatsCompanionService;
172 mStatsCompanionService = statsCompanionService;
173 for (const auto& pulledAtom : kAllPullAtomInfo) {
174 pulledAtom.second->SetStatsCompanionService(statsCompanionService);
175 }
176 if (mStatsCompanionService != nullptr) {
177 updateAlarmLocked();
178 }
179 }
180
RegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver,int64_t nextPullTimeNs,int64_t intervalNs)181 void StatsPullerManager::RegisterReceiver(int tagId, const ConfigKey& configKey,
182 wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
183 int64_t intervalNs) {
184 std::lock_guard<std::mutex> _l(mLock);
185 auto& receivers = mReceivers[{.atomTag = tagId, .configKey = configKey}];
186 for (auto it = receivers.begin(); it != receivers.end(); it++) {
187 if (it->receiver == receiver) {
188 VLOG("Receiver already registered of %d", (int)receivers.size());
189 return;
190 }
191 }
192 ReceiverInfo receiverInfo;
193 receiverInfo.receiver = receiver;
194
195 // Round it to the nearest minutes. This is the limit of alarm manager.
196 // In practice, we should always have larger buckets.
197 int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
198 // Scheduled pulling should be at least 1 min apart.
199 // This can be lower in cts tests, in which case we round it to 1 min.
200 if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
201 roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
202 }
203
204 receiverInfo.intervalNs = roundedIntervalNs;
205 receiverInfo.nextPullTimeNs = nextPullTimeNs;
206 receivers.push_back(receiverInfo);
207
208 // There is only one alarm for all pulled events. So only set it to the smallest denom.
209 if (nextPullTimeNs < mNextPullTimeNs) {
210 VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
211 mNextPullTimeNs = nextPullTimeNs;
212 updateAlarmLocked();
213 }
214 VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
215 }
216
UnRegisterReceiver(int tagId,const ConfigKey & configKey,wp<PullDataReceiver> receiver)217 void StatsPullerManager::UnRegisterReceiver(int tagId, const ConfigKey& configKey,
218 wp<PullDataReceiver> receiver) {
219 std::lock_guard<std::mutex> _l(mLock);
220 auto receiversIt = mReceivers.find({.atomTag = tagId, .configKey = configKey});
221 if (receiversIt == mReceivers.end()) {
222 VLOG("Unknown pull code or no receivers: %d", tagId);
223 return;
224 }
225 std::list<ReceiverInfo>& receivers = receiversIt->second;
226 for (auto it = receivers.begin(); it != receivers.end(); it++) {
227 if (receiver == it->receiver) {
228 receivers.erase(it);
229 VLOG("Puller for tagId %d unregistered of %d", tagId, (int)receivers.size());
230 return;
231 }
232 }
233 }
234
RegisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)235 void StatsPullerManager::RegisterPullUidProvider(const ConfigKey& configKey,
236 wp<PullUidProvider> provider) {
237 std::lock_guard<std::mutex> _l(mLock);
238 mPullUidProviders[configKey] = provider;
239 }
240
UnregisterPullUidProvider(const ConfigKey & configKey,wp<PullUidProvider> provider)241 void StatsPullerManager::UnregisterPullUidProvider(const ConfigKey& configKey,
242 wp<PullUidProvider> provider) {
243 std::lock_guard<std::mutex> _l(mLock);
244 const auto& it = mPullUidProviders.find(configKey);
245 if (it != mPullUidProviders.end() && it->second == provider) {
246 mPullUidProviders.erase(it);
247 }
248 }
249
OnAlarmFired(int64_t elapsedTimeNs)250 void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
251 std::lock_guard<std::mutex> _l(mLock);
252 int64_t wallClockNs = getWallClockNs();
253
254 int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
255
256 vector<pair<const ReceiverKey*, vector<ReceiverInfo*>>> needToPull;
257 for (auto& pair : mReceivers) {
258 vector<ReceiverInfo*> receivers;
259 if (pair.second.size() != 0) {
260 for (ReceiverInfo& receiverInfo : pair.second) {
261 if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
262 receivers.push_back(&receiverInfo);
263 } else {
264 if (receiverInfo.nextPullTimeNs < minNextPullTimeNs) {
265 minNextPullTimeNs = receiverInfo.nextPullTimeNs;
266 }
267 }
268 }
269 if (receivers.size() > 0) {
270 needToPull.push_back(make_pair(&pair.first, receivers));
271 }
272 }
273 }
274 for (const auto& pullInfo : needToPull) {
275 vector<shared_ptr<LogEvent>> data;
276 bool pullSuccess = PullLocked(pullInfo.first->atomTag, pullInfo.first->configKey,
277 elapsedTimeNs, &data);
278 if (!pullSuccess) {
279 VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
280 }
281
282 // Convention is to mark pull atom timestamp at request time.
283 // If we pull at t0, puller starts at t1, finishes at t2, and send back
284 // at t3, we mark t0 as its timestamp, which should correspond to its
285 // triggering event, such as condition change at t0.
286 // Here the triggering event is alarm fired from AlarmManager.
287 // In ValueMetricProducer and GaugeMetricProducer we do same thing
288 // when pull on condition change, etc.
289 for (auto& event : data) {
290 event->setElapsedTimestampNs(elapsedTimeNs);
291 event->setLogdWallClockTimestampNs(wallClockNs);
292 }
293
294 for (const auto& receiverInfo : pullInfo.second) {
295 sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
296 if (receiverPtr != nullptr) {
297 receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
298 // We may have just come out of a coma, compute next pull time.
299 int numBucketsAhead =
300 (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
301 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
302 if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
303 minNextPullTimeNs = receiverInfo->nextPullTimeNs;
304 }
305 } else {
306 VLOG("receiver already gone.");
307 }
308 }
309 }
310
311 VLOG("mNextPullTimeNs: %lld updated to %lld", (long long)mNextPullTimeNs,
312 (long long)minNextPullTimeNs);
313 mNextPullTimeNs = minNextPullTimeNs;
314 updateAlarmLocked();
315 }
316
ForceClearPullerCache()317 int StatsPullerManager::ForceClearPullerCache() {
318 std::lock_guard<std::mutex> _l(mLock);
319 int totalCleared = 0;
320 for (const auto& pulledAtom : kAllPullAtomInfo) {
321 totalCleared += pulledAtom.second->ForceClearCache();
322 }
323 return totalCleared;
324 }
325
ClearPullerCacheIfNecessary(int64_t timestampNs)326 int StatsPullerManager::ClearPullerCacheIfNecessary(int64_t timestampNs) {
327 std::lock_guard<std::mutex> _l(mLock);
328 int totalCleared = 0;
329 for (const auto& pulledAtom : kAllPullAtomInfo) {
330 totalCleared += pulledAtom.second->ClearCacheIfNecessary(timestampNs);
331 }
332 return totalCleared;
333 }
334
RegisterPullAtomCallback(const int uid,const int32_t atomTag,const int64_t coolDownNs,const int64_t timeoutNs,const vector<int32_t> & additiveFields,const shared_ptr<IPullAtomCallback> & callback)335 void StatsPullerManager::RegisterPullAtomCallback(const int uid, const int32_t atomTag,
336 const int64_t coolDownNs, const int64_t timeoutNs,
337 const vector<int32_t>& additiveFields,
338 const shared_ptr<IPullAtomCallback>& callback) {
339 std::lock_guard<std::mutex> _l(mLock);
340 VLOG("RegisterPullerCallback: adding puller for tag %d", atomTag);
341
342 if (callback == nullptr) {
343 ALOGW("SetPullAtomCallback called with null callback for atom %d.", atomTag);
344 return;
345 }
346
347 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag, /*registered=*/true);
348 int64_t actualCoolDownNs = coolDownNs < kMinCoolDownNs ? kMinCoolDownNs : coolDownNs;
349 int64_t actualTimeoutNs = timeoutNs > kMaxTimeoutNs ? kMaxTimeoutNs : timeoutNs;
350
351 sp<StatsCallbackPuller> puller = new StatsCallbackPuller(atomTag, callback, actualCoolDownNs,
352 actualTimeoutNs, additiveFields);
353 PullerKey key = {.atomTag = atomTag, .uid = uid};
354 AIBinder_linkToDeath(callback->asBinder().get(), mPullAtomCallbackDeathRecipient.get(),
355 new PullAtomCallbackDeathCookie(this, key, puller));
356 kAllPullAtomInfo[key] = puller;
357 }
358
UnregisterPullAtomCallback(const int uid,const int32_t atomTag)359 void StatsPullerManager::UnregisterPullAtomCallback(const int uid, const int32_t atomTag) {
360 std::lock_guard<std::mutex> _l(mLock);
361 PullerKey key = {.atomTag = atomTag, .uid = uid};
362 if (kAllPullAtomInfo.find(key) != kAllPullAtomInfo.end()) {
363 StatsdStats::getInstance().notePullerCallbackRegistrationChanged(atomTag,
364 /*registered=*/false);
365 kAllPullAtomInfo.erase(key);
366 }
367 }
368
369 } // namespace statsd
370 } // namespace os
371 } // namespace android
372