1 /*
2 * Copyright (C) 2019, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <aidl/android/os/BnPullAtomCallback.h>
18 #include <aidl/android/os/IPullAtomResultReceiver.h>
19 #include <aidl/android/os/IStatsd.h>
20 #include <aidl/android/util/StatsEventParcel.h>
21 #include <android/binder_auto_utils.h>
22 #include <android/binder_ibinder.h>
23 #include <android/binder_manager.h>
24 #include <stats_event.h>
25 #include <stats_pull_atom_callback.h>
26
#include <algorithm>
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
31
32 #include "utils.h"
33
34 using Status = ::ndk::ScopedAStatus;
35 using aidl::android::os::BnPullAtomCallback;
36 using aidl::android::os::IPullAtomResultReceiver;
37 using aidl::android::os::IStatsd;
38 using aidl::android::util::StatsEventParcel;
39 using ::ndk::SharedRefBase;
40
// Opaque list of events produced during a single atom pull.
// The AStatsEvent pointers appended via AStatsEventList_addStatsEvent() are
// owned by whoever drains the list (onPullAtom releases them after
// serialization).
struct AStatsEventList {
    std::vector<AStatsEvent*> data;
};
44
AStatsEventList_addStatsEvent(AStatsEventList * pull_data)45 AStatsEvent* AStatsEventList_addStatsEvent(AStatsEventList* pull_data) {
46 AStatsEvent* event = AStatsEvent_obtain();
47 pull_data->data.push_back(event);
48 return event;
49 }
50
// Defaults applied when a puller is registered without explicit metadata.
constexpr int64_t DEFAULT_COOL_DOWN_MILLIS = 1000LL;  // 1 second.
constexpr int64_t DEFAULT_TIMEOUT_MILLIS = 1500LL;    // 1.5 seconds.

// Opaque metadata handed to statsd when registering a pulled atom:
// the minimum interval between pulls, the per-pull timeout, and which
// field ids statsd may treat as additive when aggregating.
struct AStatsManager_PullAtomMetadata {
    int64_t cool_down_millis;
    int64_t timeout_millis;
    std::vector<int32_t> additive_fields;
};
59
AStatsManager_PullAtomMetadata_obtain()60 AStatsManager_PullAtomMetadata* AStatsManager_PullAtomMetadata_obtain() {
61 AStatsManager_PullAtomMetadata* metadata = new AStatsManager_PullAtomMetadata();
62 metadata->cool_down_millis = DEFAULT_COOL_DOWN_MILLIS;
63 metadata->timeout_millis = DEFAULT_TIMEOUT_MILLIS;
64 metadata->additive_fields = std::vector<int32_t>();
65 return metadata;
66 }
67
// Frees a metadata object returned by AStatsManager_PullAtomMetadata_obtain().
// Safe to call with nullptr (delete on null is a no-op).
void AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata* metadata) {
    delete metadata;
}
71
// Sets the minimum interval, in milliseconds, statsd should wait between
// consecutive pulls of this atom.
void AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata* metadata,
                                                      int64_t cool_down_millis) {
    metadata->cool_down_millis = cool_down_millis;
}
76
// Returns the configured cool-down interval in milliseconds.
int64_t AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata* metadata) {
    return metadata->cool_down_millis;
}
80
// Sets how long, in milliseconds, statsd should wait for a pull to complete
// before considering it failed.
void AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata* metadata,
                                                     int64_t timeout_millis) {
    metadata->timeout_millis = timeout_millis;
}
85
// Returns the configured pull timeout in milliseconds.
int64_t AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata* metadata) {
    return metadata->timeout_millis;
}
89
AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * additive_fields,int32_t num_fields)90 void AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
91 int32_t* additive_fields,
92 int32_t num_fields) {
93 metadata->additive_fields.assign(additive_fields, additive_fields + num_fields);
94 }
95
AStatsManager_PullAtomMetadata_getNumAdditiveFields(AStatsManager_PullAtomMetadata * metadata)96 int32_t AStatsManager_PullAtomMetadata_getNumAdditiveFields(
97 AStatsManager_PullAtomMetadata* metadata) {
98 return metadata->additive_fields.size();
99 }
100
// Copies the additive field ids into |fields|. The caller must supply a
// buffer with room for at least
// AStatsManager_PullAtomMetadata_getNumAdditiveFields() entries.
void AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
                                                      int32_t* fields) {
    std::copy(metadata->additive_fields.begin(), metadata->additive_fields.end(), fields);
}
105
106 class StatsPullAtomCallbackInternal : public BnPullAtomCallback {
107 public:
StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback,void * cookie,const int64_t coolDownMillis,const int64_t timeoutMillis,const std::vector<int32_t> additiveFields)108 StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback, void* cookie,
109 const int64_t coolDownMillis, const int64_t timeoutMillis,
110 const std::vector<int32_t> additiveFields)
111 : mCallback(callback),
112 mCookie(cookie),
113 mCoolDownMillis(coolDownMillis),
114 mTimeoutMillis(timeoutMillis),
115 mAdditiveFields(additiveFields) {}
116
onPullAtom(int32_t atomTag,const std::shared_ptr<IPullAtomResultReceiver> & resultReceiver)117 Status onPullAtom(int32_t atomTag,
118 const std::shared_ptr<IPullAtomResultReceiver>& resultReceiver) override {
119 AStatsEventList statsEventList;
120 int successInt = mCallback(atomTag, &statsEventList, mCookie);
121 bool success = successInt == AStatsManager_PULL_SUCCESS;
122
123 // Convert stats_events into StatsEventParcels.
124 std::vector<StatsEventParcel> parcels;
125
126 for (int i = 0; i < statsEventList.data.size(); i++) {
127 size_t size;
128 uint8_t* buffer = AStatsEvent_getBuffer(statsEventList.data[i], &size);
129
130 StatsEventParcel p;
131 // vector.assign() creates a copy, but this is inevitable unless
132 // stats_event.h/c uses a vector as opposed to a buffer.
133 p.buffer.assign(buffer, buffer + size);
134 parcels.push_back(std::move(p));
135 }
136
137 Status status = resultReceiver->pullFinished(atomTag, success, parcels);
138 if (!status.isOk()) {
139 std::vector<StatsEventParcel> emptyParcels;
140 resultReceiver->pullFinished(atomTag, /*success=*/false, emptyParcels);
141 }
142 for (int i = 0; i < statsEventList.data.size(); i++) {
143 AStatsEvent_release(statsEventList.data[i]);
144 }
145 return Status::ok();
146 }
147
getCoolDownMillis() const148 int64_t getCoolDownMillis() const { return mCoolDownMillis; }
getTimeoutMillis() const149 int64_t getTimeoutMillis() const { return mTimeoutMillis; }
getAdditiveFields() const150 const std::vector<int32_t>& getAdditiveFields() const { return mAdditiveFields; }
151
152 private:
153 const AStatsManager_PullAtomCallback mCallback;
154 void* mCookie;
155 const int64_t mCoolDownMillis;
156 const int64_t mTimeoutMillis;
157 const std::vector<int32_t> mAdditiveFields;
158 };
159
160 /**
161 * @brief pullersMutex is used to guard simultaneous access to pullers from below threads
162 * Main thread
163 * - AStatsManager_setPullAtomCallback()
164 * - AStatsManager_clearPullAtomCallback()
165 * Binder thread:
166 * - StatsdProvider::binderDied()
167 */
168 static std::mutex pullersMutex;
169
170 static std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullers;
171
/**
 * Caches the IStatsd binder proxy and handles statsd death: when the binder
 * dies, the cached proxy is dropped and every puller in the global map is
 * re-registered against the restarted service.
 */
