• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define STATSD_DEBUG false  // STOPSHIP if true
17 #include "Log.h"
18 
19 #include "ShellSubscriber.h"
20 
21 #include <android-base/file.h>
22 #include <inttypes.h>
23 #include <utils/Timers.h>
24 
25 #include "stats_log_util.h"
26 
27 using aidl::android::os::IStatsSubscriptionCallback;
28 
29 namespace android {
30 namespace os {
31 namespace statsd {
32 
~ShellSubscriber()33 ShellSubscriber::~ShellSubscriber() {
34     {
35         std::unique_lock<std::mutex> lock(mMutex);
36         mClientSet.clear();
37         updateLogEventFilterLocked();
38     }
39     mThreadSleepCV.notify_one();
40     if (mThread.joinable()) {
41         mThread.join();
42     }
43 }
44 
startNewSubscription(int in,int out,int64_t timeoutSec)45 bool ShellSubscriber::startNewSubscription(int in, int out, int64_t timeoutSec) {
46     std::unique_lock<std::mutex> lock(mMutex);
47     VLOG("ShellSubscriber: new subscription has come in");
48     if (mClientSet.size() >= kMaxSubscriptions) {
49         ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
50               "%zu. Limit: %zu",
51               mClientSet.size(), kMaxSubscriptions);
52         return false;
53     }
54 
55     return startNewSubscriptionLocked(ShellSubscriberClient::create(
56             in, out, timeoutSec, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
57 }
58 
startNewSubscription(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback)59 bool ShellSubscriber::startNewSubscription(const vector<uint8_t>& subscriptionConfig,
60                                            const shared_ptr<IStatsSubscriptionCallback>& callback) {
61     std::unique_lock<std::mutex> lock(mMutex);
62     VLOG("ShellSubscriber: new subscription has come in");
63     if (mClientSet.size() >= kMaxSubscriptions) {
64         ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
65               "%zu. Limit: %zu",
66               mClientSet.size(), kMaxSubscriptions);
67         return false;
68     }
69 
70     return startNewSubscriptionLocked(ShellSubscriberClient::create(
71             subscriptionConfig, callback, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
72 }
73 
startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client)74 bool ShellSubscriber::startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client) {
75     if (client == nullptr) return false;
76 
77     // Add new valid client to the client set
78     mClientSet.insert(std::move(client));
79     updateLogEventFilterLocked();
80 
81     // Only spawn one thread to manage pulling atoms and sending
82     // heartbeats.
83     if (!mThreadAlive) {
84         mThreadAlive = true;
85         if (mThread.joinable()) {
86             mThread.join();
87         }
88         mThread = thread([this] { pullAndSendHeartbeats(); });
89     }
90 
91     return true;
92 }
93 
94 // Sends heartbeat signals and sleeps between doing work
pullAndSendHeartbeats()95 void ShellSubscriber::pullAndSendHeartbeats() {
96     VLOG("ShellSubscriber: helper thread starting");
97     std::unique_lock<std::mutex> lock(mMutex);
98     while (true) {
99         int64_t sleepTimeMs = 24 * 60 * 60 * 1000;  // 24 hours.
100         const int64_t nowNanos = getElapsedRealtimeNs();
101         const int64_t nowMillis = nanoseconds_to_milliseconds(nowNanos);
102         const int64_t nowSecs = nanoseconds_to_seconds(nowNanos);
103         for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
104             int64_t subscriptionSleepMs =
105                     (*clientIt)->pullAndSendHeartbeatsIfNeeded(nowSecs, nowMillis, nowNanos);
106             sleepTimeMs = std::min(sleepTimeMs, subscriptionSleepMs);
107             if ((*clientIt)->isAlive()) {
108                 ++clientIt;
109             } else {
110                 VLOG("ShellSubscriber: removing client!");
111                 clientIt = mClientSet.erase(clientIt);
112                 updateLogEventFilterLocked();
113             }
114         }
115         if (mClientSet.empty()) {
116             mThreadAlive = false;
117             VLOG("ShellSubscriber: helper thread done!");
118             return;
119         }
120         VLOG("ShellSubscriber: helper thread sleeping for %" PRId64 "ms", sleepTimeMs);
121         mThreadSleepCV.wait_for(lock, sleepTimeMs * 1ms, [this] { return mClientSet.empty(); });
122     }
123 }
124 
onLogEvent(const LogEvent & event)125 void ShellSubscriber::onLogEvent(const LogEvent& event) {
126     // Skip if event is skipped
127     if (event.isParsedHeaderOnly()) {
128         return;
129     }
130     // Skip RestrictedLogEvents
131     if (event.isRestricted()) {
132         return;
133     }
134     std::unique_lock<std::mutex> lock(mMutex);
135     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
136         (*clientIt)->onLogEvent(event);
137         if ((*clientIt)->isAlive()) {
138             ++clientIt;
139         } else {
140             VLOG("ShellSubscriber: removing client!");
141             clientIt = mClientSet.erase(clientIt);
142             updateLogEventFilterLocked();
143         }
144     }
145 }
146 
flushSubscription(const shared_ptr<IStatsSubscriptionCallback> & callback)147 void ShellSubscriber::flushSubscription(const shared_ptr<IStatsSubscriptionCallback>& callback) {
148     std::unique_lock<std::mutex> lock(mMutex);
149 
150     // TODO(b/268822860): Consider storing callback clients in a map keyed by
151     // IStatsSubscriptionCallback to avoid this linear search.
152     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
153         if ((*clientIt)->hasCallback(callback)) {
154             if ((*clientIt)->isAlive()) {
155                 (*clientIt)->flush();
156             } else {
157                 VLOG("ShellSubscriber: removing client!");
158 
159                 // Erasing a value moves the iterator to the next value. The update expression also
160                 // moves the iterator, skipping a value. This is fine because we do an early return
161                 // before next iteration of the loop.
162                 clientIt = mClientSet.erase(clientIt);
163                 updateLogEventFilterLocked();
164             }
165             return;
166         }
167     }
168 }
169 
unsubscribe(const shared_ptr<IStatsSubscriptionCallback> & callback)170 void ShellSubscriber::unsubscribe(const shared_ptr<IStatsSubscriptionCallback>& callback) {
171     std::unique_lock<std::mutex> lock(mMutex);
172 
173     // TODO(b/268822860): Consider storing callback clients in a map keyed by
174     // IStatsSubscriptionCallback to avoid this linear search.
175     for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
176         if ((*clientIt)->hasCallback(callback)) {
177             if ((*clientIt)->isAlive()) {
178                 (*clientIt)->onUnsubscribe();
179             }
180             VLOG("ShellSubscriber: removing client!");
181 
182             // Erasing a value moves the iterator to the next value. The update expression also
183             // moves the iterator, skipping a value. This is fine because we do an early return
184             // before next iteration of the loop.
185             clientIt = mClientSet.erase(clientIt);
186             updateLogEventFilterLocked();
187             return;
188         }
189     }
190 }
191 
updateLogEventFilterLocked() const192 void ShellSubscriber::updateLogEventFilterLocked() const {
193     VLOG("ShellSubscriber: Updating allAtomIds");
194     if (!mLogEventFilter) {
195         return;
196     }
197     LogEventFilter::AtomIdSet allAtomIds;
198     for (const auto& client : mClientSet) {
199         client->addAllAtomIds(allAtomIds);
200     }
201     VLOG("ShellSubscriber: Updating allAtomIds done. Total atoms %d", (int)allAtomIds.size());
202     mLogEventFilter->setAtomIds(std::move(allAtomIds), this);
203 }
204 
205 }  // namespace statsd
206 }  // namespace os
207 }  // namespace android
208