1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.android.tradefed.invoker.shard; 17 18 import com.android.annotations.VisibleForTesting; 19 import com.android.tradefed.config.IConfiguration; 20 import com.android.tradefed.error.HarnessRuntimeException; 21 import com.android.tradefed.invoker.IRescheduler; 22 import com.android.tradefed.invoker.TestInformation; 23 import com.android.tradefed.log.ITestLogger; 24 import com.android.tradefed.log.LogUtil.CLog; 25 import com.android.tradefed.result.error.InfraErrorIdentifier; 26 import com.android.tradefed.service.TradefedFeatureClient; 27 import com.android.tradefed.testtype.IRemoteTest; 28 import com.android.tradefed.testtype.suite.ITestSuite; 29 import com.android.tradefed.testtype.suite.ModuleDefinition; 30 31 import com.google.common.base.Strings; 32 import com.google.internal.android.engprod.v1.ProvideTestTargetRequest; 33 import com.google.internal.android.engprod.v1.SerializedTestTarget; 34 import com.proto.tradefed.feature.FeatureResponse; 35 36 import io.grpc.Status; 37 import io.grpc.StatusRuntimeException; 38 39 import java.util.ArrayList; 40 import java.util.Collection; 41 import java.util.HashMap; 42 import java.util.List; 43 import java.util.Map; 44 import java.util.ServiceLoader; 45 import java.util.concurrent.CountDownLatch; 46 import java.util.stream.Collectors; 47 48 /** Sharding strategy to allow work remote work queueing between multiple TF instances */ 49 public class DynamicShardHelper extends StrictShardHelper { 50 51 /** {@inheritDoc} */ 52 @Override shardConfig( IConfiguration config, TestInformation testInfo, IRescheduler rescheduler, ITestLogger logger)53 public boolean shardConfig( 54 IConfiguration config, 55 TestInformation testInfo, 56 IRescheduler rescheduler, 57 ITestLogger logger) { 58 // Check preconditions 59 Integer shardCount = config.getCommandOptions().getShardCount(); 60 Integer shardIndex = config.getCommandOptions().getShardIndex(); 61 62 String invocationId = testInfo.getContext().getAttribute("invocation_id"); 63 String attemptId = testInfo.getContext().getAttribute("attempt_index"); 64 65 boolean shouldDelegate = false; 66 67 // We should re-delegate this to strict sharding so it can delegate this case to local 68 // sharding 69 if (shardIndex == null) { 70 shouldDelegate = true; 71 } 72 73 if (shardIndex != null && shardCount == null) { 74 throw new HarnessRuntimeException( 75 "shard-count is null while shard-index is " + shardIndex, 76 InfraErrorIdentifier.INTERNAL_CONFIG_ERROR); 77 } 78 79 // redelegate to strict sharding 80 if (Strings.isNullOrEmpty(attemptId)) { 81 shouldDelegate = true; 82 } 83 84 // If we don't have sufficient information to properly key the pool, then fall 85 // back to strict sharding. 86 if (Strings.isNullOrEmpty(invocationId)) { 87 CLog.d("No invocation_id specified, falling back to strict sharding."); 88 shouldDelegate = true; 89 } 90 91 // Check if any of the tests are not ITestSuite instances 92 // If not, make sure that intra-module sharding is off and delegate 93 if (!shouldDelegate 94 && config.getTests().stream() 95 .anyMatch(x -> !ITestSuite.class.isAssignableFrom(x.getClass()))) { 96 CLog.d("Found non-ITestSuite tests, falling back to strict sharding"); 97 shouldDelegate = true; 98 } 99 100 List<ITestSuite> allModules = null; 101 102 if (!shouldDelegate) { 103 allModules = getAllModules(config, testInfo); 104 if (allModules == null) { 105 CLog.w("No sharding supported."); 106 shouldDelegate = true; 107 } 108 } 109 110 if (shouldDelegate) { 111 CLog.d( 112 "Re-entering StrictShardHelper with dynamic sharding disabled due to failed" 113 + " precondition checks."); 114 return shardConfigStrict(config, testInfo, rescheduler, logger); 115 } 116 117 // Initialize Dynamic Sharding client 118 IDynamicShardingClient client = getClient(); 119 120 String poolId = String.format("invocation-%s", invocationId); 121 122 Map<String, ITestSuite> moduleMapping = new HashMap<>(); 123 for (ITestSuite test : allModules) { 124 ModuleDefinition moduleDef = test.getDirectModule(); 125 if (moduleDef == null) { 126 throw new HarnessRuntimeException( 127 "Module definition is null", InfraErrorIdentifier.INTERNAL_CONFIG_ERROR); 128 } 129 String moduleName = moduleDef.getId(); 130 if (moduleName == null) { 131 throw new HarnessRuntimeException( 132 "Module name is null", InfraErrorIdentifier.INTERNAL_CONFIG_ERROR); 133 } 134 moduleMapping.put(moduleName, test); 135 } 136 137 // if we're shard 0 populate the pool with the list of tests 138 try { 139 // Populate the pool 140 List<SerializedTestTarget> targetsToUpload = 141 moduleMapping.keySet().stream() 142 .map(x -> SerializedTestTarget.newBuilder().setTargetName(x).build()) 143 .collect(Collectors.toList()); 144 CLog.d("Uploading to pool %s test targets: %s", poolId, targetsToUpload); 145 ProvideTestTargetRequest request = 146 ProvideTestTargetRequest.newBuilder() 147 .setReferencePoolId(poolId) 148 .setUseOneShotSeeding(true) 149 .addAllTestTargets(targetsToUpload) 150 .build(); 151 client.provideTestTarget(request); 152 } catch (StatusRuntimeException e) { 153 // If it is just the ALREADY_EXISTS error, that's ok; it just means 154 // that one of the other shards got to it before this one. 155 if (Status.fromThrowable(e).getCode() != Status.Code.ALREADY_EXISTS) { 156 // rethrow if it isn't the error we were expecting. 157 throw e; 158 } 159 // will only reach this point if the error code is ALREADY_EXISTS 160 CLog.v("Another shard has already seeded the pool '%s'.", poolId); 161 } 162 163 // if we're any shard, create a test pool poller that polls the sharding server 164 ITestsPool pool = RemoteDynamicPool.newInstance(client, poolId, moduleMapping); 165 166 // For now this should disable the reporting of not executed tests since each 167 // poller can only decrement this by 1 and this mode only uses one poller per 168 // cluster command. 169 // At some point we should probably have some way for pollers to register and 170 // deregister from a pool in order to be able to tell how many pollers are still 171 // listening to a pool. 172 CountDownLatch tracker = new CountDownLatch(2); 173 TestsPoolPoller poller = new TestsPoolPoller(pool, tracker); 174 175 // set our main test to be the test pool poller. 176 config.setTest(poller); 177 178 // We cannot shuffle to get better average results 179 return false; 180 } 181 182 @VisibleForTesting shardConfigStrict( IConfiguration config, TestInformation testInfo, IRescheduler rescheduler, ITestLogger logger)183 protected boolean shardConfigStrict( 184 IConfiguration config, 185 TestInformation testInfo, 186 IRescheduler rescheduler, 187 ITestLogger logger) { 188 return super.shardConfigInternal(config, testInfo, rescheduler, logger); 189 } 190 191 @VisibleForTesting getClient()192 private IDynamicShardingClient getClient() { 193 FeatureResponse resp = null; 194 try (TradefedFeatureClient featureClient = new TradefedFeatureClient()) { 195 resp = 196 featureClient.triggerFeature( 197 "getDynamicShardingConnectionInfo", new HashMap<String, String>()); 198 } 199 if (resp.hasMultiPartResponse()) { 200 DynamicShardingConnectionInfoMessage msg = 201 DynamicShardingConnectionInfoMessage.fromMultiPartResponse( 202 resp.getMultiPartResponse()); 203 return new ConfigurableGrpcDynamicShardingClient(msg); 204 } else { 205 CLog.v( 206 "Failed to get connection info from feature client, will attempt to load a" 207 + " client using the service loader"); 208 ServiceLoader<IDynamicShardingClient> serviceLoader = 209 ServiceLoader.load(IDynamicShardingClient.class); 210 for (IDynamicShardingClient client : serviceLoader) { 211 // the first (and should be only) implementation of this feature should be the 212 // internal one 213 if (IDynamicShardingConnectionInfo.class.isAssignableFrom(client.getClass())) { 214 // use the internal one to configure the generic one 215 return new ConfigurableGrpcDynamicShardingClient( 216 (IDynamicShardingConnectionInfo) client); 217 } 218 } 219 } 220 throw new HarnessRuntimeException( 221 "Failed to retrieve dynamic sharding connection info, feature server problem?", 222 InfraErrorIdentifier.INTERNAL_CONFIG_ERROR); 223 } 224 getAllModules(IConfiguration config, TestInformation testInfo)225 private List<ITestSuite> getAllModules(IConfiguration config, TestInformation testInfo) { 226 List<ITestSuite> allTests = new ArrayList<>(); 227 for (IRemoteTest test : config.getTests()) { 228 if (test instanceof ITestSuite) { 229 ITestSuite suite = (ITestSuite) test; 230 // Disable intra-module-sharding when requesting dynamic sharding 231 // as it's currently not supported together. 232 if (suite.getIntraModuleSharding()) { 233 CLog.w( 234 "Disabling intra-module sharding because it is not supported with" 235 + "dynamic sharding."); 236 suite.setIntraModuleSharding(false); 237 } 238 239 Collection<IRemoteTest> splitSuite = suite.split(1000000, testInfo); 240 if (splitSuite == null) { 241 return null; 242 } else { 243 allTests.addAll( 244 splitSuite.stream() 245 .map(x -> (ITestSuite) x) 246 .collect(Collectors.toList())); 247 } 248 } else { 249 throw new HarnessRuntimeException( 250 "Test not an instance of ITestSuite, cannot execute this using dynamic" 251 + " sharding.", 252 InfraErrorIdentifier.INTERNAL_CONFIG_ERROR); 253 } 254 } 255 return allTests; 256 } 257 } 258