class StatsdProvider {
  public:
    StatsdProvider() : mDeathRecipient(AIBinder_DeathRecipient_new(binderDied)) {
    }

    ~StatsdProvider() {
        resetStatsService();
    }

    // Returns the cached IStatsd proxy, fetching it (and linking the death
    // recipient) on first use. Returns nullptr when statsd is unavailable.
    std::shared_ptr<IStatsd> getStatsService() {
        // There are host unit tests which are using libstatspull
        // Since we do not have statsd on host - the getStatsService() is no-op and
        // should return nullptr
#ifdef __ANDROID__
        std::lock_guard<std::mutex> lock(mStatsdMutex);
        if (!mStatsd) {
            // Fetch statsd
            ndk::SpAIBinder binder(getStatsdBinder());
            mStatsd = IStatsd::fromBinder(binder);
            if (mStatsd) {
                // `this` is passed as the death-recipient cookie; see binderDied().
                AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this);
            }
        }
#endif  // __ANDROID__
        return mStatsd;
    }

    // Drops the cached proxy so the next getStatsService() re-fetches it.
    void resetStatsService() {
        std::lock_guard<std::mutex> lock(mStatsdMutex);
        mStatsd = nullptr;
    }

    // Binder death callback (runs on a binder thread). |cookie| is the
    // StatsdProvider instance passed to AIBinder_linkToDeath above.
    static void binderDied(void* cookie) {
        StatsdProvider* statsProvider = static_cast<StatsdProvider*>(cookie);
        statsProvider->resetStatsService();

        // Blocks until statsd is reachable again (or gives up with nullptr).
        std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
        if (statsService == nullptr) {
            return;
        }

        // Since we do not want to make an IPC with the lock held, we first create a
        // copy of the data with the lock held before iterating through the map.
        std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullersCopy;
        {
            std::lock_guard<std::mutex> lock(pullersMutex);
            pullersCopy = pullers;
        }
        for (const auto& it : pullersCopy) {
            statsService->registerNativePullAtomCallback(it.first, it.second->getCoolDownMillis(),
                                                         it.second->getTimeoutMillis(),
                                                         it.second->getAdditiveFields(), it.second);
        }
    }

  private:
    /**
     * @brief mStatsdMutex is used to guard simultaneous access to mStatsd from below threads:
     * Work thread
     *   - registerStatsPullAtomCallbackBlocking()
     *   - unregisterStatsPullAtomCallbackBlocking()
     * Binder thread:
     *   - StatsdProvider::binderDied()
     */
    std::mutex mStatsdMutex;
    std::shared_ptr<IStatsd> mStatsd;  // cached proxy; nullptr when statsd is down
    ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient;
};
240
241 static std::shared_ptr<StatsdProvider> statsProvider = std::make_shared<StatsdProvider>();
242
registerStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider,std::shared_ptr<StatsPullAtomCallbackInternal> cb)243 void registerStatsPullAtomCallbackBlocking(int32_t atomTag,
244 std::shared_ptr<StatsdProvider> statsProvider,
245 std::shared_ptr<StatsPullAtomCallbackInternal> cb) {
246 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
247 if (statsService == nullptr) {
248 // Statsd not available
249 return;
250 }
251
252 statsService->registerNativePullAtomCallback(
253 atomTag, cb->getCoolDownMillis(), cb->getTimeoutMillis(), cb->getAdditiveFields(), cb);
254 }
255
unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider)256 void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,
257 std::shared_ptr<StatsdProvider> statsProvider) {
258 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
259 if (statsService == nullptr) {
260 // Statsd not available
261 return;
262 }
263
264 statsService->unregisterNativePullAtomCallback(atomTag);
265 }
266
267 class CallbackOperationsHandler {
268 struct Cmd {
269 enum Type { CMD_REGISTER, CMD_UNREGISTER };
270
271 Type type;
272 int atomTag;
273 std::shared_ptr<StatsPullAtomCallbackInternal> callback;
274 };
275
276 public:
~CallbackOperationsHandler()277 ~CallbackOperationsHandler() {
278 if (mWorkThread.joinable()) {
279 mWorkThread.join();
280 }
281 }
282
getInstance()283 static CallbackOperationsHandler& getInstance() {
284 static CallbackOperationsHandler handler;
285 return handler;
286 }
287
registerCallback(int atomTag,std::shared_ptr<StatsPullAtomCallbackInternal> callback)288 void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) {
289 auto registerCmd = std::make_unique<Cmd>();
290 registerCmd->type = Cmd::CMD_REGISTER;
291 registerCmd->atomTag = atomTag;
292 registerCmd->callback = std::move(callback);
293 pushToQueue(std::move(registerCmd));
294
295 startWorkerThread();
296 }
297
unregisterCallback(int atomTag)298 void unregisterCallback(int atomTag) {
299 auto unregisterCmd = std::make_unique<Cmd>();
300 unregisterCmd->type = Cmd::CMD_UNREGISTER;
301 unregisterCmd->atomTag = atomTag;
302 pushToQueue(std::move(unregisterCmd));
303
304 startWorkerThread();
305 }
306
307 private:
308 std::atomic_bool mThreadAlive = false;
309 std::thread mWorkThread;
310
311 std::mutex mMutex;
312 std::queue<std::unique_ptr<Cmd>> mCmdQueue;
313
CallbackOperationsHandler()314 CallbackOperationsHandler() {
315 }
316
pushToQueue(std::unique_ptr<Cmd> cmd)317 void pushToQueue(std::unique_ptr<Cmd> cmd) {
318 std::unique_lock<std::mutex> lock(mMutex);
319 mCmdQueue.push(std::move(cmd));
320 }
321
startWorkerThread()322 void startWorkerThread() {
323 // Only spawn one thread to manage requests
324 if (mThreadAlive) {
325 return;
326 }
327 mThreadAlive = true;
328 if (mWorkThread.joinable()) {
329 mWorkThread.join();
330 }
331 mWorkThread = std::thread(&CallbackOperationsHandler::processCommands, this, statsProvider);
332 }
333
processCommands(std::shared_ptr<StatsdProvider> statsProvider)334 void processCommands(std::shared_ptr<StatsdProvider> statsProvider) {
335 /**
336 * First trying to obtain stats service instance
337 * This is a blocking call, which waits on service readiness
338 */
339 const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
340
341 if (!statsService) {
342 // Statsd not available - dropping all submitted command requests
343 std::queue<std::unique_ptr<Cmd>> emptyQueue;
344 std::unique_lock<std::mutex> lock(mMutex);
345 mCmdQueue.swap(emptyQueue);
346 mThreadAlive = false;
347 return;
348 }
349
350 while (true) {
351 std::unique_ptr<Cmd> cmd = nullptr;
352 {
353 /**
354 * To guarantee sequential commands processing we need to lock mutex queue
355 */
356 std::unique_lock<std::mutex> lock(mMutex);
357 if (mCmdQueue.empty()) {
358 mThreadAlive = false;
359 return;
360 }
361
362 cmd = std::move(mCmdQueue.front());
363 mCmdQueue.pop();
364 }
365
366 switch (cmd->type) {
367 case Cmd::CMD_REGISTER: {
368 registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider,
369 cmd->callback);
370 break;
371 }
372 case Cmd::CMD_UNREGISTER: {
373 unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider);
374 break;
375 }
376 }
377 }
378 }
379 };
380
AStatsManager_setPullAtomCallback(int32_t atom_tag,AStatsManager_PullAtomMetadata * metadata,AStatsManager_PullAtomCallback callback,void * cookie)381 void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata,
382 AStatsManager_PullAtomCallback callback, void* cookie) {
383 int64_t coolDownMillis =
384 metadata == nullptr ? DEFAULT_COOL_DOWN_MILLIS : metadata->cool_down_millis;
385 int64_t timeoutMillis = metadata == nullptr ? DEFAULT_TIMEOUT_MILLIS : metadata->timeout_millis;
386
387 std::vector<int32_t> additiveFields;
388 if (metadata != nullptr) {
389 additiveFields = metadata->additive_fields;
390 }
391
392 std::shared_ptr<StatsPullAtomCallbackInternal> callbackBinder =
393 SharedRefBase::make<StatsPullAtomCallbackInternal>(callback, cookie, coolDownMillis,
394 timeoutMillis, additiveFields);
395
396 {
397 std::lock_guard<std::mutex> lock(pullersMutex);
398 // Always add to the map. If statsd is dead, we will add them when it comes back.
399 pullers[atom_tag] = callbackBinder;
400 }
401
402 CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder);
403 }
404
AStatsManager_clearPullAtomCallback(int32_t atom_tag)405 void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
406 {
407 std::lock_guard<std::mutex> lock(pullersMutex);
408 // Always remove the puller from our map.
409 // If statsd is down, we will not register it when it comes back.
410 pullers.erase(atom_tag);
411 }
412
413 CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag);
414 }
415