1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.android.tradefed.invoker.tracing; 17 18 import com.android.tradefed.log.LogUtil.CLog; 19 20 import com.google.common.base.Throwables; 21 22 import java.util.ArrayList; 23 import java.util.Collection; 24 import java.util.List; 25 import java.util.Objects; 26 import java.util.concurrent.Callable; 27 import java.util.concurrent.ExecutionException; 28 import java.util.concurrent.ExecutorService; 29 import java.util.concurrent.Executors; 30 import java.util.concurrent.Future; 31 import java.util.concurrent.TimeUnit; 32 import java.util.concurrent.TimeoutException; 33 34 import javax.annotation.Nullable; 35 36 /** 37 * An executor service that forwards tasks to an underlying implementation while propagating the 38 * tracing context. 39 * 40 * <p>This enables using tracing facilities such as {@code CloseableTraceScope} in submitted tasks. 41 */ 42 public final class TracePropagatingExecutorService implements ExecutorService { 43 44 private final ExecutorService delegate; 45 46 /** 47 * Creates an {@link ExecutorService} that delegates to the given delegate executor. 48 * 49 * <p>Note that the active trace on is that is propagated to tasks is the one active on calls to 50 * the executor method. This is done because TF constructs most objects before starting the 51 * invocation and attaching the trace. 52 */ create(ExecutorService delegate)53 public static TracePropagatingExecutorService create(ExecutorService delegate) { 54 return new TracePropagatingExecutorService(delegate); 55 } 56 57 @Override submit(Callable<T> task)58 public <T> Future<T> submit(Callable<T> task) { 59 return delegate.submit(wrapTask(task)); 60 } 61 62 @Override submit(Runnable task, T result)63 public <T> Future<T> submit(Runnable task, T result) { 64 return delegate.submit(wrapTask(task), result); 65 } 66 67 @Override submit(Runnable task)68 public Future<?> submit(Runnable task) { 69 return delegate.submit(wrapTask(task)); 70 } 71 72 @Override invokeAll(Collection<? extends Callable<T>> tasks)73 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 74 throws InterruptedException { 75 return delegate.invokeAll(wrapTasks(tasks)); 76 } 77 78 @Override invokeAll( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)79 public <T> List<Future<T>> invokeAll( 80 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 81 throws InterruptedException { 82 return delegate.invokeAll(wrapTasks(tasks), timeout, unit); 83 } 84 85 @Override invokeAny(Collection<? extends Callable<T>> tasks)86 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 87 throws InterruptedException, ExecutionException { 88 return delegate.invokeAny(wrapTasks(tasks)); 89 } 90 91 @Override invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)92 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 93 throws InterruptedException, ExecutionException, TimeoutException { 94 return delegate.invokeAny(wrapTasks(tasks), timeout, unit); 95 } 96 97 @Override execute(Runnable command)98 public void execute(Runnable command) { 99 delegate.execute(wrapTask(command)); 100 } 101 102 @Override awaitTermination(long timeout, TimeUnit unit)103 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 104 return delegate.awaitTermination(timeout, unit); 105 } 106 107 @Override isShutdown()108 public boolean isShutdown() { 109 return delegate.isShutdown(); 110 } 111 112 @Override isTerminated()113 public boolean isTerminated() { 114 return delegate.isTerminated(); 115 } 116 117 @Override shutdown()118 public void shutdown() { 119 delegate.shutdown(); 120 } 121 122 @Override shutdownNow()123 public List<Runnable> shutdownNow() { 124 return delegate.shutdownNow(); 125 } 126 TracePropagatingExecutorService(ExecutorService delegate)127 private TracePropagatingExecutorService(ExecutorService delegate) { 128 this.delegate = Objects.requireNonNull(delegate); 129 } 130 wrapTask(Callable<T> task)131 private <T> Callable<T> wrapTask(Callable<T> task) { 132 // Note that we query the active trace on the thread calling the Executor method to 133 // then propagate it to tasks. 134 @Nullable ActiveTrace rootTrace = TracingLogger.getActiveTrace(); 135 return () -> { 136 try (TraceScope ignored = makeActive(rootTrace)) { 137 return task.call(); 138 } 139 }; 140 } 141 142 private Runnable wrapTask(Runnable command) { 143 Callable<?> wrapped = wrapTask(Executors.callable(command)); 144 return () -> { 145 try { 146 wrapped.call(); 147 } catch (Exception e) { 148 Throwables.throwIfUnchecked(e); 149 // We should never really get here since we're wrapping a Runnable that never throws 150 // a checked exception. 151 throw new AssertionError(e); 152 } 153 }; 154 } 155 156 protected <T> Collection<? extends Callable<T>> wrapTasks( 157 Collection<? extends Callable<T>> tasks) { 158 List<Callable<T>> wrapped = new ArrayList<>(); 159 for (Callable<T> task : tasks) { 160 wrapped.add(wrapTask(task)); 161 } 162 return wrapped; 163 } 164 165 private static TraceScope makeActive(@Nullable ActiveTrace toAttach) { 166 // Save the active trace that is currently active on the task's thread for us to later 167 // restore it. This is not necessarily {@code null} since tasks could submit additional 168 // tasks using the same executor. The thread could also be reused for other tasks which 169 // were submitted with different trace contexts. 170 ActiveTrace toRestore = setActiveTraceIfChanged(toAttach); 171 172 return () -> { 173 // We always restore since the task may have switched traces while running. 174 if (setActiveTraceIfChanged(toRestore) != toAttach) { 175 CLog.w("Unexpected active trace, close was not correctly called"); 176 } 177 ; 178 }; 179 } 180 181 private static @Nullable ActiveTrace setActiveTraceIfChanged(@Nullable ActiveTrace toAttach) { 182 ActiveTrace toRestore = TracingLogger.getActiveTrace(); 183 184 // This is an optimization since there's no point switching if the current and target 185 // trace are the same. 186 if (toAttach != toRestore) { 187 TracingLogger.setActiveTrace(toAttach); 188 } 189 190 return toRestore; 191 } 192 193 interface TraceScope extends AutoCloseable { 194 @Override 195 public void close(); 196 } 197 } 198