1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define STATSD_DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriberClient.h"
20
21 #include "FieldValue.h"
22 #include "matchers/matcher_util.h"
23 #include "stats_log_util.h"
24
25 using android::base::unique_fd;
26 using android::util::ProtoOutputStream;
27 using Status = ::ndk::ScopedAStatus;
28
29 namespace android {
30 namespace os {
31 namespace statsd {
32
// Proto field ids used when caching an event in mProtoOut: the atom payload and the matching
// elapsed timestamp (in nanoseconds).
static constexpr int FIELD_ID_SHELL_DATA__ATOM = 1;
static constexpr int FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS = 2;
35
36 struct ReadConfigResult {
37 vector<SimpleAtomMatcher> pushedMatchers;
38 vector<ShellSubscriberClient::PullInfo> pullInfo;
39 };
40
41 // Read and parse single config. There should only one config in the input.
readConfig(const vector<uint8_t> & configBytes,int64_t startTimeMs,int64_t minPullIntervalMs)42 static optional<ReadConfigResult> readConfig(const vector<uint8_t>& configBytes,
43 int64_t startTimeMs, int64_t minPullIntervalMs) {
44 // Parse the config.
45 ShellSubscription config;
46 if (!config.ParseFromArray(configBytes.data(), configBytes.size())) {
47 ALOGE("ShellSubscriberClient: failed to parse the config");
48 return nullopt;
49 }
50
51 ReadConfigResult result;
52
53 result.pushedMatchers.assign(config.pushed().begin(), config.pushed().end());
54
55 vector<ShellSubscriberClient::PullInfo> pullInfo;
56 for (const auto& pulled : config.pulled()) {
57 vector<string> packages;
58 vector<int32_t> uids;
59 for (const string& pkg : pulled.packages()) {
60 auto it = UidMap::sAidToUidMapping.find(pkg);
61 if (it != UidMap::sAidToUidMapping.end()) {
62 uids.push_back(it->second);
63 } else {
64 packages.push_back(pkg);
65 }
66 }
67
68 const int64_t pullIntervalMs = max(pulled.freq_millis(), minPullIntervalMs);
69 result.pullInfo.emplace_back(pulled.matcher(), startTimeMs, pullIntervalMs, packages, uids);
70 ALOGD("ShellSubscriberClient: adding matcher for pulled atom %d",
71 pulled.matcher().atom_id());
72 }
73
74 return result;
75 }
76
PullInfo(const SimpleAtomMatcher & matcher,int64_t startTimeMs,int64_t intervalMs,const std::vector<std::string> & packages,const std::vector<int32_t> & uids)77 ShellSubscriberClient::PullInfo::PullInfo(const SimpleAtomMatcher& matcher, int64_t startTimeMs,
78 int64_t intervalMs,
79 const std::vector<std::string>& packages,
80 const std::vector<int32_t>& uids)
81 : mPullerMatcher(matcher),
82 mIntervalMs(intervalMs),
83 mPrevPullElapsedRealtimeMs(startTimeMs),
84 mPullPackages(packages),
85 mPullUids(uids) {
86 }
87
ShellSubscriberClient(int out,const std::shared_ptr<IStatsSubscriptionCallback> & callback,const std::vector<SimpleAtomMatcher> & pushedMatchers,const std::vector<PullInfo> & pulledInfo,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)88 ShellSubscriberClient::ShellSubscriberClient(
89 int out, const std::shared_ptr<IStatsSubscriptionCallback>& callback,
90 const std::vector<SimpleAtomMatcher>& pushedMatchers,
91 const std::vector<PullInfo>& pulledInfo, int64_t timeoutSec, int64_t startTimeSec,
92 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr)
93 : mUidMap(uidMap),
94 mPullerMgr(pullerMgr),
95 mDupOut(fcntl(out, F_DUPFD_CLOEXEC, 0)),
96 mPushedMatchers(pushedMatchers),
97 mPulledInfo(pulledInfo),
98 mCallback(callback),
99 mTimeoutSec(timeoutSec),
100 mStartTimeSec(startTimeSec),
101 mLastWriteMs(startTimeSec * 1000),
102 mCacheSize(0){};
103
create(int in,int out,int64_t timeoutSec,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)104 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
105 int in, int out, int64_t timeoutSec, int64_t startTimeSec, const sp<UidMap>& uidMap,
106 const sp<StatsPullerManager>& pullerMgr) {
107 // Read the size of the config.
108 size_t bufferSize;
109 if (!android::base::ReadFully(in, &bufferSize, sizeof(bufferSize))) {
110 return nullptr;
111 }
112
113 // Check bufferSize
114 if (bufferSize > (kMaxSizeKb * 1024)) {
115 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
116 "bytes)",
117 bufferSize, (kMaxSizeKb * 1024));
118 return nullptr;
119 }
120
121 // Read the config.
122 vector<uint8_t> buffer(bufferSize);
123 if (!android::base::ReadFully(in, buffer.data(), bufferSize)) {
124 ALOGE("ShellSubscriberClient: failed to read the config from file descriptor");
125 return nullptr;
126 }
127
128 const optional<ReadConfigResult> readConfigResult =
129 readConfig(buffer, startTimeSec * 1000, /* minPullIntervalMs */ 0);
130 if (!readConfigResult.has_value()) {
131 return nullptr;
132 }
133
134 return make_unique<ShellSubscriberClient>(
135 out, /*callback=*/nullptr, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
136 timeoutSec, startTimeSec, uidMap, pullerMgr);
137 }
138
create(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback,int64_t startTimeSec,const sp<UidMap> & uidMap,const sp<StatsPullerManager> & pullerMgr)139 unique_ptr<ShellSubscriberClient> ShellSubscriberClient::create(
140 const vector<uint8_t>& subscriptionConfig,
141 const shared_ptr<IStatsSubscriptionCallback>& callback, int64_t startTimeSec,
142 const sp<UidMap>& uidMap, const sp<StatsPullerManager>& pullerMgr) {
143 if (callback == nullptr) {
144 ALOGE("ShellSubscriberClient: received nullptr callback");
145 return nullptr;
146 }
147
148 if (subscriptionConfig.size() > (kMaxSizeKb * 1024)) {
149 ALOGE("ShellSubscriberClient: received config (%zu bytes) is larger than the max size (%zu "
150 "bytes)",
151 subscriptionConfig.size(), (kMaxSizeKb * 1024));
152 return nullptr;
153 }
154
155 const optional<ReadConfigResult> readConfigResult =
156 readConfig(subscriptionConfig, startTimeSec * 1000,
157 ShellSubscriberClient::kMinCallbackPullIntervalMs);
158 if (!readConfigResult.has_value()) {
159 return nullptr;
160 }
161
162 return make_unique<ShellSubscriberClient>(
163 /*out=*/-1, callback, readConfigResult->pushedMatchers, readConfigResult->pullInfo,
164 /*timeoutSec=*/-1, startTimeSec, uidMap, pullerMgr);
165 }
166
writeEventToProtoIfMatched(const LogEvent & event,const SimpleAtomMatcher & matcher,const sp<UidMap> & uidMap)167 bool ShellSubscriberClient::writeEventToProtoIfMatched(const LogEvent& event,
168 const SimpleAtomMatcher& matcher,
169 const sp<UidMap>& uidMap) {
170 if (!matchesSimple(uidMap, matcher, event)) {
171 return false;
172 }
173
174 // Cache atom event in mProtoOut.
175 uint64_t atomToken = mProtoOut.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED |
176 FIELD_ID_SHELL_DATA__ATOM);
177 event.ToProto(mProtoOut);
178 mProtoOut.end(atomToken);
179
180 const int64_t timestampNs = truncateTimestampIfNecessary(event);
181 mProtoOut.write(util::FIELD_TYPE_INT64 | util::FIELD_COUNT_REPEATED |
182 FIELD_ID_SHELL_DATA__ELAPSED_TIMESTAMP_NANOS,
183 static_cast<long long>(timestampNs));
184
185 // Update byte size of cached data.
186 mCacheSize += getSize(event.getValues()) + sizeof(timestampNs);
187
188 return true;
189 }
190
191 // Called by ShellSubscriber when a pushed event occurs
onLogEvent(const LogEvent & event)192 void ShellSubscriberClient::onLogEvent(const LogEvent& event) {
193 for (const auto& matcher : mPushedMatchers) {
194 if (writeEventToProtoIfMatched(event, matcher, mUidMap)) {
195 flushProtoIfNeeded();
196 break;
197 }
198 }
199 }
200
flushProtoIfNeeded()201 void ShellSubscriberClient::flushProtoIfNeeded() {
202 if (mCallback == nullptr) { // Using file descriptor.
203 triggerFdFlush();
204 } else if (mCacheSize >= kMaxCacheSizeBytes) { // Using callback.
205 // Flush data if cache is full.
206 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
207 }
208 }
209
pullIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)210 int64_t ShellSubscriberClient::pullIfNeeded(int64_t nowSecs, int64_t nowMillis, int64_t nowNanos) {
211 int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
212 for (PullInfo& pullInfo : mPulledInfo) {
213 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs <= nowMillis) {
214 vector<int32_t> uids;
215 getUidsForPullAtom(&uids, pullInfo);
216
217 vector<shared_ptr<LogEvent>> data;
218 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
219 VLOG("ShellSubscriberClient: pulled %zu atoms with id %d", data.size(),
220 pullInfo.mPullerMatcher.atom_id());
221
222 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
223 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
224 }
225
226 // Determine how long to sleep before doing more work.
227 const int64_t nextPullTimeMs = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mIntervalMs;
228
229 const int64_t timeBeforePullMs =
230 nextPullTimeMs - nowMillis; // guaranteed to be non-negative
231 sleepTimeMs = min(sleepTimeMs, timeBeforePullMs);
232 }
233 return sleepTimeMs;
234 }
235
236 // The pullAndHeartbeat threads sleep for the minimum time
237 // among all clients' input
pullAndSendHeartbeatsIfNeeded(int64_t nowSecs,int64_t nowMillis,int64_t nowNanos)238 int64_t ShellSubscriberClient::pullAndSendHeartbeatsIfNeeded(int64_t nowSecs, int64_t nowMillis,
239 int64_t nowNanos) {
240 int64_t sleepTimeMs;
241 if (mCallback == nullptr) { // File descriptor subscription
242 if ((nowSecs - mStartTimeSec >= mTimeoutSec) && (mTimeoutSec > 0)) {
243 mClientAlive = false;
244 return kMsBetweenHeartbeats;
245 }
246
247 sleepTimeMs = min(kMsBetweenHeartbeats, pullIfNeeded(nowSecs, nowMillis, nowNanos));
248
249 // Send a heartbeat consisting of data size of 0, if
250 // the user hasn't recently received data from statsd. When it receives the data size of 0,
251 // the user will not expect any atoms and recheck whether the subscription should end.
252 if (nowMillis - mLastWriteMs >= kMsBetweenHeartbeats) {
253 triggerFdFlush();
254 if (!mClientAlive) return kMsBetweenHeartbeats;
255 }
256
257 int64_t timeBeforeHeartbeat = mLastWriteMs + kMsBetweenHeartbeats - nowMillis;
258 sleepTimeMs = min(sleepTimeMs, timeBeforeHeartbeat);
259 } else { // Callback subscription.
260 sleepTimeMs = min(kMsBetweenCallbacks, pullIfNeeded(nowSecs, nowMillis, nowNanos));
261
262 if (mCacheSize > 0 && nowMillis - mLastWriteMs >= kMsBetweenCallbacks) {
263 // Flush data if cache has kept data for longer than kMsBetweenCallbacks.
264 triggerCallback(StatsSubscriptionCallbackReason::STATSD_INITIATED);
265 }
266
267 // Cache should be flushed kMsBetweenCallbacks after mLastWrite.
268 const int64_t timeToCallbackMs = mLastWriteMs + kMsBetweenCallbacks - nowMillis;
269
270 // For callback subscriptions, ensure minimum sleep time is at least
271 // kMinCallbackSleepIntervalMs. Even if there is less than kMinCallbackSleepIntervalMs left
272 // before next pull time, sleep for at least kMinCallbackSleepIntervalMs. This has the
273 // effect of multiple pulled atoms that have a pull within kMinCallbackSleepIntervalMs from
274 // now to have their pulls batched together, mitigating frequent wakeups of the puller
275 // thread.
276 sleepTimeMs = max(kMinCallbackSleepIntervalMs, min(sleepTimeMs, timeToCallbackMs));
277 }
278 return sleepTimeMs;
279 }
280
writePulledAtomsLocked(const vector<shared_ptr<LogEvent>> & data,const SimpleAtomMatcher & matcher)281 void ShellSubscriberClient::writePulledAtomsLocked(const vector<shared_ptr<LogEvent>>& data,
282 const SimpleAtomMatcher& matcher) {
283 bool hasData = false;
284 for (const shared_ptr<LogEvent>& event : data) {
285 if (writeEventToProtoIfMatched(*event, matcher, mUidMap)) {
286 hasData = true;
287 }
288 }
289
290 if (hasData) {
291 flushProtoIfNeeded();
292 }
293 }
294
295 // Tries to write the atom encoded in mProtoOut to the pipe. If the write fails
296 // because the read end of the pipe has closed, change the client status so
297 // the manager knows the subscription is no longer active
attemptWriteToPipeLocked()298 void ShellSubscriberClient::attemptWriteToPipeLocked() {
299 const size_t dataSize = mProtoOut.size();
300 // First, write the payload size.
301 if (!android::base::WriteFully(mDupOut, &dataSize, sizeof(dataSize))) {
302 mClientAlive = false;
303 return;
304 }
305 // Then, write the payload if this is not just a heartbeat.
306 if (dataSize > 0 && !mProtoOut.flush(mDupOut.get())) {
307 mClientAlive = false;
308 return;
309 }
310 mLastWriteMs = getElapsedRealtimeMillis();
311 }
312
getUidsForPullAtom(vector<int32_t> * uids,const PullInfo & pullInfo)313 void ShellSubscriberClient::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
314 uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
315 // This is slow. Consider storing the uids per app and listening to uidmap updates.
316 for (const string& pkg : pullInfo.mPullPackages) {
317 set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
318 uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
319 }
320 uids->push_back(DEFAULT_PULL_UID);
321 }
322
clearCache()323 void ShellSubscriberClient::clearCache() {
324 mProtoOut.clear();
325 mCacheSize = 0;
326 }
327
triggerFdFlush()328 void ShellSubscriberClient::triggerFdFlush() {
329 attemptWriteToPipeLocked();
330 clearCache();
331 }
332
triggerCallback(StatsSubscriptionCallbackReason reason)333 void ShellSubscriberClient::triggerCallback(StatsSubscriptionCallbackReason reason) {
334 // Invoke Binder callback with cached event data.
335 vector<uint8_t> payloadBytes;
336 mProtoOut.serializeToVector(&payloadBytes);
337 const Status status = mCallback->onSubscriptionData(reason, payloadBytes);
338 if (status.getStatus() == STATUS_DEAD_OBJECT &&
339 status.getExceptionCode() == EX_TRANSACTION_FAILED) {
340 mClientAlive = false;
341 return;
342 }
343
344 mLastWriteMs = getElapsedRealtimeMillis();
345 clearCache();
346 }
347
flush()348 void ShellSubscriberClient::flush() {
349 triggerCallback(StatsSubscriptionCallbackReason::FLUSH_REQUESTED);
350 }
351
onUnsubscribe()352 void ShellSubscriberClient::onUnsubscribe() {
353 triggerCallback(StatsSubscriptionCallbackReason::SUBSCRIPTION_ENDED);
354 }
355
addAllAtomIds(LogEventFilter::AtomIdSet & allAtomIds) const356 void ShellSubscriberClient::addAllAtomIds(LogEventFilter::AtomIdSet& allAtomIds) const {
357 for (const auto& matcher : mPushedMatchers) {
358 allAtomIds.insert(matcher.atom_id());
359 }
360 }
361
362 } // namespace statsd
363 } // namespace os
364 } // namespace android
365