• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define STATSD_DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriberClient.h"
20 
21 #include "FieldValue.h"
22 #include "guardrail/StatsdStats.h"
23 #include "matchers/matcher_util.h"
24 #include "stats_log_util.h"
25 
26 using android::base::unique_fd;
27 using Status = ::ndk::ScopedAStatus;
28 
29 namespace android {
30 namespace os {
31 namespace statsd {
32 
// Proto field ids used when the client writes cached subscription data into mProtoOut:
// the atom itself, its elapsed timestamp in nanoseconds, and the logging uid.
static constexpr int FIELD_ID_SHELL_DATA__ATOM = 1;
static constexpr int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
static constexpr int FIELD_ID_SHELL_DATA__LOGGING_UID = 3;
36 
// Id that will be given to the next subscription created in this file; the callback-based
// create() also reports it to StatsdStats.
// Not thread-safe; should only be accessed while holding ShellSubscriber::mMutex lock.
static int nextSubId{0};
40 
41 struct ReadConfigResult {
42     vector<SimpleAtomMatcher> pushedMatchers;
43     vector<ShellSubscriberClient::PullInfo> pullInfo;
44     bool collect_uids;
45 };
46 
47 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)48 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
49                                              int64_t startTimeMs, int64_t minPullIntervalMs) {
50     // Parse the config.
51     ShellSubscription config;
52     if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
53         ALOGE("ShellSubscriberClient: failed to parse the config");
54         return nullopt;
55     }
56 
57     ReadConfigResult result;
58 
59     result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
60 
61     vector<ShellSubscriberClient::PullInfo> pullInfo;
62     for (const auto& pulled : config.pulled()) {
63         vector<string> packages;
64         vector<int32_t> uids;
65         for (const string& pkg : pulled.packages()) {
66             auto it = UidMap::sAidToUidMapping.find(pkg);
67             if (it != UidMap::sAidToUidMapping.end()) {
68                 uids.push_back(it->second);
69             } else {
70                 packages.push_back(pkg);
71             }
72         }
73 
74         const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
75         result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
76         ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
77               pulled.matcher().atom_id());
78     }
79 
80     result.collect_uids = config.collect_uids();
81 
82     return result;
83 }
84 
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)85 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
86                                           int64_t intervalMs,
87                                           const std::vector<std::string>& packages,
88                                           const std::vector<int32_t>& uids)
89     : mPullerMatcher(matcher),
90       mIntervalMs(intervalMs),
91       mPrevPullElapsedRealtimeMs(startTimeMs),
92       mPullPackages(packages),
93       mPullUids(uids) {
94 }
95 
ShellSubscriberClient(int id,int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)96 ShellSubscriberClient::ShellSubscriberClient(
97         int id, int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
98         const std::vector<SimpleAtomMatcher>& pushedMatchers,
99         const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
100         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
101     : mId(id),
102       mUidMap(uidMap),
103       mPullerMgr(pullerMgr),
104       mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
105       mPushedMatchers(pushedMatchers),
106       mPulledInfo(pulledInfo),
107       mCallback(callback),
108       mTimeoutSec(timeoutSec),
109       mStartTimeSec(startTimeSec),
110       mLastWriteMs(startTimeSec * 1000),
111       mCacheSize(0){};
112 
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)113 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
114         int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
115         const sp<StatsPullerManager>& pullerMgr) {
116     // Read the size of the config.
117     size_t bufferSize;
118     if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
119         return nullptr;
120     }
121 
122     // Check bufferSize
123     if (bufferSize > (kMaxSizeKb * 1024)) {
124         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
125               "bytes)",
126               bufferSize, (kMaxSizeKb * 1024));
127         return nullptr;
128     }
129 
130     // Read the config.
131     vector<uint8_t> buffer(bufferSize);
132     if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
133         ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
134         return nullptr;
135     }
136 
137     const optional<ReadConfigResult> readConfigResult =
138             readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
139     if (!readConfigResult.has_value()) {
140         return nullptr;
141     }
142 
143     auto result = unique_ptr<ShellSubscriberClient>(new ShellSubscriberClient(
144             nextSubId++, out, /*callback=*/nullptr, readConfigResult->pushedMatchers,
145             readConfigResult->pullInfo, timeoutSec, startTimeSec, uidMap, pullerMgr));
146     if (result != nullptr) {
147         result->setCollectUids(readConfigResult->collect_uids);
148     }
149     return result;
150 }
151 
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)152 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
153         const vector<uint8_t>& subscriptionConfig,
154         const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
155         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
156     if (callback == nullptr) {
157         ALOGE("ShellSubscriberClient: received nullptr callback");
158         return nullptr;
159     }
160 
161     if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
162         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
163               "bytes)",
164               subscriptionConfig.size(), (kMaxSizeKb * 1024));
165         return nullptr;
166     }
167 
168     const optional<ReadConfigResult> readConfigResult =
169             readConfig(subscriptionConfig, startTimeSec * 1000,
170                        ShellSubscriberClient::kMinCallbackPullIntervalMs);
171     if (!readConfigResult.has_value()) {
172         return nullptr;
173     }
174 
175     const int id = nextSubId++;
176 
177     StatsdStats::getInstance().noteSubscriptionStarted(id, readConfigResult->pushedMatchers.size(),
178                                                        readConfigResult->pullInfo.size());
179     auto result = unique_ptr<ShellSubscriberClient>(new ShellSubscriberClient(
180             id, /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
181             /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr));
182     if (result != nullptr) {
183         result->setCollectUids(readConfigResult->collect_uids);
184     }
185     return result;
186 }
187 
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)188 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
189                                                        const SimpleAtomMatcher& matcher,
190                                                        const sp<UidMap>& uidMap) {
191     auto [matched, transformedEvent] = matchesSimple(mUidMap, matcher, event);
192     if (!matched) {
193         return false;
194     }
195     const LogEvent& eventRef = transformedEvent == nullptr ? event : *transformedEvent;
196 
197     // Cache atom event in mProtoOut.
198     uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
199                                          FIELD_ID_SHELL_DATA__ATOM);
200     eventRef.ToProto(mProtoOut);
201     mProtoOut.end(atomToken);
202 
203     const int64_t timestampNs = truncateTimestampIfNecessary(eventRef);
204     mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
205                             FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
206                     static_cast<long long>(timestampNs));
207 
208     // Update byte size of cached data.
209     mCacheSize += getSize(eventRef.getValues()) + sizeof(timestampNs);
210 
211     if (mDoCollectUids) {
212         mProtoOut.write(util::FIELD_TYPE_INT32 | util::FIELD_COUNT_REPEATED |
213                                 FIELD_ID_SHELL_DATA__LOGGING_UID,
214                         eventRef.GetUid());
215         mCacheSize += sizeof(int32_t);
216     }
217 
218     return true;
219 }
220 
221 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)222 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
223     for (const auto& matcher : mPushedMatchers) {
224         if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
225             flushProtoIfNeeded();
226             break;
227         }
228     }
229 }
230 
flushProtoIfNeeded()231 void ShellSubscriberClient::flushProtoIfNeeded() {
232     if (mCallback == nullptr) {  // Using file descriptor.
233         triggerFdFlush();
234     } else if (mCacheSize >= kMaxCacheSizeBytes) {  // Using callback.
235         // Flush data if cache is full.
236         triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
237     }
238 }
239 
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)240 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
241     int64_t sleepTimeMs = 24 * 60 * 60 * 1000;  // 24 hours.
242     for (PullInfo& pullInfo : mPulledInfo) {
243         if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
244             vector<int32_t> uids;
245             getUidsForPullAtom(&uids, pullInfo);
246 
247             vector<shared_ptr<LogEvent>> data;
248             mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
249             VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
250                  pullInfo.mPullerMatcher.atom_id());
251             if (mCallback != nullptr) {  // Callback subscription
252                 StatsdStats::getInstance().noteSubscriptionAtomPulled(
253                         pullInfo.mPullerMatcher.atom_id());
254             }
255 
256             writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
257             pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
258         }
259 
260         // Determine how long to sleep before doing more work.
261         const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
262 
263         const int64_t timeBeforePullMs =
264                 nextPullTimeMs - nowMillis;  // guaranteed to be non-negative
265         sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
266     }
267     return sleepTimeMs;
268 }
269 
270 // The pullAndHeartbeat threads sleep for the minimum time
271 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)272 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
273                                                              int64_t nowNanos) {
274     int64_t sleepTimeMs;
275     if (mCallback == nullptr) {  // File descriptor subscription
276         if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
277             mClientAlive = false;
278             return kMsBetweenHeartbeats;
279         }
280 
281         sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
282 
283         // Send a heartbeat consisting of data size of 0, if
284         // the user hasn't recently received data from statsd. When it receives the data size of 0,
285         // the user will not expect any atoms and recheck whether the subscription should end.
286         if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
287             triggerFdFlush();
288             if (!mClientAlive) return kMsBetweenHeartbeats;
289         }
290 
291         int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
292         sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
293     } else {  // Callback subscription.
294         sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
295 
296         if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
297             // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
298             triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
299         }
300 
301         // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
302         const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
303 
304         // For callback subscriptions, ensure minimum sleep time is at least
305         // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
306         // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
307         // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
308         // now to have their pulls batched together, mitigating frequent wakeups of the puller
309         // thread.
310         sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
311     }
312     return sleepTimeMs;
313 }
314 
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)315 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
316                                                    const SimpleAtomMatcher& matcher) {
317     bool hasData = false;
318     for (const shared_ptr<LogEvent>& event : data) {
319         if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
320             hasData = true;
321         }
322     }
323 
324     if (hasData) {
325         flushProtoIfNeeded();
326     }
327 }
328 
329 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
330 // because the read end of the pipe has closed, change the client status so
331 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()332 void ShellSubscriberClient::attemptWriteToPipeLocked() {
333     const size_t dataSize = mProtoOut.size();
334     // First, write the payload size.
335     if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
336         mClientAlive = false;
337         return;
338     }
339     // Then, write the payload if this is not just a heartbeat.
340     if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
341         mClientAlive = false;
342         return;
343     }
344     mLastWriteMs = getElapsedRealtimeMillis();
345 }
346 
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)347 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
348     uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
349     // This is slow. Consider storing the uids per app and listening to uidmap updates.
350     for (const string& pkg : pullInfo.mPullPackages) {
351         set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
352         uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
353     }
354     uids->push_back(DEFAULT_PULL_UID);
355 }
356 
clearCache()357 void ShellSubscriberClient::clearCache() {
358     mProtoOut.clear();
359     mCacheSize = 0;
360 }
361 
triggerFdFlush()362 void ShellSubscriberClient::triggerFdFlush() {
363     attemptWriteToPipeLocked();
364     clearCache();
365 }
366 
triggerCallback(StatsSubscriptionCallbackReason reason)367 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
368     // Invoke Binder callback with cached event data.
369     vector<uint8_t> payloadBytes;
370     mProtoOut.serializeToVector(&payloadBytes);
371     StatsdStats::getInstance().noteSubscriptionFlushed(mId);
372     const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
373     if (status.getStatus() == STATUS_DEAD_OBJECT &&
374         status.getExceptionCode() == EX_TRANSACTION_FAILED) {
375         mClientAlive = false;
376         return;
377     }
378 
379     mLastWriteMs = getElapsedRealtimeMillis();
380     clearCache();
381 }
382 
flush()383 void ShellSubscriberClient::flush() {
384     triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
385 }
386 
onUnsubscribe()387 void ShellSubscriberClient::onUnsubscribe() {
388     StatsdStats::getInstance().noteSubscriptionEnded(mId);
389     if (mClientAlive) {
390         triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
391     }
392 }
393 
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const394 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
395     for (const auto& matcher : mPushedMatchers) {
396         allAtomIds.insert(matcher.atom_id());
397     }
398 }
399 
400 }  // namespace statsd
401 }  // namespace os
402 }  // namespace android
403