1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define STATSD_DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriberClient.h"
20
21 #include "FieldValue.h"
22 #include "guardrail/StatsdStats.h"
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25
26 using android::base::unique_fd;
27 using Status = ::ndk::ScopedAStatus;
28
29 namespace android {
30 namespace os {
31 namespace statsd {
32
// Field IDs used when encoding matched events into mProtoOut: the atom itself, its elapsed
// timestamp, and (when uid collection is enabled) the uid that logged it.
static constexpr int FIELD_ID_SHELL_DATA__ATOM = 1;
static constexpr int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
static constexpr int FIELD_ID_SHELL_DATA__LOGGING_UID = 3;

// ID assigned to the next subscription that is created; also reported to StatsdStats.
// Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock.
static int nextSubId = 0;
40
41 struct ReadConfigResult {
42 vector<SimpleAtomMatcher> pushedMatchers;
43 vector<ShellSubscriberClient::PullInfo> pullInfo;
44 bool collect_uids;
45 };
46
47 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)48 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
49 int64_t startTimeMs, int64_t minPullIntervalMs) {
50 // Parse the config.
51 ShellSubscription config;
52 if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
53 ALOGE("ShellSubscriberClient: failed to parse the config");
54 return nullopt;
55 }
56
57 ReadConfigResult result;
58
59 result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
60
61 vector<ShellSubscriberClient::PullInfo> pullInfo;
62 for (const auto& pulled : config.pulled()) {
63 vector<string> packages;
64 vector<int32_t> uids;
65 for (const string& pkg : pulled.packages()) {
66 auto it = UidMap::sAidToUidMapping.find(pkg);
67 if (it != UidMap::sAidToUidMapping.end()) {
68 uids.push_back(it->second);
69 } else {
70 packages.push_back(pkg);
71 }
72 }
73
74 const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
75 result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
76 ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
77 pulled.matcher().atom_id());
78 }
79
80 result.collect_uids = config.collect_uids();
81
82 return result;
83 }
84
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)85 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
86 int64_t intervalMs,
87 const std::vector<std::string>& packages,
88 const std::vector<int32_t>& uids)
89 : mPullerMatcher(matcher),
90 mIntervalMs(intervalMs),
91 mPrevPullElapsedRealtimeMs(startTimeMs),
92 mPullPackages(packages),
93 mPullUids(uids) {
94 }
95
ShellSubscriberClient(int id,int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)96 ShellSubscriberClient::ShellSubscriberClient(
97 int id, int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
98 const std::vector<SimpleAtomMatcher>& pushedMatchers,
99 const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
100 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
101 : mId(id),
102 mUidMap(uidMap),
103 mPullerMgr(pullerMgr),
104 mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
105 mPushedMatchers(pushedMatchers),
106 mPulledInfo(pulledInfo),
107 mCallback(callback),
108 mTimeoutSec(timeoutSec),
109 mStartTimeSec(startTimeSec),
110 mLastWriteMs(startTimeSec * 1000),
111 mCacheSize(0){};
112
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)113 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
114 int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
115 const sp<StatsPullerManager>& pullerMgr) {
116 // Read the size of the config.
117 size_t bufferSize;
118 if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
119 return nullptr;
120 }
121
122 // Check bufferSize
123 if (bufferSize > (kMaxSizeKb * 1024)) {
124 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
125 "bytes)",
126 bufferSize, (kMaxSizeKb * 1024));
127 return nullptr;
128 }
129
130 // Read the config.
131 vector<uint8_t> buffer(bufferSize);
132 if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
133 ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
134 return nullptr;
135 }
136
137 const optional<ReadConfigResult> readConfigResult =
138 readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
139 if (!readConfigResult.has_value()) {
140 return nullptr;
141 }
142
143 auto result = unique_ptr<ShellSubscriberClient>(new ShellSubscriberClient(
144 nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers,
145 readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr));
146 if (result != nullptr) {
147 result->setCollectUids(readConfigResult->collect_uids);
148 }
149 return result;
150 }
151
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)152 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
153 const vector<uint8_t>& subscriptionConfig,
154 const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
155 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
156 if (callback == nullptr) {
157 ALOGE("ShellSubscriberClient: received nullptr callback");
158 return nullptr;
159 }
160
161 if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
162 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
163 "bytes)",
164 subscriptionConfig.size(), (kMaxSizeKb * 1024));
165 return nullptr;
166 }
167
168 const optional<ReadConfigResult> readConfigResult =
169 readConfig(subscriptionConfig, startTimeSec * 1000,
170 ShellSubscriberClient::kMinCallbackPullIntervalMs);
171 if (!readConfigResult.has_value()) {
172 return nullptr;
173 }
174
175 const int id = nextSubId++;
176
177 StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(),
178 readConfigResult->pullInfo.size());
179 auto result = unique_ptr<ShellSubscriberClient>(new ShellSubscriberClient(
180 id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
181 /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr));
182 if (result != nullptr) {
183 result->setCollectUids(readConfigResult->collect_uids);
184 }
185 return result;
186 }
187
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)188 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
189 const SimpleAtomMatcher& matcher,
190 const sp<UidMap>& uidMap) {
191 auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event);
192 if (!matched) {
193 return false;
194 }
195 const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent;
196
197 // Cache atom event in mProtoOut.
198 uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
199 FIELD_ID_SHELL_DATA__ATOM);
200 eventRef.ToProto(mProtoOut);
201 mProtoOut.end(atomToken);
202
203 const int64_t timestampNs = truncateTimestampIfNecessary(eventRef);
204 mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
205 FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
206 static_cast<long long>(timestampNs));
207
208 // Update byte size of cached data.
209 mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs);
210
211 if (mDoCollectUids) {
212 mProtoOut.write(util::FIELD_TYPE_INT32 | util::FIELD_COUNT_REPEATED |
213 FIELD_ID_SHELL_DATA__LOGGING_UID,
214 eventRef.GetUid());
215 mCacheSize += sizeof(int32_t);
216 }
217
218 return true;
219 }
220
221 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)222 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
223 for (const auto& matcher : mPushedMatchers) {
224 if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
225 flushProtoIfNeeded();
226 break;
227 }
228 }
229 }
230
flushProtoIfNeeded()231 void ShellSubscriberClient::flushProtoIfNeeded() {
232 if (mCallback == nullptr) { // Using file descriptor.
233 triggerFdFlush();
234 } else if (mCacheSize >= kMaxCacheSizeBytes) { // Using callback.
235 // Flush data if cache is full.
236 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
237 }
238 }
239
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)240 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
241 int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
242 for (PullInfo& pullInfo : mPulledInfo) {
243 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
244 vector<int32_t> uids;
245 getUidsForPullAtom(&uids, pullInfo);
246
247 vector<shared_ptr<LogEvent>> data;
248 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
249 VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
250 pullInfo.mPullerMatcher.atom_id());
251 if (mCallback != nullptr) { // Callback subscription
252 StatsdStats::getInstance().noteSubscriptionAtomPulled(
253 pullInfo.mPullerMatcher.atom_id());
254 }
255
256 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
257 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
258 }
259
260 // Determine how long to sleep before doing more work.
261 const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
262
263 const int64_t timeBeforePullMs =
264 nextPullTimeMs - nowMillis; // guaranteed to be non-negative
265 sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
266 }
267 return sleepTimeMs;
268 }
269
270 // The pullAndHeartbeat threads sleep for the minimum time
271 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)272 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
273 int64_t nowNanos) {
274 int64_t sleepTimeMs;
275 if (mCallback == nullptr) { // File descriptor subscription
276 if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
277 mClientAlive = false;
278 return kMsBetweenHeartbeats;
279 }
280
281 sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
282
283 // Send a heartbeat consisting of data size of 0, if
284 // the user hasn't recently received data from statsd. When it receives the data size of 0,
285 // the user will not expect any atoms and recheck whether the subscription should end.
286 if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
287 triggerFdFlush();
288 if (!mClientAlive) return kMsBetweenHeartbeats;
289 }
290
291 int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
292 sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
293 } else { // Callback subscription.
294 sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
295
296 if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
297 // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
298 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
299 }
300
301 // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
302 const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
303
304 // For callback subscriptions, ensure minimum sleep time is at least
305 // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
306 // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
307 // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
308 // now to have their pulls batched together, mitigating frequent wakeups of the puller
309 // thread.
310 sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
311 }
312 return sleepTimeMs;
313 }
314
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)315 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
316 const SimpleAtomMatcher& matcher) {
317 bool hasData = false;
318 for (const shared_ptr<LogEvent>& event : data) {
319 if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
320 hasData = true;
321 }
322 }
323
324 if (hasData) {
325 flushProtoIfNeeded();
326 }
327 }
328
329 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
330 // because the read end of the pipe has closed, change the client status so
331 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()332 void ShellSubscriberClient::attemptWriteToPipeLocked() {
333 const size_t dataSize = mProtoOut.size();
334 // First, write the payload size.
335 if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
336 mClientAlive = false;
337 return;
338 }
339 // Then, write the payload if this is not just a heartbeat.
340 if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
341 mClientAlive = false;
342 return;
343 }
344 mLastWriteMs = getElapsedRealtimeMillis();
345 }
346
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)347 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
348 uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
349 // This is slow. Consider storing the uids per app and listening to uidmap updates.
350 for (const string& pkg : pullInfo.mPullPackages) {
351 set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
352 uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
353 }
354 uids->push_back(DEFAULT_PULL_UID);
355 }
356
clearCache()357 void ShellSubscriberClient::clearCache() {
358 mProtoOut.clear();
359 mCacheSize = 0;
360 }
361
triggerFdFlush()362 void ShellSubscriberClient::triggerFdFlush() {
363 attemptWriteToPipeLocked();
364 clearCache();
365 }
366
triggerCallback(StatsSubscriptionCallbackReason reason)367 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
368 // Invoke Binder callback with cached event data.
369 vector<uint8_t> payloadBytes;
370 mProtoOut.serializeToVector(&payloadBytes);
371 StatsdStats::getInstance().noteSubscriptionFlushed(mId);
372 const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
373 if (status.getStatus() == STATUS_DEAD_OBJECT &&
374 status.getExceptionCode() == EX_TRANSACTION_FAILED) {
375 mClientAlive = false;
376 return;
377 }
378
379 mLastWriteMs = getElapsedRealtimeMillis();
380 clearCache();
381 }
382
flush()383 void ShellSubscriberClient::flush() {
384 triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
385 }
386
onUnsubscribe()387 void ShellSubscriberClient::onUnsubscribe() {
388 StatsdStats::getInstance().noteSubscriptionEnded(mId);
389 if (mClientAlive) {
390 triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
391 }
392 }
393
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const394 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
395 for (const auto& matcher : mPushedMatchers) {
396 allAtomIds.insert(matcher.atom_id());
397 }
398 }
399
400 } // namespace statsd
401 } // namespace os
402 } // namespace android
403