/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.car.telemetry.databroker;

import static com.android.car.telemetry.CarTelemetryService.DEBUG;

import android.annotation.NonNull;
import android.annotation.Nullable;
import android.car.builtin.util.Slogf;
import android.car.builtin.util.TimingsTraceLog;
import android.car.telemetry.TelemetryProto;
import android.car.telemetry.TelemetryProto.MetricsConfig;
import android.car.telemetry.TelemetryProto.TelemetryError;
import android.content.ComponentName;
import android.content.Context;
import android.content.Intent;
import android.content.ServiceConnection;
import android.content.pm.PackageInfo;
import android.content.pm.PackageManager;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.IBinder;
import android.os.Looper;
import android.os.Message;
import android.os.ParcelFileDescriptor;
import android.os.PersistableBundle;
import android.os.RemoteException;
import android.os.UserHandle;
import android.util.ArrayMap;
import android.util.Log;
import android.util.SparseIntArray;

import com.android.car.CarLog;
import com.android.car.CarServiceUtils;
import com.android.car.internal.LargeParcelable;
import com.android.car.telemetry.CarTelemetryService;
import com.android.car.telemetry.ResultStore;
52 import com.android.car.telemetry.publisher.AbstractPublisher; 53 import com.android.car.telemetry.publisher.PublisherFactory; 54 import com.android.car.telemetry.scriptexecutorinterface.BundleList; 55 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor; 56 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener; 57 import com.android.car.telemetry.util.IoUtils; 58 import com.android.internal.annotations.VisibleForTesting; 59 60 import java.io.IOException; 61 import java.io.OutputStream; 62 import java.lang.ref.WeakReference; 63 import java.util.ArrayList; 64 import java.util.Iterator; 65 import java.util.List; 66 import java.util.concurrent.PriorityBlockingQueue; 67 68 /** 69 * Implementation of the data path component of CarTelemetryService. Forwards the published data 70 * from publishers to consumers subject to the Controller's decision. 71 * All methods should be called from the telemetry thread unless otherwise specified as thread-safe. 72 */ 73 public class DataBrokerImpl implements DataBroker { 74 75 @VisibleForTesting 76 static final int MSG_HANDLE_TASK = 1; 77 @VisibleForTesting 78 static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2; 79 @VisibleForTesting 80 static final int MSG_STOP_HANGING_SCRIPT = 3; 81 82 /** Bind to script executor 5 times before entering disabled state. */ 83 private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5; 84 85 /** Maximum wait time for a script to finish. 
*/ 86 private static final long MAX_SCRIPT_EXECUTION_TIME_MILLIS = 30_000L; // 30 seconds 87 88 private static final String[] SCRIPT_EXECUTOR_PACKAGE_CANDIDATES = 89 {"com.android.car.scriptexecutor", "com.google.android.car.scriptexecutor"}; 90 private static final String SCRIPT_EXECUTOR_CLASS = 91 "com.android.car.scriptexecutor.ScriptExecutor"; 92 93 private final Context mContext; 94 private final PublisherFactory mPublisherFactory; 95 private final ResultStore mResultStore; 96 private final ScriptExecutorListener mScriptExecutorListener; 97 private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread( 98 CarTelemetryService.class.getSimpleName()); 99 private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper()); 100 101 /** Thread-safe priority queue for scheduling tasks. */ 102 private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue = 103 new PriorityBlockingQueue<>(); 104 105 /** 106 * Index is the type of {@link TelemetryProto.Publisher}, value is the number of tasks pending 107 * script execution that are produced by that publisher. 108 */ 109 private final SparseIntArray mPublisherCountArray = new SparseIntArray(); 110 111 /** 112 * Maps MetricsConfig name to its subscriptions. This map is useful for removing MetricsConfigs. 113 */ 114 private final ArrayMap<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>(); 115 116 /** 117 * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent 118 * doing futile work. 119 */ 120 private boolean mDisabled = false; 121 122 /** Current number of attempts to bind to ScriptExecutor. */ 123 private int mBindScriptExecutorAttempts = 0; 124 125 /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */ 126 private int mPriority = 1; 127 128 /** Waiting period between attempts to bind script executor. Can be shortened for tests. 
*/ 129 @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L; 130 131 /** 132 * Name of the {@link MetricsConfig} that is currently running. 133 * A non-null value indicates ScriptExecutor is currently running this config, which means 134 * DataBroker should not make another ScriptExecutor binder call. 135 */ 136 private String mCurrentMetricsConfigName; 137 private IScriptExecutor mScriptExecutor; 138 private DataBrokerListener mDataBrokerListener; 139 140 /** 141 * Used only for the purpose of tracking the duration of running a script. The duration 142 * starts before the ScriptExecutor binder call and ends when a status is returned via 143 * ScriptExecutorListener or when the binder call throws an exception. 144 */ 145 private TimingsTraceLog mScriptExecutionTraceLog; 146 147 private final ServiceConnection mServiceConnection = new ServiceConnection() { 148 @Override 149 public void onServiceConnected(ComponentName name, IBinder service) { 150 mTelemetryHandler.post(() -> { 151 mScriptExecutor = IScriptExecutor.Stub.asInterface(service); 152 scheduleNextTask(); 153 }); 154 } 155 156 @Override 157 public void onServiceDisconnected(ComponentName name) { 158 // TODO(b/198684473): clean up the state after script executor disconnects 159 mTelemetryHandler.post(() -> { 160 unbindScriptExecutor(); 161 }); 162 } 163 }; 164 165 private final AbstractPublisher.PublisherListener mPublisherListener = 166 new AbstractPublisher.PublisherListener() { 167 @Override 168 public void onPublisherFailure( 169 @NonNull List<TelemetryProto.MetricsConfig> affectedConfigs, 170 @Nullable Throwable error) { 171 Slogf.w(CarLog.TAG_TELEMETRY, "Publisher failed", error); 172 // when a publisher fails, construct an TelemetryError result and send to client 173 String stackTrace = null; 174 if (error != null) { 175 stackTrace = Log.getStackTraceString(error); 176 } 177 TelemetryError telemetryError = buildTelemetryError( 178 TelemetryError.ErrorType.PUBLISHER_FAILED, "Publisher 
failed", stackTrace); 179 for (TelemetryProto.MetricsConfig config : affectedConfigs) { 180 // this will remove the MetricsConfig and notify the client of result 181 mDataBrokerListener.onReportFinished(config.getName(), telemetryError); 182 } 183 } 184 185 @Override 186 public void onConfigFinished(@NonNull TelemetryProto.MetricsConfig metricsConfig) { 187 String configName = metricsConfig.getName(); 188 Slogf.i(CarLog.TAG_TELEMETRY, 189 "Publisher sets MetricsConfig(" + configName + ") as finished"); 190 mDataBrokerListener.onReportFinished(configName); 191 } 192 }; 193 DataBrokerImpl( @onNull Context context, @NonNull PublisherFactory publisherFactory, @NonNull ResultStore resultStore, @NonNull TimingsTraceLog traceLog)194 public DataBrokerImpl( 195 @NonNull Context context, 196 @NonNull PublisherFactory publisherFactory, 197 @NonNull ResultStore resultStore, 198 @NonNull TimingsTraceLog traceLog) { 199 mContext = context; 200 mPublisherFactory = publisherFactory; 201 mResultStore = resultStore; 202 mScriptExecutorListener = new ScriptExecutorListener(this); 203 mPublisherFactory.initialize(mPublisherListener); 204 mScriptExecutionTraceLog = traceLog; 205 } 206 207 @Nullable findExecutorPackage()208 private String findExecutorPackage() { 209 PackageInfo info = null; 210 for (int i = 0; i < SCRIPT_EXECUTOR_PACKAGE_CANDIDATES.length; i++) { 211 try { 212 info = mContext.getPackageManager().getPackageInfo( 213 SCRIPT_EXECUTOR_PACKAGE_CANDIDATES[i], /* flags= */ 0); 214 if (info != null) { 215 break; 216 } 217 } catch (PackageManager.NameNotFoundException e) { 218 // ignore 219 } 220 } 221 if (info == null) { 222 return null; 223 } 224 return info.packageName; 225 } 226 bindScriptExecutor()227 private void bindScriptExecutor() { 228 // do not re-bind if broker is in a disabled state or if script executor is nonnull 229 if (mDisabled || mScriptExecutor != null) { 230 return; 231 } 232 String executorPackage = findExecutorPackage(); 233 if (executorPackage == null) { 
234 Slogf.w(CarLog.TAG_TELEMETRY, "Cannot find executor package"); 235 return; 236 } 237 Intent intent = new Intent(); 238 intent.setComponent(new ComponentName(executorPackage, SCRIPT_EXECUTOR_CLASS)); 239 boolean success = mContext.bindServiceAsUser( 240 intent, 241 mServiceConnection, 242 Context.BIND_AUTO_CREATE, 243 UserHandle.SYSTEM); 244 if (success) { 245 mBindScriptExecutorAttempts = 0; // reset 246 return; 247 } 248 unbindScriptExecutor(); 249 mBindScriptExecutorAttempts++; 250 if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) { 251 Slogf.w(CarLog.TAG_TELEMETRY, 252 "failed to get valid connection to ScriptExecutor, retrying in " 253 + mBindScriptExecutorDelayMillis + "ms."); 254 mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR, 255 mBindScriptExecutorDelayMillis); 256 } else { 257 Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, " 258 + "disabling DataBroker"); 259 disableBroker(); 260 } 261 } 262 263 /** 264 * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from 265 * the telemetry thread. 266 */ unbindScriptExecutor()267 private void unbindScriptExecutor() { 268 // TODO(b/198648763): unbind from script executor when there is no work to do 269 // if a script is running while we unbind from ScriptExecutor, end trace log first 270 if (mCurrentMetricsConfigName != null) { 271 mScriptExecutionTraceLog.traceEnd(); 272 mCurrentMetricsConfigName = null; 273 } 274 mScriptExecutor = null; 275 try { 276 mContext.unbindService(mServiceConnection); 277 } catch (IllegalArgumentException e) { 278 // If ScriptExecutor is gone before unbinding, it will throw this exception 279 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e); 280 } 281 } 282 283 /** 284 * Enters into a disabled state because something irrecoverable happened. 285 * TODO(b/200841260): expose the state to the caller. 
286 */ disableBroker()287 private void disableBroker() { 288 mDisabled = true; 289 // remove all MetricConfigs, disable all publishers, stop receiving data 290 for (String configName : mSubscriptionMap.keySet()) { 291 // get the metrics config from the DataSubscriber and remove the metrics config 292 if (mSubscriptionMap.get(configName).size() != 0) { 293 removeMetricsConfig(configName); 294 } 295 } 296 mSubscriptionMap.clear(); 297 } 298 299 @Override addMetricsConfig( @onNull String metricsConfigName, @NonNull MetricsConfig metricsConfig)300 public void addMetricsConfig( 301 @NonNull String metricsConfigName, @NonNull MetricsConfig metricsConfig) { 302 // TODO(b/187743369): pass status back to caller 303 // if broker is disabled or metricsConfig already exists, do nothing 304 if (mDisabled || mSubscriptionMap.containsKey(metricsConfigName)) { 305 return; 306 } 307 // Create the subscribers for this metrics configuration 308 List<DataSubscriber> dataSubscribers = new ArrayList<>( 309 metricsConfig.getSubscribersList().size()); 310 for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) { 311 if (subscriber.getPriority() < 0) { 312 throw new IllegalArgumentException("Subscribers must have non-negative priority"); 313 } 314 // protobuf publisher to a concrete Publisher 315 AbstractPublisher publisher = mPublisherFactory.getPublisher( 316 subscriber.getPublisher().getPublisherCase()); 317 // create DataSubscriber from TelemetryProto.Subscriber 318 DataSubscriber dataSubscriber = new DataSubscriber( 319 this, 320 metricsConfig, 321 subscriber); 322 dataSubscribers.add(dataSubscriber); 323 // addDataSubscriber could throw an exception, let CarTelemetryService handle it 324 publisher.addDataSubscriber(dataSubscriber); 325 } 326 mSubscriptionMap.put(metricsConfigName, dataSubscribers); 327 } 328 329 @Override removeMetricsConfig(@onNull String metricsConfigName)330 public void removeMetricsConfig(@NonNull String metricsConfigName) { 331 // 
TODO(b/187743369): pass status back to caller 332 if (!mSubscriptionMap.containsKey(metricsConfigName)) { 333 return; 334 } 335 // get the subscriptions associated with this MetricsConfig, remove it from the map 336 List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfigName); 337 // for each subscriber, remove it from publishers 338 for (DataSubscriber subscriber : dataSubscribers) { 339 AbstractPublisher publisher = mPublisherFactory.getPublisher( 340 subscriber.getPublisherParam().getPublisherCase()); 341 try { 342 publisher.removeDataSubscriber(subscriber); 343 } catch (IllegalArgumentException e) { 344 // It shouldn't happen, but if happens, let's just log it. 345 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e); 346 } 347 } 348 // Remove all the tasks associated with this metrics config. The underlying impl uses the 349 // weakly consistent iterator, which is thread-safe but does not freeze the collection while 350 // iterating, so it may or may not reflect any updates since the iterator was created. 351 // But since adding & polling from queue should happen in the same thread, the task queue 352 // should not be changed while tasks are being iterated and removed. 
353 Iterator<ScriptExecutionTask> it = mTaskQueue.iterator(); 354 while (it.hasNext()) { 355 ScriptExecutionTask task = it.next(); 356 if (task.isAssociatedWithMetricsConfig(metricsConfigName)) { 357 mTaskQueue.remove(task); 358 mPublisherCountArray.append( 359 task.getPublisherType(), 360 mPublisherCountArray.get(task.getPublisherType()) - 1); 361 } 362 } 363 } 364 365 @Override removeAllMetricsConfigs()366 public void removeAllMetricsConfigs() { 367 mPublisherFactory.removeAllDataSubscribers(); 368 mSubscriptionMap.clear(); 369 mTaskQueue.clear(); 370 mPublisherCountArray.clear(); 371 } 372 373 @Override addTaskToQueue(@onNull ScriptExecutionTask task)374 public int addTaskToQueue(@NonNull ScriptExecutionTask task) { 375 if (mDisabled) { 376 return mPublisherCountArray.get(task.getPublisherType()); 377 } 378 mTaskQueue.add(task); 379 mPublisherCountArray.append( 380 task.getPublisherType(), 381 mPublisherCountArray.get(task.getPublisherType()) + 1); 382 scheduleNextTask(); 383 return mPublisherCountArray.get(task.getPublisherType()); 384 } 385 386 /** 387 * This method can be called from any thread. 388 * It is possible for this method to be invoked from different threads at the same time, but 389 * it is not possible to schedule the same task twice, because the handler handles message 390 * in the order they come in, this means the task will be polled sequentially instead of 391 * concurrently. Every task that is scheduled and run will be distinct. 392 * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask 393 * will also have different threading behavior. Update javadoc when the behavior is decided. 
394 */ 395 @Override scheduleNextTask()396 public void scheduleNextTask() { 397 if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) { 398 return; 399 } 400 mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK); 401 } 402 403 @Override setDataBrokerListener(@onNull DataBrokerListener dataBrokerListener)404 public void setDataBrokerListener(@NonNull DataBrokerListener dataBrokerListener) { 405 if (mDisabled) { 406 return; 407 } 408 mDataBrokerListener = dataBrokerListener; 409 } 410 411 @Override setTaskExecutionPriority(int priority)412 public void setTaskExecutionPriority(int priority) { 413 if (mDisabled) { 414 return; 415 } 416 if (priority == mPriority) { 417 return; 418 } 419 mPriority = priority; 420 scheduleNextTask(); // when priority updates, schedule a task which checks task queue 421 } 422 423 @VisibleForTesting 424 @NonNull getSubscriptionMap()425 ArrayMap<String, List<DataSubscriber>> getSubscriptionMap() { 426 return new ArrayMap<>(mSubscriptionMap); 427 } 428 429 @VisibleForTesting 430 @NonNull getTelemetryHandler()431 Handler getTelemetryHandler() { 432 return mTelemetryHandler; 433 } 434 435 @VisibleForTesting 436 @NonNull getTaskQueue()437 PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() { 438 return mTaskQueue; 439 } 440 441 /** 442 * Polls and runs a task from the head of the priority queue if the queue is nonempty and the 443 * head of the queue has priority higher than or equal to the current priority. A higher 444 * priority is denoted by a lower priority number, so head of the queue should have equal or 445 * lower priority number to be polled. 
446 */ pollAndExecuteTask()447 private void pollAndExecuteTask() { 448 // check databroker state is ready to run script 449 if (mDisabled || mCurrentMetricsConfigName != null) { 450 Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, disabled or no config."); 451 return; 452 } 453 // check task is valid and ready to be run 454 ScriptExecutionTask task = mTaskQueue.peek(); 455 if (task == null || task.getPriority() > mPriority) { 456 Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, either task is null or low priority"); 457 return; 458 } 459 // if script executor is null, bind service 460 if (mScriptExecutor == null) { 461 Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor"); 462 // upon successful binding, a task will be scheduled to run if there are any 463 mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR); 464 return; 465 } 466 mTaskQueue.poll(); // remove task from queue 467 mPublisherCountArray.append( 468 task.getPublisherType(), 469 mPublisherCountArray.get(task.getPublisherType()) - 1); 470 471 if (task.bypassScriptExecutor()) { 472 // delegate to DataBrokerListener to handle storing data and scheduling next task 473 mDataBrokerListener.onMetricsReport(task.getMetricsConfig().getName(), 474 task.getData(), /* state= */ null); 475 return; 476 } 477 478 // update current config name because a script is currently running 479 mCurrentMetricsConfigName = task.getMetricsConfig().getName(); 480 mScriptExecutionTraceLog.traceBegin( 481 "executing script " + mCurrentMetricsConfigName); 482 try { 483 if (task.isLargeData()) { 484 if (DEBUG) { 485 Slogf.d(CarLog.TAG_TELEMETRY, 486 "Running with large func %s of %s in ScriptExecutor.", 487 task.getHandlerName(), 488 mCurrentMetricsConfigName); 489 } 490 invokeScriptForLargeInput(task); 491 } else if (task.isBundleList()) { 492 if (DEBUG) { 493 Slogf.d(CarLog.TAG_TELEMETRY, 494 "Running with bundle list func %s of %s in ScriptExecutor.", 495 task.getHandlerName(), 496 
mCurrentMetricsConfigName); 497 } 498 invokeScriptForBundleList(task); 499 } else { 500 if (DEBUG) { 501 Slogf.d(CarLog.TAG_TELEMETRY, "Running func %s of %s in ScriptExecutor.", 502 task.getHandlerName(), 503 mCurrentMetricsConfigName); 504 } 505 mScriptExecutor.invokeScript( 506 task.getMetricsConfig().getScript(), 507 task.getHandlerName(), 508 task.getData(), 509 mResultStore.getInterimResult(mCurrentMetricsConfigName), 510 mScriptExecutorListener); 511 } 512 mTelemetryHandler.sendEmptyMessageDelayed( 513 MSG_STOP_HANGING_SCRIPT, MAX_SCRIPT_EXECUTION_TIME_MILLIS); 514 } catch (RemoteException e) { 515 mScriptExecutionTraceLog.traceEnd(); 516 Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e); 517 unbindScriptExecutor(); 518 addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor 519 } catch (IOException e) { 520 mScriptExecutionTraceLog.traceEnd(); 521 Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data" 522 + " to ScriptExecutor. Skipping the published data", e); 523 mCurrentMetricsConfigName = null; 524 scheduleNextTask(); // drop this task and schedule the next one 525 } 526 } 527 528 /** 529 * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the 530 * script input to the pipe. 531 * 532 * @param task containing all the necessary parameters for ScriptExecutor API. 533 * @throws IOException if cannot create pipe or cannot write the bundle to pipe. 534 * @throws RemoteException if ScriptExecutor failed. 
535 */ invokeScriptForLargeInput(@onNull ScriptExecutionTask task)536 private void invokeScriptForLargeInput(@NonNull ScriptExecutionTask task) 537 throws IOException, RemoteException { 538 ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe(); 539 ParcelFileDescriptor readFd = fds[0]; 540 ParcelFileDescriptor writeFd = fds[1]; 541 try { 542 mScriptExecutor.invokeScriptForLargeInput( 543 task.getMetricsConfig().getScript(), 544 task.getHandlerName(), 545 readFd, 546 mResultStore.getInterimResult(mCurrentMetricsConfigName), 547 mScriptExecutorListener); 548 } catch (RemoteException e) { 549 IoUtils.closeQuietly(readFd); 550 IoUtils.closeQuietly(writeFd); 551 throw e; 552 } 553 IoUtils.closeQuietly(readFd); 554 555 Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe"); 556 try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) { 557 task.getData().writeToStream(outputStream); 558 } 559 } 560 561 /** 562 * Sends bundle list with LargeParcelable mechanism. 563 * 564 * @param task containing all the necessary parameters for ScriptExecutor API. 565 * @throws RemoteException if ScriptExecutor failed. 
566 */ invokeScriptForBundleList(@onNull ScriptExecutionTask task)567 private void invokeScriptForBundleList(@NonNull ScriptExecutionTask task) 568 throws RemoteException { 569 BundleList bl = new BundleList(); 570 bl.bundles = task.getBundleList(); 571 bl = (BundleList) LargeParcelable.toLargeParcelable( 572 bl, () -> { 573 BundleList bundleList = new BundleList(); 574 bundleList.bundles = new ArrayList<>(); 575 return bundleList; 576 }); 577 mScriptExecutor.invokeScriptForBundleList( 578 task.getMetricsConfig().getScript(), 579 task.getHandlerName(), 580 bl, 581 mResultStore.getInterimResult(mCurrentMetricsConfigName), 582 mScriptExecutorListener); 583 } 584 buildTelemetryError( @onNull TelemetryError.ErrorType errorType, @NonNull String message, @Nullable String stackTrace)585 private TelemetryError buildTelemetryError( 586 @NonNull TelemetryError.ErrorType errorType, 587 @NonNull String message, 588 @Nullable String stackTrace) { 589 TelemetryError.Builder error = TelemetryError.newBuilder() 590 .setErrorType(errorType) 591 .setMessage(message); 592 if (stackTrace != null) { 593 error.setStackTrace(stackTrace); 594 } 595 return error.build(); 596 } 597 598 /** 599 * This helper method should be called as soon as script execution returns. 600 * It returns the name of the MetricsConfig whose script returned. 601 */ endScriptExecution()602 private String endScriptExecution() { 603 mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running 604 mTelemetryHandler.removeMessages(MSG_STOP_HANGING_SCRIPT); // script did not hang 605 // get and set the mCurrentMetricsConfigName to null 606 String configName = mCurrentMetricsConfigName; 607 mCurrentMetricsConfigName = null; 608 return configName; 609 } 610 611 /** Stores final metrics and schedules the next task. 
*/ onScriptFinished(@onNull PersistableBundle result)612 private void onScriptFinished(@NonNull PersistableBundle result) { 613 if (DEBUG) { 614 Slogf.d(CarLog.TAG_TELEMETRY, "A script finished, storing the final result."); 615 } 616 mTelemetryHandler.post(() -> { 617 String configName = endScriptExecution(); 618 if (configName == null) { 619 return; 620 } 621 // delegate to DataBrokerListener to handle storing data and scheduling next task 622 mDataBrokerListener.onReportFinished(configName, result); 623 }); 624 } 625 626 /** Stores interim metrics and schedules the next task. */ onScriptSuccess(@onNull PersistableBundle stateToPersist)627 private void onScriptSuccess(@NonNull PersistableBundle stateToPersist) { 628 if (DEBUG) { 629 Slogf.d(CarLog.TAG_TELEMETRY, "A script succeeded, storing the interim result."); 630 } 631 mTelemetryHandler.post(() -> { 632 String configName = endScriptExecution(); 633 if (configName == null) { 634 return; 635 } 636 // delegate to DataBrokerListener to handle storing data and scheduling next task 637 mDataBrokerListener.onEventConsumed(configName, stateToPersist); 638 }); 639 } 640 641 /** Stores telemetry error and schedules the next task. 
*/ onScriptError( int errorType, @NonNull String message, @Nullable String stackTrace)642 private void onScriptError( 643 int errorType, @NonNull String message, @Nullable String stackTrace) { 644 if (DEBUG) { 645 Slogf.d(CarLog.TAG_TELEMETRY, "A script failed: %d %s\n%s", 646 errorType, message, stackTrace); 647 } 648 mTelemetryHandler.post(() -> { 649 String configName = endScriptExecution(); 650 if (configName == null) { 651 return; 652 } 653 // delegate to DataBrokerListener to handle storing data and scheduling next task 654 mDataBrokerListener.onReportFinished( 655 configName, 656 buildTelemetryError( 657 TelemetryError.ErrorType.forNumber(errorType), 658 message, 659 stackTrace)); 660 }); 661 } 662 onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)663 private void onMetricsReport( 664 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) { 665 if (DEBUG) { 666 Slogf.d(CarLog.TAG_TELEMETRY, "A script produced a report without finishing."); 667 } 668 mTelemetryHandler.post(() -> { 669 String configName = endScriptExecution(); 670 if (configName == null) { 671 return; 672 } 673 mDataBrokerListener.onMetricsReport(configName, report, stateToPersist); 674 }); 675 } 676 677 /** Listens for script execution status. Methods are called on the binder thread. 
*/ 678 private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub { 679 private final WeakReference<DataBrokerImpl> mWeakDataBroker; 680 ScriptExecutorListener(@onNull DataBrokerImpl dataBroker)681 private ScriptExecutorListener(@NonNull DataBrokerImpl dataBroker) { 682 mWeakDataBroker = new WeakReference<>(dataBroker); 683 } 684 685 @Override onScriptFinished(@onNull PersistableBundle result)686 public void onScriptFinished(@NonNull PersistableBundle result) { 687 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 688 if (dataBroker == null) { 689 return; 690 } 691 dataBroker.onScriptFinished(result); 692 } 693 694 @Override onSuccess(@onNull PersistableBundle stateToPersist)695 public void onSuccess(@NonNull PersistableBundle stateToPersist) { 696 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 697 if (dataBroker == null) { 698 return; 699 } 700 dataBroker.onScriptSuccess(stateToPersist); 701 } 702 703 @Override onError(int errorType, @NonNull String message, @Nullable String stackTrace)704 public void onError(int errorType, @NonNull String message, @Nullable String stackTrace) { 705 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 706 if (dataBroker == null) { 707 return; 708 } 709 dataBroker.onScriptError(errorType, message, stackTrace); 710 } 711 712 @Override onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)713 public void onMetricsReport( 714 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) { 715 DataBrokerImpl dataBroker = mWeakDataBroker.get(); 716 if (dataBroker == null) { 717 return; 718 } 719 dataBroker.onMetricsReport(report, stateToPersist); 720 } 721 } 722 723 /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. 
*/ 724 class TaskHandler extends Handler { TaskHandler(@onNull Looper looper)725 TaskHandler(@NonNull Looper looper) { 726 super(looper); 727 } 728 729 /** 730 * Handles a message depending on the message ID. 731 * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a 732 * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when 733 * priority updates, when a new task is added to the priority queue, and when a task 734 * finishes running. 735 */ 736 @Override handleMessage(@onNull Message msg)737 public void handleMessage(@NonNull Message msg) { 738 switch (msg.what) { 739 case MSG_HANDLE_TASK: 740 pollAndExecuteTask(); // run the next task 741 break; 742 case MSG_BIND_TO_SCRIPT_EXECUTOR: 743 bindScriptExecutor(); 744 break; 745 case MSG_STOP_HANGING_SCRIPT: 746 // TODO(b/223224704): log error 747 unbindScriptExecutor(); 748 scheduleNextTask(); 749 break; 750 default: 751 Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message."); 752 } 753 } 754 } 755 } 756