1 /*
2 * Copyright (c) 2021, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "TelemetryServer.h"
18
19 #include "CarTelemetryImpl.h"
20 #include "RingBuffer.h"
21
22 #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
23 #include <android-base/logging.h>
24
25 #include <inttypes.h> // for PRIu64 and friends
26
27 #include <cstdint>
28 #include <memory>
29
30 namespace android {
31 namespace automotive {
32 namespace telemetry {
33
34 namespace {
35
36 using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
37 using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
38 using ::aidl::android::frameworks::automotive::telemetry::CarData;
39 using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetryCallback;
40 using ::android::base::Error;
41 using ::android::base::Result;
42
// Looper message code that triggers TelemetryServer::pushCarDataToListeners().
constexpr int kMsgPushCarDataToListener = 1;

// If ICarDataListener cannot accept data, the next push is delayed a little bit to give the
// listener time to recover.
constexpr std::chrono::seconds kPushCarDataFailureDelaySeconds{1};
48 } // namespace
49
TelemetryServer(LooperWrapper * looper,const std::chrono::nanoseconds & pushCarDataDelayNs,const int maxBufferSize)50 TelemetryServer::TelemetryServer(LooperWrapper* looper,
51 const std::chrono::nanoseconds& pushCarDataDelayNs,
52 const int maxBufferSize) :
53 mLooper(looper),
54 mPushCarDataDelayNs(pushCarDataDelayNs),
55 mRingBuffer(maxBufferSize),
56 mMessageHandler(new MessageHandlerImpl(this)) {}
57
setListener(const std::shared_ptr<ICarDataListener> & listener)58 void TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
59 const std::scoped_lock<std::mutex> lock(mMutex);
60 mCarDataListener = listener;
61 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
62 kMsgPushCarDataToListener);
63 }
64
clearListener()65 void TelemetryServer::clearListener() {
66 const std::scoped_lock<std::mutex> lock(mMutex);
67 if (mCarDataListener == nullptr) {
68 return;
69 }
70 mCarDataListener = nullptr;
71 mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
72 }
73
findCarDataIdsIntersection(const std::vector<int32_t> & ids)74 std::vector<int32_t> TelemetryServer::findCarDataIdsIntersection(const std::vector<int32_t>& ids) {
75 std::vector<int32_t> interestedIds;
76 for (int32_t id : ids) {
77 if (mCarDataIds.find(id) != mCarDataIds.end()) {
78 interestedIds.push_back(id);
79 }
80 }
81 return interestedIds;
82 }
83
addCarDataIds(const std::vector<int32_t> & ids)84 void TelemetryServer::addCarDataIds(const std::vector<int32_t>& ids) {
85 const std::scoped_lock<std::mutex> lock(mMutex);
86 mCarDataIds.insert(ids.cbegin(), ids.cend());
87 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
88 LOG(VERBOSE) << "Received addCarDataIds call from CarTelemetryService, notifying callbacks";
89 for (int32_t id : ids) {
90 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
91 // prevent out of range exception when calling unordered_map.at()
92 continue;
93 }
94 const auto& callbacksForId = mIdToCallbacksMap.at(id);
95 LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
96 for (const TelemetryCallback& tc : callbacksForId) {
97 if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
98 // skipping already invoked callbacks
99 continue;
100 }
101 invokedCallbacks.insert(tc);
102 ndk::ScopedAStatus status =
103 tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
104 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
105 status.getStatus() == STATUS_DEAD_OBJECT) {
106 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
107 removeCallback(tc.callback);
108 }
109 }
110 }
111 }
112
removeCarDataIds(const std::vector<int32_t> & ids)113 void TelemetryServer::removeCarDataIds(const std::vector<int32_t>& ids) {
114 const std::scoped_lock<std::mutex> lock(mMutex);
115 for (int32_t id : ids) {
116 mCarDataIds.erase(id);
117 }
118 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction> invokedCallbacks;
119 LOG(VERBOSE) << "Received removeCarDataIds call from CarTelemetryService, notifying callbacks";
120 for (int32_t id : ids) {
121 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
122 // prevent out of range exception when calling unordered_map.at()
123 continue;
124 }
125 const auto& callbacksForId = mIdToCallbacksMap.at(id);
126 LOG(VERBOSE) << "Invoking " << callbacksForId.size() << " callbacks for ID=" << id;
127 for (const TelemetryCallback& tc : callbacksForId) {
128 if (invokedCallbacks.find(tc) != invokedCallbacks.end()) {
129 // skipping already invoked callbacks
130 continue;
131 }
132 invokedCallbacks.insert(tc);
133 ndk::ScopedAStatus status =
134 tc.callback->onChange(findCarDataIdsIntersection(tc.config.carDataIds));
135 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
136 status.getStatus() == STATUS_DEAD_OBJECT) {
137 LOG(WARNING) << "Failed to invoke onChange() on a dead object, removing callback";
138 removeCallback(tc.callback);
139 }
140 }
141 }
142 }
143
getListener()144 std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
145 const std::scoped_lock<std::mutex> lock(mMutex);
146 return mCarDataListener;
147 }
148
dump(int fd)149 void TelemetryServer::dump(int fd) {
150 const std::scoped_lock<std::mutex> lock(mMutex);
151 dprintf(fd, " TelemetryServer:\n");
152 mRingBuffer.dump(fd);
153 }
154
addCallback(const CallbackConfig & config,const std::shared_ptr<ICarTelemetryCallback> & callback)155 Result<void> TelemetryServer::addCallback(const CallbackConfig& config,
156 const std::shared_ptr<ICarTelemetryCallback>& callback) {
157 const std::scoped_lock<std::mutex> lock(mMutex);
158 TelemetryCallback cb(config, callback);
159 if (mCallbacks.find(cb) != mCallbacks.end()) {
160 const std::string msg = "The ICarTelemetryCallback already exists. "
161 "Use removeCarTelemetryCallback() to remove it first";
162 LOG(WARNING) << msg;
163 return Error(EX_ILLEGAL_ARGUMENT) << msg;
164 }
165
166 mCallbacks.insert(cb);
167
168 // link each interested CarData ID with the new callback
169 for (int32_t id : config.carDataIds) {
170 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
171 mIdToCallbacksMap[id] =
172 std::unordered_set<TelemetryCallback, TelemetryCallback::HashFunction>{cb};
173 } else {
174 mIdToCallbacksMap.at(id).insert(cb);
175 }
176 LOG(VERBOSE) << "CarData ID=" << id << " has " << mIdToCallbacksMap.at(id).size()
177 << " associated callbacks";
178 }
179
180 std::vector<int32_t> interestedIds = findCarDataIdsIntersection(config.carDataIds);
181 if (interestedIds.size() == 0) {
182 return {};
183 }
184 LOG(VERBOSE) << "Notifying new callback with active CarData IDs";
185 ndk::ScopedAStatus status = callback->onChange(interestedIds);
186 if (status.getExceptionCode() == EX_TRANSACTION_FAILED &&
187 status.getStatus() == STATUS_DEAD_OBJECT) {
188 removeCallback(callback);
189 return Error(EX_ILLEGAL_ARGUMENT)
190 << "Failed to invoke onChange() on a dead object, removing callback";
191 }
192 return {};
193 }
194
removeCallback(const std::shared_ptr<ICarTelemetryCallback> & callback)195 Result<void> TelemetryServer::removeCallback(
196 const std::shared_ptr<ICarTelemetryCallback>& callback) {
197 const std::scoped_lock<std::mutex> lock(mMutex);
198 auto it = mCallbacks.find(TelemetryCallback(callback));
199 if (it == mCallbacks.end()) {
200 constexpr char msg[] = "Attempting to remove a CarTelemetryCallback that does not exist";
201 LOG(WARNING) << msg;
202 return Error(EX_ILLEGAL_ARGUMENT) << msg;
203 }
204
205 const TelemetryCallback& tc = *it;
206 // unlink callback from ID in the mIdToCallbacksMap
207 for (int32_t id : tc.config.carDataIds) {
208 if (mIdToCallbacksMap.find(id) == mIdToCallbacksMap.end()) {
209 LOG(ERROR) << "The callback is not linked to its interested IDs.";
210 continue;
211 }
212 auto& associatedCallbacks = mIdToCallbacksMap.at(id);
213 auto associatedCallbackIterator = associatedCallbacks.find(tc);
214 if (associatedCallbackIterator == associatedCallbacks.end()) {
215 continue;
216 }
217 associatedCallbacks.erase(associatedCallbackIterator);
218 LOG(VERBOSE) << "After unlinking a callback from ID=" << id << ", the ID has "
219 << mIdToCallbacksMap.at(id).size() << " associated callbacks";
220 if (associatedCallbacks.size() == 0) {
221 mIdToCallbacksMap.erase(id);
222 }
223 }
224
225 mCallbacks.erase(it);
226 LOG(VERBOSE) << "After removeCallback, there are " << mCallbacks.size()
227 << " callbacks in cartelemetryd";
228 return {};
229 }
230
writeCarData(const std::vector<CarData> & dataList,uid_t publisherUid)231 void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
232 const std::scoped_lock<std::mutex> lock(mMutex);
233 bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
234 for (auto&& data : dataList) {
235 // ignore data that has no subscribers in CarTelemetryService
236 if (mCarDataIds.find(data.id) == mCarDataIds.end()) {
237 LOG(VERBOSE) << "Ignoring CarData with ID=" << data.id;
238 continue;
239 }
240 mRingBuffer.push({.mId = data.id,
241 .mContent = std::move(data.content),
242 .mPublisherUid = publisherUid});
243 }
244 // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
245 // too many unnecessary idendical messages in the looper.
246 if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
247 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
248 kMsgPushCarDataToListener);
249 }
250 }
251
252 // Runs on the main thread.
pushCarDataToListeners()253 void TelemetryServer::pushCarDataToListeners() {
254 std::vector<CarDataInternal> pendingCarDataInternals;
255 {
256 const std::scoped_lock<std::mutex> lock(mMutex);
257 // Remove extra messages.
258 mLooper->removeMessages(mMessageHandler, kMsgPushCarDataToListener);
259 if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
260 return;
261 }
262 // Push elements to pendingCarDataInternals in reverse order so we can send data
263 // from the back of the pendingCarDataInternals vector.
264 while (mRingBuffer.size() > 0) {
265 auto carData = std::move(mRingBuffer.popBack());
266 CarDataInternal data;
267 data.id = carData.mId;
268 data.content = std::move(carData.mContent);
269 pendingCarDataInternals.push_back(data);
270 }
271 }
272
273 // TODO(b/186477983): send data in batch to improve performance, but careful sending too
274 // many data at once, as it could clog the Binder - it has <1MB limit.
275 while (!pendingCarDataInternals.empty()) {
276 ndk::ScopedAStatus status = ndk::ScopedAStatus::ok();
277 {
278 const std::scoped_lock<std::mutex> lock(mMutex);
279 if (mCarDataListener != nullptr) {
280 status = mCarDataListener->onCarDataReceived({pendingCarDataInternals.back()});
281 } else {
282 status = ndk::ScopedAStatus::
283 fromServiceSpecificErrorWithMessage(EX_NULL_POINTER,
284 "mCarDataListener is currently set to "
285 "null, will try again.");
286 }
287 }
288 if (!status.isOk()) {
289 LOG(WARNING) << "Failed to push CarDataInternal, will try again. Status: "
290 << status.getStatus()
291 << ", service-specific error: " << status.getServiceSpecificError()
292 << ", message: " << status.getMessage()
293 << ", exception code: " << status.getExceptionCode()
294 << ", description: " << status.getDescription();
295 sleep(kPushCarDataFailureDelaySeconds.count());
296 } else {
297 pendingCarDataInternals.pop_back();
298 }
299 }
300 }
301
MessageHandlerImpl(TelemetryServer * server)302 TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
303 mTelemetryServer(server) {}
304
handleMessage(const Message & message)305 void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
306 switch (message.what) {
307 case kMsgPushCarDataToListener:
308 mTelemetryServer->pushCarDataToListeners();
309 break;
310 default:
311 LOG(WARNING) << "Unknown message: " << message.what;
312 }
313 }
314
315 } // namespace telemetry
316 } // namespace automotive
317 } // namespace android
318