• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package com.android.tradefed.invoker.shard;
17 
18 import com.android.annotations.VisibleForTesting;
19 import com.android.tradefed.config.IConfiguration;
20 import com.android.tradefed.error.HarnessRuntimeException;
21 import com.android.tradefed.invoker.IRescheduler;
22 import com.android.tradefed.invoker.TestInformation;
23 import com.android.tradefed.log.ITestLogger;
24 import com.android.tradefed.log.LogUtil.CLog;
25 import com.android.tradefed.result.error.InfraErrorIdentifier;
26 import com.android.tradefed.service.TradefedFeatureClient;
27 import com.android.tradefed.testtype.IRemoteTest;
28 import com.android.tradefed.testtype.suite.ITestSuite;
29 import com.android.tradefed.testtype.suite.ModuleDefinition;
30 
31 import com.google.common.base.Strings;
32 import com.google.internal.android.engprod.v1.ProvideTestTargetRequest;
33 import com.google.internal.android.engprod.v1.SerializedTestTarget;
34 import com.proto.tradefed.feature.FeatureResponse;
35 
36 import io.grpc.Status;
37 import io.grpc.StatusRuntimeException;
38 
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.HashMap;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.ServiceLoader;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.stream.Collectors;
47 
48 /** Sharding strategy to allow work remote work queueing between multiple TF instances */
49 public class DynamicShardHelper extends StrictShardHelper {
50 
51     /** {@inheritDoc} */
52     @Override
shardConfig( IConfiguration config, TestInformation testInfo, IRescheduler rescheduler, ITestLogger logger)53     public boolean shardConfig(
54             IConfiguration config,
55             TestInformation testInfo,
56             IRescheduler rescheduler,
57             ITestLogger logger) {
58         // Check preconditions
59         Integer shardCount = config.getCommandOptions().getShardCount();
60         Integer shardIndex = config.getCommandOptions().getShardIndex();
61 
62         String invocationId = testInfo.getContext().getAttribute("invocation_id");
63         String attemptId = testInfo.getContext().getAttribute("attempt_index");
64 
65         boolean shouldDelegate = false;
66 
67         // We should re-delegate this to strict sharding so it can delegate this case to local
68         // sharding
69         if (shardIndex == null) {
70             shouldDelegate = true;
71         }
72 
73         if (shardIndex != null && shardCount == null) {
74             throw new HarnessRuntimeException(
75                     "shard-count is null while shard-index is " + shardIndex,
76                     InfraErrorIdentifier.INTERNAL_CONFIG_ERROR);
77         }
78 
79         // redelegate to strict sharding
80         if (Strings.isNullOrEmpty(attemptId)) {
81             shouldDelegate = true;
82         }
83 
84         // If we don't have sufficient information to properly key the pool, then fall
85         // back to strict sharding.
86         if (Strings.isNullOrEmpty(invocationId)) {
87             CLog.d("No invocation_id specified, falling back to strict sharding.");
88             shouldDelegate = true;
89         }
90 
91         // Check if any of the tests are not ITestSuite instances
92         // If not, make sure that intra-module sharding is off and delegate
93         if (!shouldDelegate
94                 && config.getTests().stream()
95                         .anyMatch(x -> !ITestSuite.class.isAssignableFrom(x.getClass()))) {
96             CLog.d("Found non-ITestSuite tests, falling back to strict sharding");
97             shouldDelegate = true;
98         }
99 
100         List<ITestSuite> allModules = null;
101 
102         if (!shouldDelegate) {
103             allModules = getAllModules(config, testInfo);
104             if (allModules == null) {
105                 CLog.w("No sharding supported.");
106                 shouldDelegate = true;
107             }
108         }
109 
110         if (shouldDelegate) {
111             CLog.d(
112                     "Re-entering StrictShardHelper with dynamic sharding disabled due to failed"
113                             + " precondition checks.");
114             return shardConfigStrict(config, testInfo, rescheduler, logger);
115         }
116 
117         // Initialize Dynamic Sharding client
118         IDynamicShardingClient client = getClient();
119 
120         String poolId = String.format("invocation-%s", invocationId);
121 
122         Map<String, ITestSuite> moduleMapping = new HashMap<>();
123         for (ITestSuite test : allModules) {
124             ModuleDefinition moduleDef = test.getDirectModule();
125             if (moduleDef == null) {
126                 throw new HarnessRuntimeException(
127                         "Module definition is null", InfraErrorIdentifier.INTERNAL_CONFIG_ERROR);
128             }
129             String moduleName = moduleDef.getId();
130             if (moduleName == null) {
131                 throw new HarnessRuntimeException(
132                         "Module name is null", InfraErrorIdentifier.INTERNAL_CONFIG_ERROR);
133             }
134             moduleMapping.put(moduleName, test);
135         }
136 
137         // if we're shard 0 populate the pool with the list of tests
138         try {
139             // Populate the pool
140             List<SerializedTestTarget> targetsToUpload =
141                     moduleMapping.keySet().stream()
142                             .map(x -> SerializedTestTarget.newBuilder().setTargetName(x).build())
143                             .collect(Collectors.toList());
144             CLog.d("Uploading to pool %s test targets: %s", poolId, targetsToUpload);
145             ProvideTestTargetRequest request =
146                     ProvideTestTargetRequest.newBuilder()
147                             .setReferencePoolId(poolId)
148                             .setUseOneShotSeeding(true)
149                             .addAllTestTargets(targetsToUpload)
150                             .build();
151             client.provideTestTarget(request);
152         } catch (StatusRuntimeException e) {
153             // If it is just the ALREADY_EXISTS error, that's ok; it just means
154             // that one of the other shards got to it before this one.
155             if (Status.fromThrowable(e).getCode() != Status.Code.ALREADY_EXISTS) {
156                 // rethrow if it isn't the error we were expecting.
157                 throw e;
158             }
159             // will only reach this point if the error code is ALREADY_EXISTS
160             CLog.v("Another shard has already seeded the pool '%s'.", poolId);
161         }
162 
163         // if we're any shard, create a test pool poller that polls the sharding server
164         ITestsPool pool = RemoteDynamicPool.newInstance(client, poolId, moduleMapping);
165 
166         // For now this should disable the reporting of not executed tests since each
167         // poller can only decrement this by 1 and this mode only uses one poller per
168         // cluster command.
169         // At some point we should probably have some way for pollers to register and
170         // deregister from a pool in order to be able to tell how many pollers are still
171         // listening to a pool.
172         CountDownLatch tracker = new CountDownLatch(2);
173         TestsPoolPoller poller = new TestsPoolPoller(pool, tracker);
174 
175         // set our main test to be the test pool poller.
176         config.setTest(poller);
177 
178         // We cannot shuffle to get better average results
179         return false;
180     }
181 
182     @VisibleForTesting
shardConfigStrict( IConfiguration config, TestInformation testInfo, IRescheduler rescheduler, ITestLogger logger)183     protected boolean shardConfigStrict(
184             IConfiguration config,
185             TestInformation testInfo,
186             IRescheduler rescheduler,
187             ITestLogger logger) {
188         return super.shardConfigInternal(config, testInfo, rescheduler, logger);
189     }
190 
191     @VisibleForTesting
getClient()192     private IDynamicShardingClient getClient() {
193         FeatureResponse resp = null;
194         try (TradefedFeatureClient featureClient = new TradefedFeatureClient()) {
195             resp =
196                     featureClient.triggerFeature(
197                             "getDynamicShardingConnectionInfo", new HashMap<String, String>());
198         }
199         if (resp.hasMultiPartResponse()) {
200             DynamicShardingConnectionInfoMessage msg =
201                     DynamicShardingConnectionInfoMessage.fromMultiPartResponse(
202                             resp.getMultiPartResponse());
203             return new ConfigurableGrpcDynamicShardingClient(msg);
204         } else {
205             CLog.v(
206                     "Failed to get connection info from feature client, will attempt to load a"
207                             + " client using the service loader");
208             ServiceLoader<IDynamicShardingClient> serviceLoader =
209                     ServiceLoader.load(IDynamicShardingClient.class);
210             for (IDynamicShardingClient client : serviceLoader) {
211                 // the first (and should be only) implementation of this feature should be the
212                 // internal one
213                 if (IDynamicShardingConnectionInfo.class.isAssignableFrom(client.getClass())) {
214                     // use the internal one to configure the generic one
215                     return new ConfigurableGrpcDynamicShardingClient(
216                             (IDynamicShardingConnectionInfo) client);
217                 }
218             }
219         }
220         throw new HarnessRuntimeException(
221                 "Failed to retrieve dynamic sharding connection info, feature server problem?",
222                 InfraErrorIdentifier.INTERNAL_CONFIG_ERROR);
223     }
224 
getAllModules(IConfiguration config, TestInformation testInfo)225     private List<ITestSuite> getAllModules(IConfiguration config, TestInformation testInfo) {
226         List<ITestSuite> allTests = new ArrayList<>();
227         for (IRemoteTest test : config.getTests()) {
228             if (test instanceof ITestSuite) {
229                 ITestSuite suite = (ITestSuite) test;
230                 // Disable intra-module-sharding when requesting dynamic sharding
231                 // as it's currently not supported together.
232                 if (suite.getIntraModuleSharding()) {
233                     CLog.w(
234                             "Disabling intra-module sharding because it is not supported with"
235                                     + "dynamic sharding.");
236                     suite.setIntraModuleSharding(false);
237                 }
238 
239                 Collection<IRemoteTest> splitSuite = suite.split(1000000, testInfo);
240                 if (splitSuite == null) {
241                     return null;
242                 } else {
243                     allTests.addAll(
244                             splitSuite.stream()
245                                     .map(x -> (ITestSuite) x)
246                                     .collect(Collectors.toList()));
247                 }
248             } else {
249                 throw new HarnessRuntimeException(
250                         "Test not an instance of ITestSuite, cannot execute this using dynamic"
251                                 + " sharding.",
252                         InfraErrorIdentifier.INTERNAL_CONFIG_ERROR);
253             }
254         }
255         return allTests;
256     }
257 }
258