1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #define STATSD_DEBUG false // STOPSHIP if true
17 #include "Log.h"
18
19 #include "ShellSubscriber.h"
20
21 #include <android-base/file.h>
22 #include <inttypes.h>
23 #include <utils/Timers.h>
24
25 #include "stats_log_util.h"
26
27 using aidl::android::os::IStatsSubscriptionCallback;
28
29 namespace android {
30 namespace os {
31 namespace statsd {
32
~ShellSubscriber()33 ShellSubscriber::~ShellSubscriber() {
34 {
35 std::unique_lock<std::mutex> lock(mMutex);
36 mClientSet.clear();
37 updateLogEventFilterLocked();
38 }
39 mThreadSleepCV.notify_one();
40 if (mThread.joinable()) {
41 mThread.join();
42 }
43 }
44
startNewSubscription(int in,int out,int64_t timeoutSec)45 bool ShellSubscriber::startNewSubscription(int in, int out, int64_t timeoutSec) {
46 std::unique_lock<std::mutex> lock(mMutex);
47 VLOG("ShellSubscriber: new subscription has come in");
48 if (mClientSet.size() >= kMaxSubscriptions) {
49 ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
50 "%zu. Limit: %zu",
51 mClientSet.size(), kMaxSubscriptions);
52 return false;
53 }
54
55 return startNewSubscriptionLocked(ShellSubscriberClient::create(
56 in, out, timeoutSec, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
57 }
58
startNewSubscription(const vector<uint8_t> & subscriptionConfig,const shared_ptr<IStatsSubscriptionCallback> & callback)59 bool ShellSubscriber::startNewSubscription(const vector<uint8_t>& subscriptionConfig,
60 const shared_ptr<IStatsSubscriptionCallback>& callback) {
61 std::unique_lock<std::mutex> lock(mMutex);
62 VLOG("ShellSubscriber: new subscription has come in");
63 if (mClientSet.size() >= kMaxSubscriptions) {
64 ALOGE("ShellSubscriber: cannot have another active subscription. Current Subscriptions: "
65 "%zu. Limit: %zu",
66 mClientSet.size(), kMaxSubscriptions);
67 return false;
68 }
69
70 return startNewSubscriptionLocked(ShellSubscriberClient::create(
71 subscriptionConfig, callback, getElapsedRealtimeSec(), mUidMap, mPullerMgr));
72 }
73
startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client)74 bool ShellSubscriber::startNewSubscriptionLocked(unique_ptr<ShellSubscriberClient> client) {
75 if (client == nullptr) return false;
76
77 // Add new valid client to the client set
78 mClientSet.insert(std::move(client));
79 updateLogEventFilterLocked();
80
81 // Only spawn one thread to manage pulling atoms and sending
82 // heartbeats.
83 if (!mThreadAlive) {
84 mThreadAlive = true;
85 if (mThread.joinable()) {
86 mThread.join();
87 }
88 mThread = thread([this] { pullAndSendHeartbeats(); });
89 }
90
91 return true;
92 }
93
94 // Sends heartbeat signals and sleeps between doing work
pullAndSendHeartbeats()95 void ShellSubscriber::pullAndSendHeartbeats() {
96 VLOG("ShellSubscriber: helper thread starting");
97 std::unique_lock<std::mutex> lock(mMutex);
98 while (true) {
99 int64_t sleepTimeMs = 24 * 60 * 60 * 1000; // 24 hours.
100 const int64_t nowNanos = getElapsedRealtimeNs();
101 const int64_t nowMillis = nanoseconds_to_milliseconds(nowNanos);
102 const int64_t nowSecs = nanoseconds_to_seconds(nowNanos);
103 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
104 int64_t subscriptionSleepMs =
105 (*clientIt)->pullAndSendHeartbeatsIfNeeded(nowSecs, nowMillis, nowNanos);
106 sleepTimeMs = std::min(sleepTimeMs, subscriptionSleepMs);
107 if ((*clientIt)->isAlive()) {
108 ++clientIt;
109 } else {
110 VLOG("ShellSubscriber: removing client!");
111 clientIt = mClientSet.erase(clientIt);
112 updateLogEventFilterLocked();
113 }
114 }
115 if (mClientSet.empty()) {
116 mThreadAlive = false;
117 VLOG("ShellSubscriber: helper thread done!");
118 return;
119 }
120 VLOG("ShellSubscriber: helper thread sleeping for %" PRId64 "ms", sleepTimeMs);
121 mThreadSleepCV.wait_for(lock, sleepTimeMs * 1ms, [this] { return mClientSet.empty(); });
122 }
123 }
124
onLogEvent(const LogEvent & event)125 void ShellSubscriber::onLogEvent(const LogEvent& event) {
126 // Skip if event is skipped
127 if (event.isParsedHeaderOnly()) {
128 return;
129 }
130 // Skip RestrictedLogEvents
131 if (event.isRestricted()) {
132 return;
133 }
134 std::unique_lock<std::mutex> lock(mMutex);
135 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end();) {
136 (*clientIt)->onLogEvent(event);
137 if ((*clientIt)->isAlive()) {
138 ++clientIt;
139 } else {
140 VLOG("ShellSubscriber: removing client!");
141 clientIt = mClientSet.erase(clientIt);
142 updateLogEventFilterLocked();
143 }
144 }
145 }
146
flushSubscription(const shared_ptr<IStatsSubscriptionCallback> & callback)147 void ShellSubscriber::flushSubscription(const shared_ptr<IStatsSubscriptionCallback>& callback) {
148 std::unique_lock<std::mutex> lock(mMutex);
149
150 // TODO(b/268822860): Consider storing callback clients in a map keyed by
151 // IStatsSubscriptionCallback to avoid this linear search.
152 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
153 if ((*clientIt)->hasCallback(callback)) {
154 if ((*clientIt)->isAlive()) {
155 (*clientIt)->flush();
156 } else {
157 VLOG("ShellSubscriber: removing client!");
158
159 // Erasing a value moves the iterator to the next value. The update expression also
160 // moves the iterator, skipping a value. This is fine because we do an early return
161 // before next iteration of the loop.
162 clientIt = mClientSet.erase(clientIt);
163 updateLogEventFilterLocked();
164 }
165 return;
166 }
167 }
168 }
169
unsubscribe(const shared_ptr<IStatsSubscriptionCallback> & callback)170 void ShellSubscriber::unsubscribe(const shared_ptr<IStatsSubscriptionCallback>& callback) {
171 std::unique_lock<std::mutex> lock(mMutex);
172
173 // TODO(b/268822860): Consider storing callback clients in a map keyed by
174 // IStatsSubscriptionCallback to avoid this linear search.
175 for (auto clientIt = mClientSet.begin(); clientIt != mClientSet.end(); ++clientIt) {
176 if ((*clientIt)->hasCallback(callback)) {
177 if ((*clientIt)->isAlive()) {
178 (*clientIt)->onUnsubscribe();
179 }
180 VLOG("ShellSubscriber: removing client!");
181
182 // Erasing a value moves the iterator to the next value. The update expression also
183 // moves the iterator, skipping a value. This is fine because we do an early return
184 // before next iteration of the loop.
185 clientIt = mClientSet.erase(clientIt);
186 updateLogEventFilterLocked();
187 return;
188 }
189 }
190 }
191
updateLogEventFilterLocked() const192 void ShellSubscriber::updateLogEventFilterLocked() const {
193 VLOG("ShellSubscriber: Updating allAtomIds");
194 if (!mLogEventFilter) {
195 return;
196 }
197 LogEventFilter::AtomIdSet allAtomIds;
198 for (const auto& client : mClientSet) {
199 client->addAllAtomIds(allAtomIds);
200 }
201 VLOG("ShellSubscriber: Updating allAtomIds done. Total atoms %d", (int)allAtomIds.size());
202 mLogEventFilter->setAtomIds(std::move(allAtomIds), this);
203 }
204
205 } // namespace statsd
206 } // namespace os
207 } // namespace android
208