• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.telemetry.databroker;
18 
19 import static com.android.car.telemetry.CarTelemetryService.DEBUG;
20 
21 import android.annotation.NonNull;
22 import android.annotation.Nullable;
23 import android.car.builtin.util.Slogf;
24 import android.car.builtin.util.TimingsTraceLog;
25 import android.car.telemetry.TelemetryProto;
26 import android.car.telemetry.TelemetryProto.MetricsConfig;
27 import android.car.telemetry.TelemetryProto.TelemetryError;
28 import android.content.ComponentName;
29 import android.content.Context;
30 import android.content.Intent;
31 import android.content.ServiceConnection;
32 import android.content.pm.PackageInfo;
33 import android.content.pm.PackageManager;
34 import android.os.Handler;
35 import android.os.HandlerThread;
36 import android.os.IBinder;
37 import android.os.Looper;
38 import android.os.Message;
39 import android.os.ParcelFileDescriptor;
40 import android.os.PersistableBundle;
41 import android.os.RemoteException;
42 import android.os.UserHandle;
43 import android.util.ArrayMap;
44 import android.util.Log;
45 import android.util.SparseIntArray;
46 
47 import com.android.car.CarLog;
48 import com.android.car.CarServiceUtils;
49 import com.android.car.internal.LargeParcelable;
50 import com.android.car.telemetry.CarTelemetryService;
51 import com.android.car.telemetry.ResultStore;
52 import com.android.car.telemetry.publisher.AbstractPublisher;
53 import com.android.car.telemetry.publisher.PublisherFactory;
54 import com.android.car.telemetry.scriptexecutorinterface.BundleList;
55 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutor;
56 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
57 import com.android.car.telemetry.util.IoUtils;
58 import com.android.internal.annotations.VisibleForTesting;
59 
60 import java.io.IOException;
61 import java.io.OutputStream;
62 import java.lang.ref.WeakReference;
63 import java.util.ArrayList;
64 import java.util.Iterator;
65 import java.util.List;
66 import java.util.concurrent.PriorityBlockingQueue;
67 
68 /**
69  * Implementation of the data path component of CarTelemetryService. Forwards the published data
70  * from publishers to consumers subject to the Controller's decision.
71  * All methods should be called from the telemetry thread unless otherwise specified as thread-safe.
72  */
73 public class DataBrokerImpl implements DataBroker {
74 
75     @VisibleForTesting
76     static final int MSG_HANDLE_TASK = 1;
77     @VisibleForTesting
78     static final int MSG_BIND_TO_SCRIPT_EXECUTOR = 2;
79     @VisibleForTesting
80     static final int MSG_STOP_HANGING_SCRIPT = 3;
81 
82     /** Bind to script executor 5 times before entering disabled state. */
83     private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;
84 
85     /** Maximum wait time for a script to finish. */
86     private static final long MAX_SCRIPT_EXECUTION_TIME_MILLIS = 30_000L; // 30 seconds
87 
88     private static final String[] SCRIPT_EXECUTOR_PACKAGE_CANDIDATES =
89             {"com.android.car.scriptexecutor", "com.google.android.car.scriptexecutor"};
90     private static final String SCRIPT_EXECUTOR_CLASS =
91             "com.android.car.scriptexecutor.ScriptExecutor";
92 
93     private final Context mContext;
94     private final PublisherFactory mPublisherFactory;
95     private final ResultStore mResultStore;
96     private final ScriptExecutorListener mScriptExecutorListener;
97     private final HandlerThread mTelemetryThread = CarServiceUtils.getHandlerThread(
98             CarTelemetryService.class.getSimpleName());
99     private final Handler mTelemetryHandler = new TaskHandler(mTelemetryThread.getLooper());
100 
101     /** Thread-safe priority queue for scheduling tasks. */
102     private final PriorityBlockingQueue<ScriptExecutionTask> mTaskQueue =
103             new PriorityBlockingQueue<>();
104 
105     /**
106      * Index is the type of {@link TelemetryProto.Publisher}, value is the number of tasks pending
107      * script execution that are produced by that publisher.
108      */
109     private final SparseIntArray mPublisherCountArray = new SparseIntArray();
110 
111     /**
112      * Maps MetricsConfig name to its subscriptions. This map is useful for removing MetricsConfigs.
113      */
114     private final ArrayMap<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>();
115 
116     /**
117      * If something irrecoverable happened, DataBroker should enter into a disabled state to prevent
118      * doing futile work.
119      */
120     private boolean mDisabled = false;
121 
122     /** Current number of attempts to bind to ScriptExecutor. */
123     private int mBindScriptExecutorAttempts = 0;
124 
125     /** Priority of current system to determine if a {@link ScriptExecutionTask} can run. */
126     private int mPriority = 1;
127 
128     /** Waiting period between attempts to bind script executor. Can be shortened for tests. */
129     @VisibleForTesting long mBindScriptExecutorDelayMillis = 3_000L;
130 
131     /**
132      * Name of the {@link MetricsConfig} that is currently running.
133      * A non-null value indicates ScriptExecutor is currently running this config, which means
134      * DataBroker should not make another ScriptExecutor binder call.
135      */
136     private String mCurrentMetricsConfigName;
137     private IScriptExecutor mScriptExecutor;
138     private DataBrokerListener mDataBrokerListener;
139 
140     /**
141      * Used only for the purpose of tracking the duration of running a script. The duration
142      * starts before the ScriptExecutor binder call and ends when a status is returned via
143      * ScriptExecutorListener or when the binder call throws an exception.
144      */
145     private TimingsTraceLog mScriptExecutionTraceLog;
146 
147     private final ServiceConnection mServiceConnection = new ServiceConnection() {
148         @Override
149         public void onServiceConnected(ComponentName name, IBinder service) {
150             mTelemetryHandler.post(() -> {
151                 mScriptExecutor = IScriptExecutor.Stub.asInterface(service);
152                 scheduleNextTask();
153             });
154         }
155 
156         @Override
157         public void onServiceDisconnected(ComponentName name) {
158             // TODO(b/198684473): clean up the state after script executor disconnects
159             mTelemetryHandler.post(() -> {
160                 unbindScriptExecutor();
161             });
162         }
163     };
164 
165     private final AbstractPublisher.PublisherListener mPublisherListener =
166             new AbstractPublisher.PublisherListener() {
167         @Override
168         public void onPublisherFailure(
169                 @NonNull List<TelemetryProto.MetricsConfig> affectedConfigs,
170                 @Nullable Throwable error) {
171             Slogf.w(CarLog.TAG_TELEMETRY, "Publisher failed", error);
172             // when a publisher fails, construct an TelemetryError result and send to client
173             String stackTrace = null;
174             if (error != null) {
175                 stackTrace = Log.getStackTraceString(error);
176             }
177             TelemetryError telemetryError = buildTelemetryError(
178                     TelemetryError.ErrorType.PUBLISHER_FAILED, "Publisher failed", stackTrace);
179             for (TelemetryProto.MetricsConfig config : affectedConfigs) {
180                 // this will remove the MetricsConfig and notify the client of result
181                 mDataBrokerListener.onReportFinished(config.getName(), telemetryError);
182             }
183         }
184 
185         @Override
186         public void onConfigFinished(@NonNull TelemetryProto.MetricsConfig metricsConfig) {
187             String configName = metricsConfig.getName();
188             Slogf.i(CarLog.TAG_TELEMETRY,
189                     "Publisher sets MetricsConfig(" + configName + ") as finished");
190             mDataBrokerListener.onReportFinished(configName);
191         }
192     };
193 
DataBrokerImpl( @onNull Context context, @NonNull PublisherFactory publisherFactory, @NonNull ResultStore resultStore, @NonNull TimingsTraceLog traceLog)194     public DataBrokerImpl(
195             @NonNull Context context,
196             @NonNull PublisherFactory publisherFactory,
197             @NonNull ResultStore resultStore,
198             @NonNull TimingsTraceLog traceLog) {
199         mContext = context;
200         mPublisherFactory = publisherFactory;
201         mResultStore = resultStore;
202         mScriptExecutorListener = new ScriptExecutorListener(this);
203         mPublisherFactory.initialize(mPublisherListener);
204         mScriptExecutionTraceLog = traceLog;
205     }
206 
207     @Nullable
findExecutorPackage()208     private String findExecutorPackage() {
209         PackageInfo info = null;
210         for (int i = 0; i < SCRIPT_EXECUTOR_PACKAGE_CANDIDATES.length; i++) {
211             try {
212                 info = mContext.getPackageManager().getPackageInfo(
213                         SCRIPT_EXECUTOR_PACKAGE_CANDIDATES[i], /* flags= */ 0);
214                 if (info != null) {
215                     break;
216                 }
217             } catch (PackageManager.NameNotFoundException e) {
218                 // ignore
219             }
220         }
221         if (info == null) {
222             return null;
223         }
224         return info.packageName;
225     }
226 
bindScriptExecutor()227     private void bindScriptExecutor() {
228         // do not re-bind if broker is in a disabled state or if script executor is nonnull
229         if (mDisabled || mScriptExecutor != null) {
230             return;
231         }
232         String executorPackage = findExecutorPackage();
233         if (executorPackage == null) {
234             Slogf.w(CarLog.TAG_TELEMETRY, "Cannot find executor package");
235             return;
236         }
237         Intent intent = new Intent();
238         intent.setComponent(new ComponentName(executorPackage, SCRIPT_EXECUTOR_CLASS));
239         boolean success = mContext.bindServiceAsUser(
240                 intent,
241                 mServiceConnection,
242                 Context.BIND_AUTO_CREATE,
243                 UserHandle.SYSTEM);
244         if (success) {
245             mBindScriptExecutorAttempts = 0; // reset
246             return;
247         }
248         unbindScriptExecutor();
249         mBindScriptExecutorAttempts++;
250         if (mBindScriptExecutorAttempts < MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS) {
251             Slogf.w(CarLog.TAG_TELEMETRY,
252                     "failed to get valid connection to ScriptExecutor, retrying in "
253                             + mBindScriptExecutorDelayMillis + "ms.");
254             mTelemetryHandler.sendEmptyMessageDelayed(MSG_BIND_TO_SCRIPT_EXECUTOR,
255                     mBindScriptExecutorDelayMillis);
256         } else {
257             Slogf.w(CarLog.TAG_TELEMETRY, "failed to get valid connection to ScriptExecutor, "
258                     + "disabling DataBroker");
259             disableBroker();
260         }
261     }
262 
263     /**
264      * Unbinds {@link ScriptExecutor} to release the connection. This method should be called from
265      * the telemetry thread.
266      */
unbindScriptExecutor()267     private void unbindScriptExecutor() {
268         // TODO(b/198648763): unbind from script executor when there is no work to do
269         // if a script is running while we unbind from ScriptExecutor, end trace log first
270         if (mCurrentMetricsConfigName != null) {
271             mScriptExecutionTraceLog.traceEnd();
272             mCurrentMetricsConfigName = null;
273         }
274         mScriptExecutor = null;
275         try {
276             mContext.unbindService(mServiceConnection);
277         } catch (IllegalArgumentException e) {
278             // If ScriptExecutor is gone before unbinding, it will throw this exception
279             Slogf.w(CarLog.TAG_TELEMETRY, "Failed to unbind from ScriptExecutor", e);
280         }
281     }
282 
283     /**
284      * Enters into a disabled state because something irrecoverable happened.
285      * TODO(b/200841260): expose the state to the caller.
286      */
disableBroker()287     private void disableBroker() {
288         mDisabled = true;
289         // remove all MetricConfigs, disable all publishers, stop receiving data
290         for (String configName : mSubscriptionMap.keySet()) {
291             // get the metrics config from the DataSubscriber and remove the metrics config
292             if (mSubscriptionMap.get(configName).size() != 0) {
293                 removeMetricsConfig(configName);
294             }
295         }
296         mSubscriptionMap.clear();
297     }
298 
299     @Override
addMetricsConfig( @onNull String metricsConfigName, @NonNull MetricsConfig metricsConfig)300     public void addMetricsConfig(
301             @NonNull String metricsConfigName, @NonNull MetricsConfig metricsConfig) {
302         // TODO(b/187743369): pass status back to caller
303         // if broker is disabled or metricsConfig already exists, do nothing
304         if (mDisabled || mSubscriptionMap.containsKey(metricsConfigName)) {
305             return;
306         }
307         // Create the subscribers for this metrics configuration
308         List<DataSubscriber> dataSubscribers = new ArrayList<>(
309                 metricsConfig.getSubscribersList().size());
310         for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) {
311             if (subscriber.getPriority() < 0) {
312                 throw new IllegalArgumentException("Subscribers must have non-negative priority");
313             }
314             // protobuf publisher to a concrete Publisher
315             AbstractPublisher publisher = mPublisherFactory.getPublisher(
316                     subscriber.getPublisher().getPublisherCase());
317             // create DataSubscriber from TelemetryProto.Subscriber
318             DataSubscriber dataSubscriber = new DataSubscriber(
319                     this,
320                     metricsConfig,
321                     subscriber);
322             dataSubscribers.add(dataSubscriber);
323             // addDataSubscriber could throw an exception, let CarTelemetryService handle it
324             publisher.addDataSubscriber(dataSubscriber);
325         }
326         mSubscriptionMap.put(metricsConfigName, dataSubscribers);
327     }
328 
329     @Override
removeMetricsConfig(@onNull String metricsConfigName)330     public void removeMetricsConfig(@NonNull String metricsConfigName) {
331         // TODO(b/187743369): pass status back to caller
332         if (!mSubscriptionMap.containsKey(metricsConfigName)) {
333             return;
334         }
335         // get the subscriptions associated with this MetricsConfig, remove it from the map
336         List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfigName);
337         // for each subscriber, remove it from publishers
338         for (DataSubscriber subscriber : dataSubscribers) {
339             AbstractPublisher publisher = mPublisherFactory.getPublisher(
340                     subscriber.getPublisherParam().getPublisherCase());
341             try {
342                 publisher.removeDataSubscriber(subscriber);
343             } catch (IllegalArgumentException e) {
344                 // It shouldn't happen, but if happens, let's just log it.
345                 Slogf.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e);
346             }
347         }
348         // Remove all the tasks associated with this metrics config. The underlying impl uses the
349         // weakly consistent iterator, which is thread-safe but does not freeze the collection while
350         // iterating, so it may or may not reflect any updates since the iterator was created.
351         // But since adding & polling from queue should happen in the same thread, the task queue
352         // should not be changed while tasks are being iterated and removed.
353         Iterator<ScriptExecutionTask> it = mTaskQueue.iterator();
354         while (it.hasNext()) {
355             ScriptExecutionTask task = it.next();
356             if (task.isAssociatedWithMetricsConfig(metricsConfigName)) {
357                 mTaskQueue.remove(task);
358                 mPublisherCountArray.append(
359                         task.getPublisherType(),
360                         mPublisherCountArray.get(task.getPublisherType()) - 1);
361             }
362         }
363     }
364 
365     @Override
removeAllMetricsConfigs()366     public void removeAllMetricsConfigs() {
367         mPublisherFactory.removeAllDataSubscribers();
368         mSubscriptionMap.clear();
369         mTaskQueue.clear();
370         mPublisherCountArray.clear();
371     }
372 
373     @Override
addTaskToQueue(@onNull ScriptExecutionTask task)374     public int addTaskToQueue(@NonNull ScriptExecutionTask task) {
375         if (mDisabled) {
376             return mPublisherCountArray.get(task.getPublisherType());
377         }
378         mTaskQueue.add(task);
379         mPublisherCountArray.append(
380                 task.getPublisherType(),
381                 mPublisherCountArray.get(task.getPublisherType()) + 1);
382         scheduleNextTask();
383         return mPublisherCountArray.get(task.getPublisherType());
384     }
385 
386     /**
387      * This method can be called from any thread.
388      * It is possible for this method to be invoked from different threads at the same time, but
389      * it is not possible to schedule the same task twice, because the handler handles message
390      * in the order they come in, this means the task will be polled sequentially instead of
391      * concurrently. Every task that is scheduled and run will be distinct.
392      * TODO(b/187743369): If the threading behavior in DataSubscriber changes, ScriptExecutionTask
393      *  will also have different threading behavior. Update javadoc when the behavior is decided.
394      */
395     @Override
scheduleNextTask()396     public void scheduleNextTask() {
397         if (mDisabled || mTelemetryHandler.hasMessages(MSG_HANDLE_TASK)) {
398             return;
399         }
400         mTelemetryHandler.sendEmptyMessage(MSG_HANDLE_TASK);
401     }
402 
403     @Override
setDataBrokerListener(@onNull DataBrokerListener dataBrokerListener)404     public void setDataBrokerListener(@NonNull DataBrokerListener dataBrokerListener) {
405         if (mDisabled) {
406             return;
407         }
408         mDataBrokerListener = dataBrokerListener;
409     }
410 
411     @Override
setTaskExecutionPriority(int priority)412     public void setTaskExecutionPriority(int priority) {
413         if (mDisabled) {
414             return;
415         }
416         if (priority == mPriority) {
417             return;
418         }
419         mPriority = priority;
420         scheduleNextTask(); // when priority updates, schedule a task which checks task queue
421     }
422 
423     @VisibleForTesting
424     @NonNull
getSubscriptionMap()425     ArrayMap<String, List<DataSubscriber>> getSubscriptionMap() {
426         return new ArrayMap<>(mSubscriptionMap);
427     }
428 
429     @VisibleForTesting
430     @NonNull
getTelemetryHandler()431     Handler getTelemetryHandler() {
432         return mTelemetryHandler;
433     }
434 
435     @VisibleForTesting
436     @NonNull
getTaskQueue()437     PriorityBlockingQueue<ScriptExecutionTask> getTaskQueue() {
438         return mTaskQueue;
439     }
440 
441     /**
442      * Polls and runs a task from the head of the priority queue if the queue is nonempty and the
443      * head of the queue has priority higher than or equal to the current priority. A higher
444      * priority is denoted by a lower priority number, so head of the queue should have equal or
445      * lower priority number to be polled.
446      */
pollAndExecuteTask()447     private void pollAndExecuteTask() {
448         // check databroker state is ready to run script
449         if (mDisabled || mCurrentMetricsConfigName != null) {
450             Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, disabled or no config.");
451             return;
452         }
453         // check task is valid and ready to be run
454         ScriptExecutionTask task = mTaskQueue.peek();
455         if (task == null || task.getPriority() > mPriority) {
456             Slogf.d(CarLog.TAG_TELEMETRY, "Ignoring the task, either task is null or low priority");
457             return;
458         }
459         // if script executor is null, bind service
460         if (mScriptExecutor == null) {
461             Slogf.w(CarLog.TAG_TELEMETRY, "script executor is null, binding to script executor");
462             // upon successful binding, a task will be scheduled to run if there are any
463             mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
464             return;
465         }
466         mTaskQueue.poll(); // remove task from queue
467         mPublisherCountArray.append(
468                 task.getPublisherType(),
469                 mPublisherCountArray.get(task.getPublisherType()) - 1);
470 
471         if (task.bypassScriptExecutor()) {
472             // delegate to DataBrokerListener to handle storing data and scheduling next task
473             mDataBrokerListener.onMetricsReport(task.getMetricsConfig().getName(),
474                     task.getData(), /* state= */ null);
475             return;
476         }
477 
478         // update current config name because a script is currently running
479         mCurrentMetricsConfigName = task.getMetricsConfig().getName();
480         mScriptExecutionTraceLog.traceBegin(
481                 "executing script " + mCurrentMetricsConfigName);
482         try {
483             if (task.isLargeData()) {
484                 if (DEBUG) {
485                     Slogf.d(CarLog.TAG_TELEMETRY,
486                             "Running with large func %s of %s in ScriptExecutor.",
487                             task.getHandlerName(),
488                             mCurrentMetricsConfigName);
489                 }
490                 invokeScriptForLargeInput(task);
491             } else if (task.isBundleList()) {
492                 if (DEBUG) {
493                     Slogf.d(CarLog.TAG_TELEMETRY,
494                             "Running with bundle list func %s of %s in ScriptExecutor.",
495                             task.getHandlerName(),
496                             mCurrentMetricsConfigName);
497                 }
498                 invokeScriptForBundleList(task);
499             } else {
500                 if (DEBUG) {
501                     Slogf.d(CarLog.TAG_TELEMETRY, "Running func %s of %s in ScriptExecutor.",
502                             task.getHandlerName(),
503                             mCurrentMetricsConfigName);
504                 }
505                 mScriptExecutor.invokeScript(
506                         task.getMetricsConfig().getScript(),
507                         task.getHandlerName(),
508                         task.getData(),
509                         mResultStore.getInterimResult(mCurrentMetricsConfigName),
510                         mScriptExecutorListener);
511             }
512             mTelemetryHandler.sendEmptyMessageDelayed(
513                     MSG_STOP_HANGING_SCRIPT, MAX_SCRIPT_EXECUTION_TIME_MILLIS);
514         } catch (RemoteException e) {
515             mScriptExecutionTraceLog.traceEnd();
516             Slogf.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
517             unbindScriptExecutor();
518             addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor
519         } catch (IOException e) {
520             mScriptExecutionTraceLog.traceEnd();
521             Slogf.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data"
522                     + " to ScriptExecutor. Skipping the published data", e);
523             mCurrentMetricsConfigName = null;
524             scheduleNextTask(); // drop this task and schedule the next one
525         }
526     }
527 
528     /**
529      * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the
530      * script input to the pipe.
531      *
532      * @param task containing all the necessary parameters for ScriptExecutor API.
533      * @throws IOException if cannot create pipe or cannot write the bundle to pipe.
534      * @throws RemoteException if ScriptExecutor failed.
535      */
invokeScriptForLargeInput(@onNull ScriptExecutionTask task)536     private void invokeScriptForLargeInput(@NonNull ScriptExecutionTask task)
537             throws IOException, RemoteException {
538         ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
539         ParcelFileDescriptor readFd = fds[0];
540         ParcelFileDescriptor writeFd = fds[1];
541         try {
542             mScriptExecutor.invokeScriptForLargeInput(
543                     task.getMetricsConfig().getScript(),
544                     task.getHandlerName(),
545                     readFd,
546                     mResultStore.getInterimResult(mCurrentMetricsConfigName),
547                     mScriptExecutorListener);
548         } catch (RemoteException e) {
549             IoUtils.closeQuietly(readFd);
550             IoUtils.closeQuietly(writeFd);
551             throw e;
552         }
553         IoUtils.closeQuietly(readFd);
554 
555         Slogf.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe");
556         try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) {
557             task.getData().writeToStream(outputStream);
558         }
559     }
560 
561     /**
562      * Sends bundle list with LargeParcelable mechanism.
563      *
564      * @param task containing all the necessary parameters for ScriptExecutor API.
565      * @throws RemoteException if ScriptExecutor failed.
566      */
invokeScriptForBundleList(@onNull ScriptExecutionTask task)567     private void invokeScriptForBundleList(@NonNull ScriptExecutionTask task)
568             throws RemoteException {
569         BundleList bl = new BundleList();
570         bl.bundles = task.getBundleList();
571         bl = (BundleList) LargeParcelable.toLargeParcelable(
572                 bl, () -> {
573                     BundleList bundleList = new BundleList();
574                     bundleList.bundles = new ArrayList<>();
575                     return bundleList;
576                 });
577         mScriptExecutor.invokeScriptForBundleList(
578                 task.getMetricsConfig().getScript(),
579                 task.getHandlerName(),
580                 bl,
581                 mResultStore.getInterimResult(mCurrentMetricsConfigName),
582                 mScriptExecutorListener);
583     }
584 
buildTelemetryError( @onNull TelemetryError.ErrorType errorType, @NonNull String message, @Nullable String stackTrace)585     private TelemetryError buildTelemetryError(
586             @NonNull TelemetryError.ErrorType errorType,
587             @NonNull String message,
588             @Nullable String stackTrace) {
589         TelemetryError.Builder error = TelemetryError.newBuilder()
590                 .setErrorType(errorType)
591                 .setMessage(message);
592         if (stackTrace != null) {
593             error.setStackTrace(stackTrace);
594         }
595         return error.build();
596     }
597 
598     /**
599      * This helper method should be called as soon as script execution returns.
600      * It returns the name of the MetricsConfig whose script returned.
601      */
endScriptExecution()602     private String endScriptExecution() {
603         mScriptExecutionTraceLog.traceEnd(); // end trace as soon as script completes running
604         mTelemetryHandler.removeMessages(MSG_STOP_HANGING_SCRIPT); // script did not hang
605         // get and set the mCurrentMetricsConfigName to null
606         String configName = mCurrentMetricsConfigName;
607         mCurrentMetricsConfigName = null;
608         return configName;
609     }
610 
611     /** Stores final metrics and schedules the next task. */
onScriptFinished(@onNull PersistableBundle result)612     private void onScriptFinished(@NonNull PersistableBundle result) {
613         if (DEBUG) {
614             Slogf.d(CarLog.TAG_TELEMETRY, "A script finished, storing the final result.");
615         }
616         mTelemetryHandler.post(() -> {
617             String configName = endScriptExecution();
618             if (configName == null) {
619                 return;
620             }
621             // delegate to DataBrokerListener to handle storing data and scheduling next task
622             mDataBrokerListener.onReportFinished(configName, result);
623         });
624     }
625 
626     /** Stores interim metrics and schedules the next task. */
onScriptSuccess(@onNull PersistableBundle stateToPersist)627     private void onScriptSuccess(@NonNull PersistableBundle stateToPersist) {
628         if (DEBUG) {
629             Slogf.d(CarLog.TAG_TELEMETRY, "A script succeeded, storing the interim result.");
630         }
631         mTelemetryHandler.post(() -> {
632             String configName = endScriptExecution();
633             if (configName == null) {
634                 return;
635             }
636             // delegate to DataBrokerListener to handle storing data and scheduling next task
637             mDataBrokerListener.onEventConsumed(configName, stateToPersist);
638         });
639     }
640 
641     /** Stores telemetry error and schedules the next task. */
onScriptError( int errorType, @NonNull String message, @Nullable String stackTrace)642     private void onScriptError(
643             int errorType, @NonNull String message, @Nullable String stackTrace) {
644         if (DEBUG) {
645             Slogf.d(CarLog.TAG_TELEMETRY, "A script failed: %d %s\n%s",
646                     errorType, message, stackTrace);
647         }
648         mTelemetryHandler.post(() -> {
649             String configName = endScriptExecution();
650             if (configName == null) {
651                 return;
652             }
653             // delegate to DataBrokerListener to handle storing data and scheduling next task
654             mDataBrokerListener.onReportFinished(
655                     configName,
656                     buildTelemetryError(
657                             TelemetryError.ErrorType.forNumber(errorType),
658                             message,
659                             stackTrace));
660         });
661     }
662 
onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)663     private void onMetricsReport(
664             @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) {
665         if (DEBUG) {
666             Slogf.d(CarLog.TAG_TELEMETRY, "A script produced a report without finishing.");
667         }
668         mTelemetryHandler.post(() -> {
669             String configName = endScriptExecution();
670             if (configName == null) {
671                 return;
672             }
673             mDataBrokerListener.onMetricsReport(configName, report, stateToPersist);
674         });
675     }
676 
677     /** Listens for script execution status. Methods are called on the binder thread. */
678     private static final class ScriptExecutorListener extends IScriptExecutorListener.Stub {
679         private final WeakReference<DataBrokerImpl> mWeakDataBroker;
680 
ScriptExecutorListener(@onNull DataBrokerImpl dataBroker)681         private ScriptExecutorListener(@NonNull DataBrokerImpl dataBroker) {
682             mWeakDataBroker = new WeakReference<>(dataBroker);
683         }
684 
685         @Override
onScriptFinished(@onNull PersistableBundle result)686         public void onScriptFinished(@NonNull PersistableBundle result) {
687             DataBrokerImpl dataBroker = mWeakDataBroker.get();
688             if (dataBroker == null) {
689                 return;
690             }
691             dataBroker.onScriptFinished(result);
692         }
693 
694         @Override
onSuccess(@onNull PersistableBundle stateToPersist)695         public void onSuccess(@NonNull PersistableBundle stateToPersist) {
696             DataBrokerImpl dataBroker = mWeakDataBroker.get();
697             if (dataBroker == null) {
698                 return;
699             }
700             dataBroker.onScriptSuccess(stateToPersist);
701         }
702 
703         @Override
onError(int errorType, @NonNull String message, @Nullable String stackTrace)704         public void onError(int errorType, @NonNull String message, @Nullable String stackTrace) {
705             DataBrokerImpl dataBroker = mWeakDataBroker.get();
706             if (dataBroker == null) {
707                 return;
708             }
709             dataBroker.onScriptError(errorType, message, stackTrace);
710         }
711 
712         @Override
onMetricsReport( @onNull PersistableBundle report, @Nullable PersistableBundle stateToPersist)713         public void onMetricsReport(
714                 @NonNull PersistableBundle report, @Nullable PersistableBundle stateToPersist) {
715             DataBrokerImpl dataBroker = mWeakDataBroker.get();
716             if (dataBroker == null) {
717                 return;
718             }
719             dataBroker.onMetricsReport(report, stateToPersist);
720         }
721     }
722 
723     /** Callback handler to handle scheduling and rescheduling of {@link ScriptExecutionTask}s. */
724     class TaskHandler extends Handler {
TaskHandler(@onNull Looper looper)725         TaskHandler(@NonNull Looper looper) {
726             super(looper);
727         }
728 
729         /**
730          * Handles a message depending on the message ID.
731          * If the msg ID is MSG_HANDLE_TASK, it polls a task from the priority queue and executing a
732          * {@link ScriptExecutionTask}. There are multiple places where this message is sent: when
733          * priority updates, when a new task is added to the priority queue, and when a task
734          * finishes running.
735          */
736         @Override
handleMessage(@onNull Message msg)737         public void handleMessage(@NonNull Message msg) {
738             switch (msg.what) {
739                 case MSG_HANDLE_TASK:
740                     pollAndExecuteTask(); // run the next task
741                     break;
742                 case MSG_BIND_TO_SCRIPT_EXECUTOR:
743                     bindScriptExecutor();
744                     break;
745                 case MSG_STOP_HANGING_SCRIPT:
746                     // TODO(b/223224704): log error
747                     unbindScriptExecutor();
748                     scheduleNextTask();
749                     break;
750                 default:
751                     Slogf.w(CarLog.TAG_TELEMETRY, "TaskHandler received unknown message.");
752             }
753         }
754     }
755 }
756