• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2019, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <aidl/android/os/BnPullAtomCallback.h>
18 #include <aidl/android/os/IPullAtomResultReceiver.h>
19 #include <aidl/android/os/IStatsd.h>
20 #include <aidl/android/util/StatsEventParcel.h>
21 #include <android/binder_auto_utils.h>
22 #include <android/binder_ibinder.h>
23 #include <android/binder_manager.h>
24 #include <stats_event.h>
25 #include <stats_pull_atom_callback.h>
26 
27 #include <map>
28 #include <queue>
29 #include <thread>
30 #include <vector>
31 
32 #include "utils.h"
33 
34 using Status = ::ndk::ScopedAStatus;
35 using aidl::android::os::BnPullAtomCallback;
36 using aidl::android::os::IPullAtomResultReceiver;
37 using aidl::android::os::IStatsd;
38 using aidl::android::util::StatsEventParcel;
39 using ::ndk::SharedRefBase;
40 
41 struct AStatsEventList {
42     std::vector<AStatsEvent*> data;
43 };
44 
AStatsEventList_addStatsEvent(AStatsEventList * pull_data)45 AStatsEvent* AStatsEventList_addStatsEvent(AStatsEventList* pull_data) {
46     AStatsEvent* event = AStatsEvent_obtain();
47     pull_data->data.push_back(event);
48     return event;
49 }
50 
// Defaults used by AStatsManager_PullAtomMetadata_obtain() and by
// AStatsManager_setPullAtomCallback() when it is given null metadata.
constexpr int64_t DEFAULT_COOL_DOWN_MILLIS{1000};  // 1 second.
constexpr int64_t DEFAULT_TIMEOUT_MILLIS{1500};    // 1.5 seconds.
53 
/**
 * @brief Per-atom pull settings that are forwarded when a pull callback is registered.
 *
 * Instances are created by AStatsManager_PullAtomMetadata_obtain() and destroyed by
 * AStatsManager_PullAtomMetadata_release().
 */
