• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.android.tradefed.invoker.tracing;
17 
18 import com.android.tradefed.log.LogUtil.CLog;
19 
20 import com.google.common.base.Throwables;
21 
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 
34 import javax.annotation.Nullable;
35 
36 /**
37  * An executor service that forwards tasks to an underlying implementation while propagating the
38  * tracing context.
39  *
40  * <p>This enables using tracing facilities such as {@code CloseableTraceScope} in submitted tasks.
41  */
42 public final class TracePropagatingExecutorService implements ExecutorService {
43 
44     private final ExecutorService delegate;
45 
46     /**
47      * Creates an {@link ExecutorService} that delegates to the given delegate executor.
48      *
49      * <p>Note that the active trace on is that is propagated to tasks is the one active on calls to
50      * the executor method. This is done because TF constructs most objects before starting the
51      * invocation and attaching the trace.
52      */
create(ExecutorService delegate)53     public static TracePropagatingExecutorService create(ExecutorService delegate) {
54         return new TracePropagatingExecutorService(delegate);
55     }
56 
57     @Override
submit(Callable<T> task)58     public <T> Future<T> submit(Callable<T> task) {
59         return delegate.submit(wrapTask(task));
60     }
61 
62     @Override
submit(Runnable task, T result)63     public <T> Future<T> submit(Runnable task, T result) {
64         return delegate.submit(wrapTask(task), result);
65     }
66 
67     @Override
submit(Runnable task)68     public Future<?> submit(Runnable task) {
69         return delegate.submit(wrapTask(task));
70     }
71 
72     @Override
invokeAll(Collection<? extends Callable<T>> tasks)73     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
74             throws InterruptedException {
75         return delegate.invokeAll(wrapTasks(tasks));
76     }
77 
78     @Override
invokeAll( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)79     public <T> List<Future<T>> invokeAll(
80             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
81             throws InterruptedException {
82         return delegate.invokeAll(wrapTasks(tasks), timeout, unit);
83     }
84 
85     @Override
invokeAny(Collection<? extends Callable<T>> tasks)86     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
87             throws InterruptedException, ExecutionException {
88         return delegate.invokeAny(wrapTasks(tasks));
89     }
90 
91     @Override
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)92     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
93             throws InterruptedException, ExecutionException, TimeoutException {
94         return delegate.invokeAny(wrapTasks(tasks), timeout, unit);
95     }
96 
97     @Override
execute(Runnable command)98     public void execute(Runnable command) {
99         delegate.execute(wrapTask(command));
100     }
101 
102     @Override
awaitTermination(long timeout, TimeUnit unit)103     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
104         return delegate.awaitTermination(timeout, unit);
105     }
106 
107     @Override
isShutdown()108     public boolean isShutdown() {
109         return delegate.isShutdown();
110     }
111 
112     @Override
isTerminated()113     public boolean isTerminated() {
114         return delegate.isTerminated();
115     }
116 
117     @Override
shutdown()118     public void shutdown() {
119         delegate.shutdown();
120     }
121 
122     @Override
shutdownNow()123     public List<Runnable> shutdownNow() {
124         return delegate.shutdownNow();
125     }
126 
TracePropagatingExecutorService(ExecutorService delegate)127     private TracePropagatingExecutorService(ExecutorService delegate) {
128         this.delegate = Objects.requireNonNull(delegate);
129     }
130 
wrapTask(Callable<T> task)131     private <T> Callable<T> wrapTask(Callable<T> task) {
132         // Note that we query the active trace on the thread calling the Executor method to
133         // then propagate it to tasks.
134         @Nullable ActiveTrace rootTrace = TracingLogger.getActiveTrace();
135         return () -> {
136             try (TraceScope ignored = makeActive(rootTrace)) {
137                 return task.call();
138             }
139         };
140     }
141 
142     private Runnable wrapTask(Runnable command) {
143         Callable<?> wrapped = wrapTask(Executors.callable(command));
144         return () -> {
145             try {
146                 wrapped.call();
147             } catch (Exception e) {
148                 Throwables.throwIfUnchecked(e);
149                 // We should never really get here since we're wrapping a Runnable that never throws
150                 // a checked exception.
151                 throw new AssertionError(e);
152             }
153         };
154     }
155 
156     protected <T> Collection<? extends Callable<T>> wrapTasks(
157             Collection<? extends Callable<T>> tasks) {
158         List<Callable<T>> wrapped = new ArrayList<>();
159         for (Callable<T> task : tasks) {
160             wrapped.add(wrapTask(task));
161         }
162         return wrapped;
163     }
164 
165     private static TraceScope makeActive(@Nullable ActiveTrace toAttach) {
166         // Save the active trace that is currently active on the task's thread for us to later
167         // restore it. This is not necessarily {@code null} since tasks could submit additional
168         // tasks using the same executor. The thread could also be reused for other tasks which
169         // were submitted with different trace contexts.
170         ActiveTrace toRestore = setActiveTraceIfChanged(toAttach);
171 
172         return () -> {
173             // We always restore since the task may have switched traces while running.
174             if (setActiveTraceIfChanged(toRestore) != toAttach) {
175                 CLog.w("Unexpected active trace, close was not correctly called");
176             }
177             ;
178         };
179     }
180 
181     private static @Nullable ActiveTrace setActiveTraceIfChanged(@Nullable ActiveTrace toAttach) {
182         ActiveTrace toRestore = TracingLogger.getActiveTrace();
183 
184         // This is an optimization since there's no point switching if the current and target
185         // trace are the same.
186         if (toAttach != toRestore) {
187             TracingLogger.setActiveTrace(toAttach);
188         }
189 
190         return toRestore;
191     }
192 
193     interface TraceScope extends AutoCloseable {
194         @Override
195         public void close();
196     }
197 }
198