/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package android.cts.statsd.subscriber;

import static com.google.common.truth.Truth.assertThat;

import com.android.compatibility.common.util.CpuFeatures;
import com.android.internal.os.StatsdConfigProto;
import com.android.os.AtomsProto;
import com.android.os.AtomsProto.Atom;
import com.android.os.ShellConfig;
import com.android.os.statsd.ShellDataProto;
import com.android.tradefed.device.CollectingByteOutputReceiver;
import com.android.tradefed.device.DeviceNotAvailableException;
import com.android.tradefed.device.ITestDevice;
import com.android.tradefed.log.LogUtil;
import com.android.tradefed.testtype.DeviceTestCase;
import com.android.tradefed.util.RunUtil;

import com.google.common.io.Files;
import com.google.protobuf.InvalidProtocolBufferException;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import android.cts.statsdatom.lib.AtomTestUtils;

/**
 * Statsd shell data subscription test.
 *
 * <p>Each test writes a length-prefixed {@code ShellSubscription} config to a file on the device,
 * pipes that file into {@code cmd stats data-subscribe}, pushes an app breadcrumb atom and then
 * validates the bytes the command wrote back.
 */
public class ShellSubscriberTest extends DeviceTestCase {
    /**
     * Width in bytes of the length prefix used in both the config and the returned data
     * (presumably the device's native {@code size_t}). Negative when the width could not be
     * determined, in which case every test returns without running.
     */
    private int sizetBytes;

    /** Runs one shell command on its own thread, collecting the command output in a receiver. */
    public class ShellSubscriptionThread extends Thread {
        final String cmd;
        final CollectingByteOutputReceiver receiver;
        final int maxTimeoutForCommandSec;

        /**
         * @param cmd shell command to execute on the device
         * @param receiver receiver that accumulates the command's output
         * @param maxTimeoutForCommandSec timeout, in seconds, used both for the whole command and
         *     for the maximum time to wait for output
         */
        public ShellSubscriptionThread(
                String cmd,
                CollectingByteOutputReceiver receiver,
                int maxTimeoutForCommandSec) {
            this.cmd = cmd;
            this.receiver = receiver;
            this.maxTimeoutForCommandSec = maxTimeoutForCommandSec;
        }

        @Override
        public void run() {
            try {
                getDevice().executeShellCommand(cmd, receiver, maxTimeoutForCommandSec,
                        /*maxTimeToOutputShellResponse=*/maxTimeoutForCommandSec, TimeUnit.SECONDS,
                        /*retryAttempts=*/0);
            } catch (Exception e) {
                // An assertion raised here would be thrown on this worker thread, where the JUnit
                // runner executing the test method cannot observe it. Log instead; the caller
                // validates whatever the receiver managed to collect.
                LogUtil.CLog.w("Shell subscription command did not complete cleanly: " + e);
            }
        }
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        sizetBytes = getSizetBytes();
    }

    /** Runs a single subscription and validates its output. */
    public void testShellSubscription() {
        if (shouldSkipTest()) {
            return;
        }

        CollectingByteOutputReceiver receiver = startSubscription();
        checkOutput(receiver);
    }

    // This is testShellSubscription but 5x
    public void testShellSubscriptionReconnect() {
        int numOfSubs = 5;
        if (shouldSkipTest()) {
            return;
        }

        for (int i = 0; i < numOfSubs; i++) {
            CollectingByteOutputReceiver receiver = startSubscription();
            checkOutput(receiver);
        }
    }