struct AStatsManager_PullAtomMetadata {
    // Cool-down value in milliseconds, passed through unchanged at registration.
    int64_t cool_down_millis;
    // Timeout value in milliseconds, passed through unchanged at registration.
    int64_t timeout_millis;
    // Additive field list, passed through unchanged at registration.
    std::vector<int32_t> additive_fields;
};
59 
AStatsManager_PullAtomMetadata_obtain()60 AStatsManager_PullAtomMetadata* AStatsManager_PullAtomMetadata_obtain() {
61     AStatsManager_PullAtomMetadata* metadata = new AStatsManager_PullAtomMetadata();
62     metadata->cool_down_millis = DEFAULT_COOL_DOWN_MILLIS;
63     metadata->timeout_millis = DEFAULT_TIMEOUT_MILLIS;
64     metadata->additive_fields = std::vector<int32_t>();
65     return metadata;
66 }
67 
AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata * metadata)68 void AStatsManager_PullAtomMetadata_release(AStatsManager_PullAtomMetadata* metadata) {
69     delete metadata;
70 }
71 
AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata * metadata,int64_t cool_down_millis)72 void AStatsManager_PullAtomMetadata_setCoolDownMillis(AStatsManager_PullAtomMetadata* metadata,
73                                                       int64_t cool_down_millis) {
74     metadata->cool_down_millis = cool_down_millis;
75 }
76 
AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata * metadata)77 int64_t AStatsManager_PullAtomMetadata_getCoolDownMillis(AStatsManager_PullAtomMetadata* metadata) {
78     return metadata->cool_down_millis;
79 }
80 
AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata * metadata,int64_t timeout_millis)81 void AStatsManager_PullAtomMetadata_setTimeoutMillis(AStatsManager_PullAtomMetadata* metadata,
82                                                      int64_t timeout_millis) {
83     metadata->timeout_millis = timeout_millis;
84 }
85 
AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata * metadata)86 int64_t AStatsManager_PullAtomMetadata_getTimeoutMillis(AStatsManager_PullAtomMetadata* metadata) {
87     return metadata->timeout_millis;
88 }
89 
AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * additive_fields,int32_t num_fields)90 void AStatsManager_PullAtomMetadata_setAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
91                                                       int32_t* additive_fields,
92                                                       int32_t num_fields) {
93     metadata->additive_fields.assign(additive_fields, additive_fields + num_fields);
94 }
95 
AStatsManager_PullAtomMetadata_getNumAdditiveFields(AStatsManager_PullAtomMetadata * metadata)96 int32_t AStatsManager_PullAtomMetadata_getNumAdditiveFields(
97         AStatsManager_PullAtomMetadata* metadata) {
98     return metadata->additive_fields.size();
99 }
100 
AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata * metadata,int32_t * fields)101 void AStatsManager_PullAtomMetadata_getAdditiveFields(AStatsManager_PullAtomMetadata* metadata,
102                                                       int32_t* fields) {
103     std::copy(metadata->additive_fields.begin(), metadata->additive_fields.end(), fields);
104 }
105 
106 class StatsPullAtomCallbackInternal : public BnPullAtomCallback {
107   public:
StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback,void * cookie,const int64_t coolDownMillis,const int64_t timeoutMillis,const std::vector<int32_t> additiveFields)108     StatsPullAtomCallbackInternal(const AStatsManager_PullAtomCallback callback, void* cookie,
109                                   const int64_t coolDownMillis, const int64_t timeoutMillis,
110                                   const std::vector<int32_t> additiveFields)
111         : mCallback(callback),
112           mCookie(cookie),
113           mCoolDownMillis(coolDownMillis),
114           mTimeoutMillis(timeoutMillis),
115           mAdditiveFields(additiveFields) {}
116 
onPullAtom(int32_t atomTag,const std::shared_ptr<IPullAtomResultReceiver> & resultReceiver)117     Status onPullAtom(int32_t atomTag,
118                       const std::shared_ptr<IPullAtomResultReceiver>& resultReceiver) override {
119         AStatsEventList statsEventList;
120         int successInt = mCallback(atomTag, &statsEventList, mCookie);
121         bool success = successInt == AStatsManager_PULL_SUCCESS;
122 
123         // Convert stats_events into StatsEventParcels.
124         std::vector<StatsEventParcel> parcels;
125 
126         for (int i = 0; i < statsEventList.data.size(); i++) {
127             size_t size;
128             uint8_t* buffer = AStatsEvent_getBuffer(statsEventList.data[i], &size);
129 
130             StatsEventParcel p;
131             // vector.assign() creates a copy, but this is inevitable unless
132             // stats_event.h/c uses a vector as opposed to a buffer.
133             p.buffer.assign(buffer, buffer + size);
134             parcels.push_back(std::move(p));
135         }
136 
137         Status status = resultReceiver->pullFinished(atomTag, success, parcels);
138         if (!status.isOk()) {
139             std::vector<StatsEventParcel> emptyParcels;
140             resultReceiver->pullFinished(atomTag, /*success=*/false, emptyParcels);
141         }
142         for (int i = 0; i < statsEventList.data.size(); i++) {
143             AStatsEvent_release(statsEventList.data[i]);
144         }
145         return Status::ok();
146     }
147 
getCoolDownMillis() const148     int64_t getCoolDownMillis() const { return mCoolDownMillis; }
getTimeoutMillis() const149     int64_t getTimeoutMillis() const { return mTimeoutMillis; }
getAdditiveFields() const150     const std::vector<int32_t>& getAdditiveFields() const { return mAdditiveFields; }
151 
152   private:
153     const AStatsManager_PullAtomCallback mCallback;
154     void* mCookie;
155     const int64_t mCoolDownMillis;
156     const int64_t mTimeoutMillis;
157     const std::vector<int32_t> mAdditiveFields;
158 };
159 
160 /**
161  * @brief pullersMutex is used to guard simultaneous access to pullers from below threads
162  * Main thread
163  * - AStatsManager_setPullAtomCallback()
164  * - AStatsManager_clearPullAtomCallback()
165  * Binder thread:
166  * - StatsdProvider::binderDied()
167  */
168 static std::mutex pullersMutex;
169 
170 static std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullers;
171 
172 class StatsdProvider {
173 public:
StatsdProvider()174     StatsdProvider() : mDeathRecipient(AIBinder_DeathRecipient_new(binderDied)) {
175     }
176 
~StatsdProvider()177     ~StatsdProvider() {
178         resetStatsService();
179     }
180 
getStatsService()181     std::shared_ptr<IStatsd> getStatsService() {
182         // There are host unit tests which are using libstatspull
183         // Since we do not have statsd on host - the getStatsService() is no-op and
184         // should return nullptr
185 #ifdef __ANDROID__
186         std::lock_guard<std::mutex> lock(mStatsdMutex);
187         if (!mStatsd) {
188             // Fetch statsd
189             ndk::SpAIBinder binder(getStatsdBinder());
190             mStatsd = IStatsd::fromBinder(binder);
191             if (mStatsd) {
192                 AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), this);
193             }
194         }
195 #endif  //  __ANDROID__
196         return mStatsd;
197     }
198 
resetStatsService()199     void resetStatsService() {
200         std::lock_guard<std::mutex> lock(mStatsdMutex);
201         mStatsd = nullptr;
202     }
203 
binderDied(void * cookie)204     static void binderDied(void* cookie) {
205         StatsdProvider* statsProvider = static_cast<StatsdProvider*>(cookie);
206         statsProvider->resetStatsService();
207 
208         std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
209         if (statsService == nullptr) {
210             return;
211         }
212 
213         // Since we do not want to make an IPC with the lock held, we first create a
214         // copy of the data with the lock held before iterating through the map.
215         std::map<int32_t, std::shared_ptr<StatsPullAtomCallbackInternal>> pullersCopy;
216         {
217             std::lock_guard<std::mutex> lock(pullersMutex);
218             pullersCopy = pullers;
219         }
220         for (const auto& it : pullersCopy) {
221             statsService->registerNativePullAtomCallback(it.first, it.second->getCoolDownMillis(),
222                                                          it.second->getTimeoutMillis(),
223                                                          it.second->getAdditiveFields(), it.second);
224         }
225     }
226 
227 private:
228     /**
229      * @brief mStatsdMutex is used to guard simultaneous access to mStatsd from below threads:
230      * Work thread
231      * - registerStatsPullAtomCallbackBlocking()
232      * - unregisterStatsPullAtomCallbackBlocking()
233      * Binder thread:
234      * - StatsdProvider::binderDied()
235      */
236     std::mutex mStatsdMutex;
237     std::shared_ptr<IStatsd> mStatsd;
238     ::ndk::ScopedAIBinder_DeathRecipient mDeathRecipient;
239 };
240 
241 static std::shared_ptr<StatsdProvider> statsProvider = std::make_shared<StatsdProvider>();
242 
registerStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider,std::shared_ptr<StatsPullAtomCallbackInternal> cb)243 void registerStatsPullAtomCallbackBlocking(int32_t atomTag,
244                                            std::shared_ptr<StatsdProvider> statsProvider,
245                                            std::shared_ptr<StatsPullAtomCallbackInternal> cb) {
246     const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
247     if (statsService == nullptr) {
248         // Statsd not available
249         return;
250     }
251 
252     statsService->registerNativePullAtomCallback(
253             atomTag, cb->getCoolDownMillis(), cb->getTimeoutMillis(), cb->getAdditiveFields(), cb);
254 }
255 
unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,std::shared_ptr<StatsdProvider> statsProvider)256 void unregisterStatsPullAtomCallbackBlocking(int32_t atomTag,
257                                              std::shared_ptr<StatsdProvider> statsProvider) {
258     const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
259     if (statsService == nullptr) {
260         // Statsd not available
261         return;
262     }
263 
264     statsService->unregisterNativePullAtomCallback(atomTag);
265 }
266 
267 class CallbackOperationsHandler {
268     struct Cmd {
269         enum Type { CMD_REGISTER, CMD_UNREGISTER };
270 
271         Type type;
272         int atomTag;
273         std::shared_ptr<StatsPullAtomCallbackInternal> callback;
274     };
275 
276 public:
~CallbackOperationsHandler()277     ~CallbackOperationsHandler() {
278         if (mWorkThread.joinable()) {
279             mWorkThread.join();
280         }
281     }
282 
getInstance()283     static CallbackOperationsHandler& getInstance() {
284         static CallbackOperationsHandler handler;
285         return handler;
286     }
287 
registerCallback(int atomTag,std::shared_ptr<StatsPullAtomCallbackInternal> callback)288     void registerCallback(int atomTag, std::shared_ptr<StatsPullAtomCallbackInternal> callback) {
289         auto registerCmd = std::make_unique<Cmd>();
290         registerCmd->type = Cmd::CMD_REGISTER;
291         registerCmd->atomTag = atomTag;
292         registerCmd->callback = std::move(callback);
293         pushToQueue(std::move(registerCmd));
294 
295         startWorkerThread();
296     }
297 
unregisterCallback(int atomTag)298     void unregisterCallback(int atomTag) {
299         auto unregisterCmd = std::make_unique<Cmd>();
300         unregisterCmd->type = Cmd::CMD_UNREGISTER;
301         unregisterCmd->atomTag = atomTag;
302         pushToQueue(std::move(unregisterCmd));
303 
304         startWorkerThread();
305     }
306 
307 private:
308     std::atomic_bool mThreadAlive = false;
309     std::thread mWorkThread;
310 
311     std::mutex mMutex;
312     std::queue<std::unique_ptr<Cmd>> mCmdQueue;
313 
CallbackOperationsHandler()314     CallbackOperationsHandler() {
315     }
316 
pushToQueue(std::unique_ptr<Cmd> cmd)317     void pushToQueue(std::unique_ptr<Cmd> cmd) {
318         std::unique_lock<std::mutex> lock(mMutex);
319         mCmdQueue.push(std::move(cmd));
320     }
321 
startWorkerThread()322     void startWorkerThread() {
323         // Only spawn one thread to manage requests
324         if (mThreadAlive) {
325             return;
326         }
327         mThreadAlive = true;
328         if (mWorkThread.joinable()) {
329             mWorkThread.join();
330         }
331         mWorkThread = std::thread(&CallbackOperationsHandler::processCommands, this, statsProvider);
332     }
333 
processCommands(std::shared_ptr<StatsdProvider> statsProvider)334     void processCommands(std::shared_ptr<StatsdProvider> statsProvider) {
335         /**
336          * First trying to obtain stats service instance
337          * This is a blocking call, which waits on service readiness
338          */
339         const std::shared_ptr<IStatsd> statsService = statsProvider->getStatsService();
340 
341         if (!statsService) {
342             // Statsd not available - dropping all submitted command requests
343             std::queue<std::unique_ptr<Cmd>> emptyQueue;
344             std::unique_lock<std::mutex> lock(mMutex);
345             mCmdQueue.swap(emptyQueue);
346             mThreadAlive = false;
347             return;
348         }
349 
350         while (true) {
351             std::unique_ptr<Cmd> cmd = nullptr;
352             {
353                 /**
354                  * To guarantee sequential commands processing we need to lock mutex queue
355                  */
356                 std::unique_lock<std::mutex> lock(mMutex);
357                 if (mCmdQueue.empty()) {
358                     mThreadAlive = false;
359                     return;
360                 }
361 
362                 cmd = std::move(mCmdQueue.front());
363                 mCmdQueue.pop();
364             }
365 
366             switch (cmd->type) {
367                 case Cmd::CMD_REGISTER: {
368                     registerStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider,
369                                                           cmd->callback);
370                     break;
371                 }
372                 case Cmd::CMD_UNREGISTER: {
373                     unregisterStatsPullAtomCallbackBlocking(cmd->atomTag, statsProvider);
374                     break;
375                 }
376             }
377         }
378     }
379 };
380 
AStatsManager_setPullAtomCallback(int32_t atom_tag,AStatsManager_PullAtomMetadata * metadata,AStatsManager_PullAtomCallback callback,void * cookie)381 void AStatsManager_setPullAtomCallback(int32_t atom_tag, AStatsManager_PullAtomMetadata* metadata,
382                                        AStatsManager_PullAtomCallback callback, void* cookie) {
383     int64_t coolDownMillis =
384             metadata == nullptr ? DEFAULT_COOL_DOWN_MILLIS : metadata->cool_down_millis;
385     int64_t timeoutMillis = metadata == nullptr ? DEFAULT_TIMEOUT_MILLIS : metadata->timeout_millis;
386 
387     std::vector<int32_t> additiveFields;
388     if (metadata != nullptr) {
389         additiveFields = metadata->additive_fields;
390     }
391 
392     std::shared_ptr<StatsPullAtomCallbackInternal> callbackBinder =
393             SharedRefBase::make<StatsPullAtomCallbackInternal>(callback, cookie, coolDownMillis,
394                                                                timeoutMillis, additiveFields);
395 
396     {
397         std::lock_guard<std::mutex> lock(pullersMutex);
398         // Always add to the map. If statsd is dead, we will add them when it comes back.
399         pullers[atom_tag] = callbackBinder;
400     }
401 
402     CallbackOperationsHandler::getInstance().registerCallback(atom_tag, callbackBinder);
403 }
404 
AStatsManager_clearPullAtomCallback(int32_t atom_tag)405 void AStatsManager_clearPullAtomCallback(int32_t atom_tag) {
406     {
407         std::lock_guard<std::mutex> lock(pullersMutex);
408         // Always remove the puller from our map.
409         // If statsd is down, we will not register it when it comes back.
410         pullers.erase(atom_tag);
411     }
412 
413     CallbackOperationsHandler::getInstance().unregisterCallback(atom_tag);
414 }
415