• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define STATSD_DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriberClient.h"
20 
21 #include "FieldValue.h"
22 #include "matchers/matcher_util.h"
23 #include "stats_log_util.h"
24 
25 using android::base::unique_fd;
26 using android::util::ProtoOutputStream;
27 using Status = ::ndk::ScopedAStatus;
28 
29 namespace android {
30 namespace os {
31 namespace statsd {
32 
// Field numbers used when encoding cached shell data into mProtoOut.
static constexpr int FIELD_ID_SHELL_DATA__ATOM = 1;
static constexpr int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
35 
36 struct ReadConfigResult {
37     vector<SimpleAtomMatcher> pushedMatchers;
38     vector<ShellSubscriberClient::PullInfo> pullInfo;
39 };
40 
41 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)42 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
43                                              int64_t startTimeMs, int64_t minPullIntervalMs) {
44     // Parse the config.
45     ShellSubscription config;
46     if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
47         ALOGE("ShellSubscriberClient: failed to parse the config");
48         return nullopt;
49     }
50 
51     ReadConfigResult result;
52 
53     result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
54 
55     vector<ShellSubscriberClient::PullInfo> pullInfo;
56     for (const auto& pulled : config.pulled()) {
57         vector<string> packages;
58         vector<int32_t> uids;
59         for (const string& pkg : pulled.packages()) {
60             auto it = UidMap::sAidToUidMapping.find(pkg);
61             if (it != UidMap::sAidToUidMapping.end()) {
62                 uids.push_back(it->second);
63             } else {
64                 packages.push_back(pkg);
65             }
66         }
67 
68         const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
69         result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
70         ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
71               pulled.matcher().atom_id());
72     }
73 
74     return result;
75 }
76 
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)77 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
78                                           int64_t intervalMs,
79                                           const std::vector<std::string>& packages,
80                                           const std::vector<int32_t>& uids)
81     : mPullerMatcher(matcher),
82       mIntervalMs(intervalMs),
83       mPrevPullElapsedRealtimeMs(startTimeMs),
84       mPullPackages(packages),
85       mPullUids(uids) {
86 }
87 
ShellSubscriberClient(int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)88 ShellSubscriberClient::ShellSubscriberClient(
89         int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
90         const std::vector<SimpleAtomMatcher>& pushedMatchers,
91         const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
92         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
93     : mUidMap(uidMap),
94       mPullerMgr(pullerMgr),
95       mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
96       mPushedMatchers(pushedMatchers),
97       mPulledInfo(pulledInfo),
98       mCallback(callback),
99       mTimeoutSec(timeoutSec),
100       mStartTimeSec(startTimeSec),
101       mLastWriteMs(startTimeSec * 1000),
102       mCacheSize(0){};
103 
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)104 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
105         int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
106         const sp<StatsPullerManager>& pullerMgr) {
107     // Read the size of the config.
108     size_t bufferSize;
109     if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
110         return nullptr;
111     }
112 
113     // Check bufferSize
114     if (bufferSize > (kMaxSizeKb * 1024)) {
115         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
116               "bytes)",
117               bufferSize, (kMaxSizeKb * 1024));
118         return nullptr;
119     }
120 
121     // Read the config.
122     vector<uint8_t> buffer(bufferSize);
123     if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
124         ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
125         return nullptr;
126     }
127 
128     const optional<ReadConfigResult> readConfigResult =
129             readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
130     if (!readConfigResult.has_value()) {
131         return nullptr;
132     }
133 
134     return make_unique<ShellSubscriberClient>(
135             out, /*callback=*/nullptr, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
136             timeoutSec, startTimeSec, uidMap, pullerMgr);
137 }
138 
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)139 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
140         const vector<uint8_t>& subscriptionConfig,
141         const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
142         const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
143     if (callback == nullptr) {
144         ALOGE("ShellSubscriberClient: received nullptr callback");
145         return nullptr;
146     }
147 
148     if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
149         ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
150               "bytes)",
151               subscriptionConfig.size(), (kMaxSizeKb * 1024));
152         return nullptr;
153     }
154 
155     const optional<ReadConfigResult> readConfigResult =
156             readConfig(subscriptionConfig, startTimeSec * 1000,
157                        ShellSubscriberClient::kMinCallbackPullIntervalMs);
158     if (!readConfigResult.has_value()) {
159         return nullptr;
160     }
161 
162     return make_unique<ShellSubscriberClient>(
163             /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
164             /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr);
165 }
166 
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)167 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
168                                                        const SimpleAtomMatcher& matcher,
169                                                        const sp<UidMap>& uidMap) {
170     if (!matchesSimple(uidMap, matcher, event)) {
171         return false;
172     }
173 
174     // Cache atom event in mProtoOut.
175     uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
176                                          FIELD_ID_SHELL_DATA__ATOM);
177     event.ToProto(mProtoOut);
178     mProtoOut.end(atomToken);
179 
180     const int64_t timestampNs = truncateTimestampIfNecessary(event);
181     mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
182                             FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
183                     static_cast<long long>(timestampNs));
184 
185     // Update byte size of cached data.
186     mCacheSize += getSize(event.getValues()) + sizeof(timestampNs);
187 
188     return true;
189 }
190 
191 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)192 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
193     for (const auto& matcher : mPushedMatchers) {
194         if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
195             flushProtoIfNeeded();
196             break;
197         }
198     }
199 }
200 
flushProtoIfNeeded()201 void ShellSubscriberClient::flushProtoIfNeeded() {
202     if (mCallback == nullptr) {  // Using file descriptor.
203         triggerFdFlush();
204     } else if (mCacheSize >= kMaxCacheSizeBytes) {  // Using callback.
205         // Flush data if cache is full.
206         triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
207     }
208 }
209 
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)210 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
211     int64_t sleepTimeMs = 24 * 60 * 60 * 1000;  // 24 hours.
212     for (PullInfo& pullInfo : mPulledInfo) {
213         if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
214             vector<int32_t> uids;
215             getUidsForPullAtom(&uids, pullInfo);
216 
217             vector<shared_ptr<LogEvent>> data;
218             mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
219             VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
220                  pullInfo.mPullerMatcher.atom_id());
221 
222             writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
223             pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
224         }
225 
226         // Determine how long to sleep before doing more work.
227         const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
228 
229         const int64_t timeBeforePullMs =
230                 nextPullTimeMs - nowMillis;  // guaranteed to be non-negative
231         sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
232     }
233     return sleepTimeMs;
234 }
235 
236 // The pullAndHeartbeat threads sleep for the minimum time
237 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)238 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
239                                                              int64_t nowNanos) {
240     int64_t sleepTimeMs;
241     if (mCallback == nullptr) {  // File descriptor subscription
242         if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
243             mClientAlive = false;
244             return kMsBetweenHeartbeats;
245         }
246 
247         sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
248 
249         // Send a heartbeat consisting of data size of 0, if
250         // the user hasn't recently received data from statsd. When it receives the data size of 0,
251         // the user will not expect any atoms and recheck whether the subscription should end.
252         if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
253             triggerFdFlush();
254             if (!mClientAlive) return kMsBetweenHeartbeats;
255         }
256 
257         int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
258         sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
259     } else {  // Callback subscription.
260         sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
261 
262         if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
263             // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
264             triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
265         }
266 
267         // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
268         const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
269 
270         // For callback subscriptions, ensure minimum sleep time is at least
271         // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
272         // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
273         // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
274         // now to have their pulls batched together, mitigating frequent wakeups of the puller
275         // thread.
276         sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
277     }
278     return sleepTimeMs;
279 }
280 
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)281 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
282                                                    const SimpleAtomMatcher& matcher) {
283     bool hasData = false;
284     for (const shared_ptr<LogEvent>& event : data) {
285         if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
286             hasData = true;
287         }
288     }
289 
290     if (hasData) {
291         flushProtoIfNeeded();
292     }
293 }
294 
295 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
296 // because the read end of the pipe has closed, change the client status so
297 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()298 void ShellSubscriberClient::attemptWriteToPipeLocked() {
299     const size_t dataSize = mProtoOut.size();
300     // First, write the payload size.
301     if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
302         mClientAlive = false;
303         return;
304     }
305     // Then, write the payload if this is not just a heartbeat.
306     if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
307         mClientAlive = false;
308         return;
309     }
310     mLastWriteMs = getElapsedRealtimeMillis();
311 }
312 
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)313 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
314     uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
315     // This is slow. Consider storing the uids per app and listening to uidmap updates.
316     for (const string& pkg : pullInfo.mPullPackages) {
317         set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
318         uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
319     }
320     uids->push_back(DEFAULT_PULL_UID);
321 }
322 
clearCache()323 void ShellSubscriberClient::clearCache() {
324     mProtoOut.clear();
325     mCacheSize = 0;
326 }
327 
triggerFdFlush()328 void ShellSubscriberClient::triggerFdFlush() {
329     attemptWriteToPipeLocked();
330     clearCache();
331 }
332 
triggerCallback(StatsSubscriptionCallbackReason reason)333 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
334     // Invoke Binder callback with cached event data.
335     vector<uint8_t> payloadBytes;
336     mProtoOut.serializeToVector(&payloadBytes);
337     const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
338     if (status.getStatus() == STATUS_DEAD_OBJECT &&
339         status.getExceptionCode() == EX_TRANSACTION_FAILED) {
340         mClientAlive = false;
341         return;
342     }
343 
344     mLastWriteMs = getElapsedRealtimeMillis();
345     clearCache();
346 }
347 
flush()348 void ShellSubscriberClient::flush() {
349     triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
350 }
351 
onUnsubscribe()352 void ShellSubscriberClient::onUnsubscribe() {
353     triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
354 }
355 
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const356 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
357     for (const auto& matcher : mPushedMatchers) {
358         allAtomIds.insert(matcher.atom_id());
359     }
360 }
361 
362 }  // namespace statsd
363 }  // namespace os
364 }  // namespace android
365