    // Tests that multiple clients can run at once:
    // -Runs maximum number of active subscriptions (20) at once.
    // -Maximum number of subscriptions minus 1 return:
    // --Leave 1 subscription alive to ensure the subscriber helper thread stays alive.
    // -Run maximum number of subscriptions minus 1 to reach the maximum running again.
    // -Attempt to run one more subscription, which will fail.
    public void testShellMaxSubscriptions() {
        // Maximum number of active subscriptions, set in ShellSubscriber.h
        int maxSubs = 20;
        if (shouldSkipTest()) {
            return;
        }
        // Slot maxSubs holds the extra (21st) subscription that is expected to be rejected.
        CollectingByteOutputReceiver[] receivers = new CollectingByteOutputReceiver[maxSubs + 1];
        ShellSubscriptionThread[] shellThreads = new ShellSubscriptionThread[maxSubs + 1];
        ShellConfig.ShellSubscription config = createConfig();
        byte[] validConfig = makeValidConfig(config);

        // timeout of 5 sec for all subscriptions except for the first
        int timeout = 5;
        // timeout of 25 sec to ensure that the first subscription stays active for two sessions
        // of creating the maximum number of subscriptions
        int firstSubTimeout = 25;
        try {
            // Push the shell config file to the device
            String remotePath = pushShellConfigToDevice(validConfig);

            String cmd = buildSubscribeCommand(remotePath, timeout);
            String firstSubCmd = buildSubscribeCommand(remotePath, firstSubTimeout);

            for (int i = 0; i < maxSubs; i++) {
                // Run data-subscribe on a thread
                receivers[i] = new CollectingByteOutputReceiver();
                if (i == 0) {
                    shellThreads[i] =
                            new ShellSubscriptionThread(firstSubCmd, receivers[i], firstSubTimeout);
                } else {
                    shellThreads[i] =
                            new ShellSubscriptionThread(cmd, receivers[i], timeout);
                }
                shellThreads[i].start();
                LogUtil.CLog.d("Starting new shell subscription.");
            }
            // Sleep 2 seconds to make sure all subscription clients are initialized before
            // first pushed event
            RunUtil.getDefault().sleep(2000);

            pushBreadcrumbAtom();

            // Make sure the last 19 threads die before moving to the next step.
            // First subscription is still active due to its longer timeout, which is used to keep
            // the subscriber helper thread alive
            for (int i = 1; i < maxSubs; i++) {
                shellThreads[i].join();
            }

            // Validate the outputs of the last 19 subscriptions since they are finished
            for (int i = 1; i < maxSubs; i++) {
                checkOutput(receivers[i]);
            }

            // Run 19 more subscriptions to hit the maximum active subscriptions again
            for (int i = 1; i < maxSubs; i++) {
                // Run data-subscribe on a thread
                receivers[i] = new CollectingByteOutputReceiver();
                shellThreads[i] =
                        new ShellSubscriptionThread(cmd, receivers[i], timeout);
                shellThreads[i].start();
                LogUtil.CLog.d("Starting new shell subscription.");
            }
            // Sleep 2 seconds to make sure all subscription clients are initialized before
            // pushed event
            RunUtil.getDefault().sleep(2000);

            // ShellSubscriber only allows 20 subscriptions at a time. This is the 21st which will
            // be ignored
            receivers[maxSubs] = new CollectingByteOutputReceiver();
            shellThreads[maxSubs] =
                    new ShellSubscriptionThread(cmd, receivers[maxSubs], timeout);
            shellThreads[maxSubs].start();

            // Sleep 1 second to ensure that the 21st subscription is rejected
            RunUtil.getDefault().sleep(1000);

            pushBreadcrumbAtom();

            // Make sure all the threads die before moving to the next step
            for (int i = 0; i <= maxSubs; i++) {
                shellThreads[i].join();
            }
            // Remove config from device if not already deleted
            getDevice().executeShellCommand("rm " + remotePath);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the harness can still observe the interruption.
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for shell subscriptions: " + e);
        } catch (Exception e) {
            fail("Shell subscriptions failed: " + e);
        }
        for (int i = 0; i < maxSubs; i++) {
            checkOutput(receivers[i]);
        }
        // Ensure that the 21st subscription got rejected and has an empty output
        byte[] output = receivers[maxSubs].getOutput();
        assertThat(output.length).isEqualTo(0);
    }

