• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package android.cts.statsd.subscriber;
17 
18 import static com.google.common.truth.Truth.assertThat;
19 
20 import com.android.compatibility.common.util.CpuFeatures;
21 import com.android.internal.os.StatsdConfigProto;
22 import com.android.os.AtomsProto;
23 import com.android.os.AtomsProto.Atom;
24 import com.android.os.ShellConfig;
25 import com.android.os.statsd.ShellDataProto;
26 import com.android.tradefed.device.CollectingByteOutputReceiver;
27 import com.android.tradefed.device.DeviceNotAvailableException;
28 import com.android.tradefed.device.ITestDevice;
29 import com.android.tradefed.log.LogUtil;
30 import com.android.tradefed.testtype.DeviceTestCase;
31 import com.android.tradefed.util.RunUtil;
32 
33 import com.google.common.io.Files;
34 import com.google.protobuf.InvalidProtocolBufferException;
35 
36 import java.io.File;
37 import java.nio.ByteBuffer;
38 import java.nio.ByteOrder;
39 import java.util.Arrays;
40 import java.util.concurrent.TimeUnit;
41 
42 import android.cts.statsdatom.lib.AtomTestUtils;
43 
/**
 * Statsd shell data subscription test.
 */
47 public class ShellSubscriberTest extends DeviceTestCase {
48     private int sizetBytes;
49 
50     public class ShellSubscriptionThread extends Thread {
51         String cmd;
52         CollectingByteOutputReceiver receiver;
53         int maxTimeoutForCommandSec;
54 
ShellSubscriptionThread( String cmd, CollectingByteOutputReceiver receiver, int maxTimeoutForCommandSec)55         public ShellSubscriptionThread(
56                 String cmd,
57                 CollectingByteOutputReceiver receiver,
58                 int maxTimeoutForCommandSec) {
59             this.cmd = cmd;
60             this.receiver = receiver;
61             this.maxTimeoutForCommandSec = maxTimeoutForCommandSec;
62         }
63 
run()64         public void run() {
65             try {
66                 getDevice().executeShellCommand(cmd, receiver, maxTimeoutForCommandSec,
67                         /*maxTimeToOutputShellResponse=*/maxTimeoutForCommandSec, TimeUnit.SECONDS,
68                         /*retryAttempts=*/0);
69             } catch (Exception e) {
70                 fail(e.getMessage());
71             }
72         }
73     }
74 
75     @Override
setUp()76     protected void setUp() throws Exception {
77         super.setUp();
78         sizetBytes = getSizetBytes();
79     }
80 
testShellSubscription()81     public void testShellSubscription() {
82         if (sizetBytes < 0) {
83             return;
84         }
85 
86         CollectingByteOutputReceiver receiver = startSubscription();
87         checkOutput(receiver);
88     }
89 
90     // This is testShellSubscription but 5x
testShellSubscriptionReconnect()91     public void testShellSubscriptionReconnect() {
92         int numOfSubs = 5;
93         if (sizetBytes < 0) {
94             return;
95         }
96 
97         for (int i = 0; i < numOfSubs; i++) {
98             CollectingByteOutputReceiver receiver = startSubscription();
99             checkOutput(receiver);
100         }
101     }
102 
103     // Tests that multiple clients can run at once:
104     // -Runs maximum number of active subscriptions (20) at once.
105     // -Maximum number of subscriptions minus 1 return:
106     // --Leave 1 subscription alive to ensure the subscriber helper thread stays alive.
107     // -Run maximum number of subscriptions minus 1 to reach the maximum running again.
108     // -Attempt to run one more subscription, which will fail.
testShellMaxSubscriptions()109     public void testShellMaxSubscriptions() {
110         // Maximum number of active subscriptions, set in ShellSubscriber.h
111         int maxSubs = 20;
112         if (sizetBytes < 0) {
113             return;
114         }
115         CollectingByteOutputReceiver[] receivers = new CollectingByteOutputReceiver[maxSubs + 1];
116         ShellSubscriptionThread[] shellThreads = new ShellSubscriptionThread[maxSubs + 1];
117         ShellConfig.ShellSubscription config = createConfig();
118         byte[] validConfig = makeValidConfig(config);
119 
120         // timeout of 5 sec for all subscriptions except for the first
121         int timeout = 5;
122         // timeout of 25 sec to ensure that the first subscription stays active for two sessions
123         // of creating the maximum number of subscriptions
124         int firstSubTimeout = 25;
125         try {
126             // Push the shell config file to the device
127             String remotePath = pushShellConfigToDevice(validConfig);
128 
129             String cmd = "cat " + remotePath + " |  cmd stats data-subscribe " + timeout;
130             String firstSubCmd =
131                     "cat " + remotePath + " |  cmd stats data-subscribe " + firstSubTimeout;
132 
133             for (int i = 0; i < maxSubs; i++) {
134                 // Run data-subscribe on a thread
135                 receivers[i] = new CollectingByteOutputReceiver();
136                 if (i == 0) {
137                     shellThreads[i] =
138                             new ShellSubscriptionThread(firstSubCmd, receivers[i], firstSubTimeout);
139                 } else {
140                     shellThreads[i] =
141                             new ShellSubscriptionThread(cmd, receivers[i], timeout);
142                 }
143                 shellThreads[i].start();
144                 LogUtil.CLog.d("Starting new shell subscription.");
145             }
146             // Sleep 2 seconds to make sure all subscription clients are initialized before
147             // first pushed event
148             RunUtil.getDefault().sleep(2000);
149 
150             // Pushed event. arbitrary label = 1
151             AtomTestUtils.sendAppBreadcrumbReportedAtom(getDevice(),
152                     AtomsProto.AppBreadcrumbReported.State.UNSPECIFIED.getNumber(), 1);
153 
154             // Make sure the last 19 threads die before moving to the next step.
155             // First subscription is still active due to its longer timeout that is used keep
156             // the subscriber helper thread alive
157             for (int i = 1; i < maxSubs; i++) {
158                 shellThreads[i].join();
159             }
160 
161             // Validate the outputs of the last 19 subscriptions since they are finished
162             for (int i = 1; i < maxSubs; i++) {
163                 checkOutput(receivers[i]);
164             }
165 
166             // Run 19 more subscriptions to hit the maximum active subscriptions again
167             for (int i = 1; i < maxSubs; i++) {
168                 // Run data-subscribe on a thread
169                 receivers[i] = new CollectingByteOutputReceiver();
170                 shellThreads[i] =
171                         new ShellSubscriptionThread(cmd, receivers[i], timeout);
172                 shellThreads[i].start();
173                 LogUtil.CLog.d("Starting new shell subscription.");
174             }
175             // Sleep 2 seconds to make sure all subscription clients are initialized before
176             // pushed event
177             RunUtil.getDefault().sleep(2000);
178 
179             // ShellSubscriber only allows 20 subscriptions at a time. This is the 21st which will
180             // be ignored
181             receivers[maxSubs] = new CollectingByteOutputReceiver();
182             shellThreads[maxSubs] =
183                     new ShellSubscriptionThread(cmd, receivers[maxSubs], timeout);
184             shellThreads[maxSubs].start();
185 
186             // Sleep 1 seconds to ensure that the 21st subscription is rejected
187             RunUtil.getDefault().sleep(1000);
188 
189             // Pushed event. arbitrary label = 1
190             AtomTestUtils.sendAppBreadcrumbReportedAtom(getDevice(),
191                     AtomsProto.AppBreadcrumbReported.State.UNSPECIFIED.getNumber(), 1);
192 
193             // Make sure all the threads die before moving to the next step
194             for (int i = 0; i <= maxSubs; i++) {
195                 shellThreads[i].join();
196             }
197             // Remove config from device if not already deleted
198             getDevice().executeShellCommand("rm " + remotePath);
199         } catch (Exception e) {
200             fail(e.getMessage());
201         }
202         for (int i = 0; i < maxSubs; i++) {
203             checkOutput(receivers[i]);
204         }
205         // Ensure that the 21st subscription got rejected and has an empty output
206         byte[] output = receivers[maxSubs].getOutput();
207         assertThat(output.length).isEqualTo(0);
208     }
209 
getSizetBytes()210     private int getSizetBytes() {
211         try {
212             ITestDevice device = getDevice();
213             if (CpuFeatures.isArm64(device)) {
214                 return 8;
215             }
216             if (CpuFeatures.isArm32(device)) {
217                 return 4;
218             }
219             return -1;
220         } catch (DeviceNotAvailableException e) {
221             return -1;
222         }
223     }
224 
createConfig()225     private ShellConfig.ShellSubscription createConfig() {
226         return ShellConfig.ShellSubscription.newBuilder()
227                 .addPushed((StatsdConfigProto.SimpleAtomMatcher.newBuilder()
228                         .setAtomId(Atom.APP_BREADCRUMB_REPORTED_FIELD_NUMBER))
229                         .build()).build();
230     }
231 
makeValidConfig(ShellConfig.ShellSubscription config)232     private byte[] makeValidConfig(ShellConfig.ShellSubscription config) {
233         int length = config.toByteArray().length;
234         byte[] validConfig = new byte[sizetBytes + length];
235         System.arraycopy(IntToByteArrayLittleEndian(length), 0, validConfig, 0, sizetBytes);
236         System.arraycopy(config.toByteArray(), 0, validConfig, sizetBytes, length);
237         return validConfig;
238     }
239 
pushShellConfigToDevice(byte[] validConfig)240     private String pushShellConfigToDevice(byte[] validConfig) {
241         try {
242             File configFile = File.createTempFile("shellconfig", ".config");
243             configFile.deleteOnExit();
244             Files.write(validConfig, configFile);
245             String remotePath = "/data/local/tmp/" + configFile.getName();
246             getDevice().pushFile(configFile, remotePath);
247             return remotePath;
248 
249         } catch (Exception e) {
250             fail(e.getMessage());
251         }
252         return "";
253     }
254 
startSubscription()255     private CollectingByteOutputReceiver startSubscription() {
256         ShellConfig.ShellSubscription config = createConfig();
257         CollectingByteOutputReceiver receiver = new CollectingByteOutputReceiver();
258         LogUtil.CLog.d("Uploading the following config:\n" + config.toString());
259         byte[] validConfig = makeValidConfig(config);
260         // timeout of 2 sec for both data-subscribe command and executeShellCommand in thread
261         int timeout = 2;
262         try {
263             // Push the shell config file to the device
264             String remotePath = pushShellConfigToDevice(validConfig);
265 
266             String cmd = "cat " + remotePath + " |  cmd stats data-subscribe " + timeout;
267             // Run data-subscribe on a thread
268             ShellSubscriptionThread shellThread =
269                     new ShellSubscriptionThread(cmd, receiver, timeout);
270             shellThread.start();
271             LogUtil.CLog.d("Starting new shell subscription.");
272 
273             // Sleep a second to make sure subscription is initiated
274             RunUtil.getDefault().sleep(1000);
275 
276             // Pushed event. arbitrary label = 1
277             AtomTestUtils.sendAppBreadcrumbReportedAtom(getDevice(),
278                     AtomsProto.AppBreadcrumbReported.State.UNSPECIFIED.getNumber(), 1);
279             // Wait for thread to die before returning
280             shellThread.join();
281             // Remove config from device if not already deleted
282             getDevice().executeShellCommand("rm " + remotePath);
283         } catch (Exception e) {
284             fail(e.getMessage());
285         }
286         return receiver;
287     }
288 
IntToByteArrayLittleEndian(int length)289     private byte[] IntToByteArrayLittleEndian(int length) {
290         ByteBuffer b = ByteBuffer.allocate(sizetBytes);
291         b.order(ByteOrder.LITTLE_ENDIAN);
292         b.putInt(length);
293         return b.array();
294     }
295 
296     // We do not know how much data will be returned, but we can check the data format.
checkOutput(CollectingByteOutputReceiver receiver)297     private void checkOutput(CollectingByteOutputReceiver receiver) {
298         int atomCount = 0;
299         int startIndex = 0;
300 
301         byte[] output = receiver.getOutput();
302         LogUtil.CLog.d("output length in checkOutput: " + output.length);
303         assertThat(output.length).isGreaterThan(0);
304         while (output.length > startIndex) {
305             assertThat(output.length).isAtLeast(startIndex + sizetBytes);
306             int dataLength = readSizetFromByteArray(output, startIndex);
307             if (dataLength == 0) {
308                 // We have received a heartbeat from statsd. This heartbeat isn't accompanied by any
309                 // atoms so return to top of while loop.
310                 startIndex += sizetBytes;
311                 continue;
312             }
313             assertThat(output.length).isAtLeast(startIndex + sizetBytes + dataLength);
314 
315             ShellDataProto.ShellData data = null;
316             try {
317                 int dataStart = startIndex + sizetBytes;
318                 int dataEnd = dataStart + dataLength;
319                 data = ShellDataProto.ShellData.parseFrom(
320                         Arrays.copyOfRange(output, dataStart, dataEnd));
321             } catch (InvalidProtocolBufferException e) {
322                 fail("Failed to parse proto");
323             }
324 
325             assertThat(data.getAtomCount()).isEqualTo(1);
326             assertThat(data.getAtom(0).hasAppBreadcrumbReported()).isTrue();
327             assertThat(data.getAtom(0).getAppBreadcrumbReported().getLabel()).isEqualTo(1);
328             assertThat(data.getAtom(0).getAppBreadcrumbReported().getState().getNumber())
329                     .isEqualTo(1);
330             atomCount++;
331             startIndex += sizetBytes + dataLength;
332         }
333         assertThat(atomCount).isGreaterThan(0);
334     }
335 
336     // Converts the bytes in range [startIndex, startIndex + sizetBytes) from a little-endian array
337     // into an integer. Even though sizetBytes could be greater than 4, we assume that the result
338     // will fit within an int.
readSizetFromByteArray(byte[] arr, int startIndex)339     private int readSizetFromByteArray(byte[] arr, int startIndex) {
340         int value = 0;
341         for (int j = 0; j < sizetBytes; j++) {
342             value += ((int) arr[j + startIndex] & 0xffL) << (8 * j);
343         }
344         return value;
345     }
346 }
347