    /**
     * Reports whether the current test should return early because the length-prefix width is
     * unknown, logging the reason so the skip is visible in the host log.
     */
    private boolean shouldSkipTest() {
        if (sizetBytes < 0) {
            LogUtil.CLog.i("Skipping test: length prefix width is unknown "
                    + "(device is neither ARM64 nor ARM32, or was not available).");
            return true;
        }
        return false;
    }

    /**
     * Determines the length-prefix width for the device under test.
     *
     * @return 8 on ARM64, 4 on ARM32, or -1 for any other CPU or if the device is not available
     */
    private int getSizetBytes() {
        try {
            ITestDevice device = getDevice();
            if (CpuFeatures.isArm64(device)) {
                return 8;
            }
            if (CpuFeatures.isArm32(device)) {
                return 4;
            }
            return -1;
        } catch (DeviceNotAvailableException e) {
            return -1;
        }
    }

    /** Builds a subscription containing a single pushed matcher for APP_BREADCRUMB_REPORTED. */
    private ShellConfig.ShellSubscription createConfig() {
        StatsdConfigProto.SimpleAtomMatcher breadcrumbMatcher =
                StatsdConfigProto.SimpleAtomMatcher.newBuilder()
                        .setAtomId(Atom.APP_BREADCRUMB_REPORTED_FIELD_NUMBER)
                        .build();
        return ShellConfig.ShellSubscription.newBuilder()
                .addPushed(breadcrumbMatcher)
                .build();
    }

    /**
     * Serializes the config with a little-endian length prefix of {@code sizetBytes} bytes.
     *
     * @param config subscription to serialize
     * @return the length prefix followed by the serialized proto
     */
    private byte[] makeValidConfig(ShellConfig.ShellSubscription config) {
        // Serialize once and reuse the bytes for both the length and the payload.
        byte[] payload = config.toByteArray();
        int length = payload.length;
        byte[] validConfig = new byte[sizetBytes + length];
        System.arraycopy(intToByteArrayLittleEndian(length), 0, validConfig, 0, sizetBytes);
        System.arraycopy(payload, 0, validConfig, sizetBytes, length);
        return validConfig;
    }

    /**
     * Writes the config bytes to a host temp file and pushes it to {@code /data/local/tmp/} on the
     * device. Fails the test if the file cannot be created or the push does not succeed.
     *
     * @param validConfig length-prefixed config bytes
     * @return path of the pushed file on the device
     */
    private String pushShellConfigToDevice(byte[] validConfig) {
        try {
            File configFile = File.createTempFile("shellconfig", ".config");
            configFile.deleteOnExit();
            Files.write(validConfig, configFile);
            String remotePath = "/data/local/tmp/" + configFile.getName();
            // pushFile reports failure through its return value rather than an exception.
            if (!getDevice().pushFile(configFile, remotePath)) {
                fail("Failed to push shell config to " + remotePath);
            }
            return remotePath;
        } catch (Exception e) {
            fail("Failed to push shell config to device: " + e);
        }
        // Not reached: fail() always throws.
        return "";
    }

    /** Builds the device command that pipes the pushed config into data-subscribe. */
    private String buildSubscribeCommand(String remotePath, int timeoutSec) {
        return "cat " + remotePath + " | cmd stats data-subscribe " + timeoutSec;
    }

    /** Pushes the app breadcrumb event that the subscriptions are expected to report. */
    private void pushBreadcrumbAtom() throws Exception {
        // Pushed event. arbitrary label = 1
        AtomTestUtils.sendAppBreadcrumbReportedAtom(getDevice(),
                AtomsProto.AppBreadcrumbReported.State.UNSPECIFIED.getNumber(), 1);
    }

    /**
     * Runs one subscription from start to finish: pushes the config, starts data-subscribe on a
     * thread, pushes a breadcrumb, waits for the thread to end and removes the remote config.
     *
     * @return the receiver holding everything the subscription command wrote
     */
    private CollectingByteOutputReceiver startSubscription() {
        ShellConfig.ShellSubscription config = createConfig();
        CollectingByteOutputReceiver receiver = new CollectingByteOutputReceiver();
        LogUtil.CLog.d("Uploading the following config:\n" + config.toString());
        byte[] validConfig = makeValidConfig(config);
        // timeout of 2 sec for both data-subscribe command and executeShellCommand in thread
        int timeout = 2;
        try {
            // Push the shell config file to the device
            String remotePath = pushShellConfigToDevice(validConfig);

            String cmd = buildSubscribeCommand(remotePath, timeout);
            // Run data-subscribe on a thread
            ShellSubscriptionThread shellThread =
                    new ShellSubscriptionThread(cmd, receiver, timeout);
            shellThread.start();
            LogUtil.CLog.d("Starting new shell subscription.");

            // Sleep a second to make sure subscription is initiated
            RunUtil.getDefault().sleep(1000);

            pushBreadcrumbAtom();
            // Wait for thread to die before returning
            shellThread.join();
            // Remove config from device if not already deleted
            getDevice().executeShellCommand("rm " + remotePath);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the harness can still observe the interruption.
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for the shell subscription: " + e);
        } catch (Exception e) {
            fail("Shell subscription failed: " + e);
        }
        return receiver;
    }

    /**
     * Encodes {@code length} as a little-endian array of {@code sizetBytes} bytes. Only the low
     * four bytes are written; any remaining bytes stay zero.
     */
    private byte[] intToByteArrayLittleEndian(int length) {
        ByteBuffer b = ByteBuffer.allocate(sizetBytes);
        b.order(ByteOrder.LITTLE_ENDIAN);
        b.putInt(length);
        return b.array();
    }

    // We do not know how much data will be returned, but we can check the data format.
    private void checkOutput(CollectingByteOutputReceiver receiver) {
        int atomCount = 0;
        int startIndex = 0;

        byte[] output = receiver.getOutput();
        LogUtil.CLog.d("output length in checkOutput: " + output.length);
        assertThat(output.length).isGreaterThan(0);
        while (output.length > startIndex) {
            assertThat(output.length).isAtLeast(startIndex + sizetBytes);
            int dataLength = readSizetFromByteArray(output, startIndex);
            // A prefix that does not fit in an int shows up as a negative value; report it as a
            // format failure rather than letting the range copy below throw.
            assertThat(dataLength).isAtLeast(0);
            if (dataLength == 0) {
                // We have received a heartbeat from statsd. This heartbeat isn't accompanied by any
                // atoms so return to top of while loop.
                startIndex += sizetBytes;
                continue;
            }
            assertThat(output.length).isAtLeast(startIndex + sizetBytes + dataLength);

            ShellDataProto.ShellData data = null;
            try {
                int dataStart = startIndex + sizetBytes;
                int dataEnd = dataStart + dataLength;
                data = ShellDataProto.ShellData.parseFrom(
                        Arrays.copyOfRange(output, dataStart, dataEnd));
            } catch (InvalidProtocolBufferException e) {
                fail("Failed to parse proto");
            }

            assertThat(data.getAtomCount()).isEqualTo(1);
            assertThat(data.getAtom(0).hasAppBreadcrumbReported()).isTrue();
            assertThat(data.getAtom(0).getAppBreadcrumbReported().getLabel()).isEqualTo(1);
            assertThat(data.getAtom(0).getAppBreadcrumbReported().getState().getNumber())
                    .isEqualTo(1);
            atomCount++;
            startIndex += sizetBytes + dataLength;
        }
        assertThat(atomCount).isGreaterThan(0);
    }

    // Converts the bytes in range [startIndex, startIndex + sizetBytes) from a little-endian array
    // into an integer. Even though sizetBytes could be greater than 4, we assume that the result
    // will fit within an int.
    private int readSizetFromByteArray(byte[] arr, int startIndex) {
        // Accumulate in a long so that shifts of 32 bits or more (when sizetBytes is 8) are well
        // defined; the final cast keeps only the low 32 bits.
        long value = 0;
        for (int j = 0; j < sizetBytes; j++) {
            value |= (arr[startIndex + j] & 0xffL) << (8 * j);
        }
        return (int) value;
    }
}