• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.android.utils.chre;
18 
19 import android.content.Context;
20 
21 import androidx.annotation.NonNull;
22 
23 import com.google.android.chre.utils.pigweed.ChreRpcClient;
24 import com.google.common.io.ByteSink;
25 import com.google.common.io.Files;
26 import com.google.protobuf.ByteString;
27 import com.google.protobuf.Empty;
28 import com.google.protobuf.MessageLite;
29 
30 import java.io.File;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Objects;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.TimeUnit;
43 
44 import dev.chre.rpc.proto.ChreApiTest;
45 import dev.pigweed.pw_rpc.Call.ServerStreamingFuture;
46 import dev.pigweed.pw_rpc.Call.UnaryFuture;
47 import dev.pigweed.pw_rpc.MethodClient;
48 import dev.pigweed.pw_rpc.Service;
49 import dev.pigweed.pw_rpc.UnaryResult;
50 
51 /**
52  * A set of helper functions for tests that use the CHRE API Test nanoapp.
53  */
54 public class ChreApiTestUtil {
55     /**
56      * The default timeout for an RPC call in seconds.
57      */
58     public static final int RPC_TIMEOUT_IN_SECONDS = 5;
59 
60     /**
61      * The default timeout for an RPC call in milliseconds.
62      */
63     public static final int RPC_TIMEOUT_IN_MS = RPC_TIMEOUT_IN_SECONDS * 1000;
64 
65     /**
66      * The default timeout for an RPC call in nanosecond.
67      */
68     public static final long RPC_TIMEOUT_IN_NS = RPC_TIMEOUT_IN_SECONDS * 1000000000L;
69 
70     /**
71      * The number of threads for the executor that executes the futures.
72      * We need at least 2 here. One to process the RPCs for server streaming
73      * and one to process events (which has server streaming as a dependent).
74      * 2 is the minimum needed to run smoothly without timeout issues.
75      */
76     private static final int NUM_THREADS_FOR_EXECUTOR = 2;
77 
78     /**
79      * The maximum number of samples to remove from the beginning of an audio
80      * data event. 8000 samples == 500ms.
81      */
82     private static final int MAX_LEADING_ZEROS_TO_REMOVE = 8000;
83 
84     /**
85      * CHRE audio format enum values for 8-bit and 16-bit audio formats.
86      */
87     private static final int CHRE_AUDIO_DATA_FORMAT_8_BIT = 0;
88     private static final int CHRE_AUDIO_DATA_FORMAT_16_BIT = 1;
89 
90     /**
91      * Executor for use with server streaming RPCs.
92      */
93     private final ExecutorService mExecutor =
94             Executors.newFixedThreadPool(NUM_THREADS_FOR_EXECUTOR);
95 
96     /**
97      * Storage for nanoapp streaming messages. This is a map from each RPC client to the
98      * list of messages received.
99      */
100     private final Map<ChreRpcClient, List<MessageLite>> mNanoappStreamingMessages =
101             new HashMap<ChreRpcClient, List<MessageLite>>();
102 
103     /**
104      * If true, there is an active server streaming RPC ongoing.
105      */
106     private boolean mActiveServerStreamingRpc = false;
107 
108     /**
109      * Calls a server streaming RPC method on multiple RPC clients. The RPC will be initiated for
110      * each client, then we will give each client a maximum of RPC_TIMEOUT_IN_SECONDS seconds of
111      * timeout, getting the futures in sequential order. The responses will have the same size
112      * as the input rpcClients size.
113      *
114      * @param <RequestType>   the type of the request (proto generated type).
115      * @param <ResponseType>  the type of the response (proto generated type).
116      * @param rpcClients      the RPC clients.
117      * @param method          the fully-qualified method name.
118      * @param request         the request.
119      *
120      * @return                the proto responses or null if there was an error.
121      */
122     public <RequestType extends MessageLite, ResponseType extends MessageLite>
callConcurrentServerStreamingRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request)123             List<List<ResponseType>> callConcurrentServerStreamingRpcMethodSync(
124                     @NonNull List<ChreRpcClient> rpcClients,
125                     @NonNull String method,
126                     @NonNull RequestType request) throws Exception {
127         Objects.requireNonNull(rpcClients);
128         Objects.requireNonNull(method);
129         Objects.requireNonNull(request);
130 
131         Future<List<List<ResponseType>>> responseFuture =
132                 callConcurrentServerStreamingRpcMethodAsync(rpcClients, method, request,
133                         RPC_TIMEOUT_IN_MS);
134         return responseFuture == null
135                 ? null
136                 : responseFuture.get(RPC_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
137     }
138 
139     /**
140      * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout.
141      *
142      * @param <RequestType>   the type of the request (proto generated type).
143      * @param <ResponseType>  the type of the response (proto generated type).
144      * @param rpcClient       the RPC client.
145      * @param method          the fully-qualified method name.
146      * @param request         the request.
147      *
148      * @return                the proto response or null if there was an error.
149      */
150     public <RequestType extends MessageLite, ResponseType extends MessageLite> List<ResponseType>
callServerStreamingRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method, @NonNull RequestType request)151             callServerStreamingRpcMethodSync(
152                     @NonNull ChreRpcClient rpcClient,
153                     @NonNull String method,
154                     @NonNull RequestType request) throws Exception {
155         Objects.requireNonNull(rpcClient);
156         Objects.requireNonNull(method);
157         Objects.requireNonNull(request);
158 
159         List<List<ResponseType>> responses = callConcurrentServerStreamingRpcMethodSync(
160                 Arrays.asList(rpcClient),
161                 method,
162                 request);
163         return responses == null || responses.isEmpty() ? null : responses.get(0);
164     }
165 
166     /**
167      * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of
168      * timeout with an empty request.
169      *
170      * @param <ResponseType>  the type of the response (proto generated type).
171      * @param rpcClient       the RPC client.
172      * @param method          the fully-qualified method name.
173      *
174      * @return                the proto response or null if there was an error.
175      */
176     public <ResponseType extends MessageLite> List<ResponseType>
callServerStreamingRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method)177             callServerStreamingRpcMethodSync(
178                     @NonNull ChreRpcClient rpcClient,
179                     @NonNull String method) throws Exception {
180         Objects.requireNonNull(rpcClient);
181         Objects.requireNonNull(method);
182 
183         Empty request = Empty.newBuilder().build();
184         return callServerStreamingRpcMethodSync(rpcClient, method, request);
185     }
186 
187     /**
188      * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent
189      * instances of the ChreApiTest nanoapp.
190      *
191      * @param <RequestType>   the type of the request (proto generated type).
192      * @param <ResponseType>  the type of the response (proto generated type).
193      * @param rpcClients      the RPC clients corresponding to the instances of the
194      *                        ChreApiTest nanoapp.
195      * @param method          the fully-qualified method name.
196      * @param requests        the list of requests.
197      *
198      * @return                the proto response or null if there was an error.
199      */
200     public static <RequestType extends MessageLite, ResponseType extends MessageLite>
callConcurrentUnaryRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull List<RequestType> requests)201             List<ResponseType> callConcurrentUnaryRpcMethodSync(
202                     @NonNull List<ChreRpcClient> rpcClients,
203                     @NonNull String method,
204                     @NonNull List<RequestType> requests) throws Exception {
205         Objects.requireNonNull(rpcClients);
206         Objects.requireNonNull(method);
207         Objects.requireNonNull(requests);
208         if (rpcClients.size() != requests.size()) {
209             return null;
210         }
211 
212         List<UnaryFuture<ResponseType>> responseFutures =
213                 new ArrayList<UnaryFuture<ResponseType>>();
214         Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator();
215         Iterator<RequestType> requestsIter = requests.iterator();
216         while (rpcClientsIter.hasNext() && requestsIter.hasNext()) {
217             ChreRpcClient rpcClient = rpcClientsIter.next();
218             RequestType request = requestsIter.next();
219             MethodClient methodClient = rpcClient.getMethodClient(method);
220             responseFutures.add(methodClient.invokeUnaryFuture(request));
221         }
222 
223         List<ResponseType> responses = new ArrayList<ResponseType>();
224         boolean success = true;
225         long endTimeInMs = System.currentTimeMillis() + RPC_TIMEOUT_IN_MS;
226         for (UnaryFuture<ResponseType> responseFuture: responseFutures) {
227             try {
228                 UnaryResult<ResponseType> responseResult = responseFuture.get(
229                         Math.max(0, endTimeInMs - System.currentTimeMillis()),
230                                 TimeUnit.MILLISECONDS);
231                 responses.add(responseResult.response());
232             } catch (Exception exception) {
233                 success = false;
234             }
235         }
236         return success ? responses : null;
237     }
238 
239     /**
240      * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent
241      * instances of the ChreApiTest nanoapp.
242      *
243      * @param <RequestType>   the type of the request (proto generated type).
244      * @param <ResponseType>  the type of the response (proto generated type).
245      * @param rpcClients      the RPC clients corresponding to the instances of the
246      *                        ChreApiTest nanoapp.
247      * @param method          the fully-qualified method name.
248      * @param request         the request.
249      *
250      * @return                the proto response or null if there was an error.
251      */
252     public static <RequestType extends MessageLite, ResponseType extends MessageLite>
callConcurrentUnaryRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request)253             List<ResponseType> callConcurrentUnaryRpcMethodSync(
254                     @NonNull List<ChreRpcClient> rpcClients,
255                     @NonNull String method,
256                     @NonNull RequestType request) throws Exception {
257         Objects.requireNonNull(rpcClients);
258         Objects.requireNonNull(method);
259         Objects.requireNonNull(request);
260 
261         List<RequestType> requests = new ArrayList<RequestType>();
262         for (int i = 0; i < rpcClients.size(); ++i) {
263             requests.add(request);
264         }
265         return callConcurrentUnaryRpcMethodSync(rpcClients, method, requests);
266     }
267 
268     /**
269      * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout.
270      *
271      * @param <RequestType>   the type of the request (proto generated type).
272      * @param <ResponseType>  the type of the response (proto generated type).
273      * @param rpcClient       the RPC client.
274      * @param method          the fully-qualified method name.
275      * @param request         the request.
276      *
277      * @return                the proto response or null if there was an error.
278      */
279     public static <RequestType extends MessageLite, ResponseType extends MessageLite> ResponseType
callUnaryRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method, @NonNull RequestType request)280             callUnaryRpcMethodSync(
281                     @NonNull ChreRpcClient rpcClient,
282                     @NonNull String method,
283                     @NonNull RequestType request) throws Exception {
284         Objects.requireNonNull(rpcClient);
285         Objects.requireNonNull(method);
286         Objects.requireNonNull(request);
287 
288         List<ResponseType> responses = callConcurrentUnaryRpcMethodSync(Arrays.asList(rpcClient),
289                 method, request);
290         return responses == null || responses.isEmpty() ? null : responses.get(0);
291     }
292 
293     /**
294      * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout with an empty request.
295      *
296      * @param <ResponseType>  the type of the response (proto generated type).
297      * @param rpcClient       the RPC client.
298      * @param method          the fully-qualified method name.
299      *
300      * @return                the proto response or null if there was an error.
301      */
302     public static <ResponseType extends MessageLite> ResponseType
callUnaryRpcMethodSync(@onNull ChreRpcClient rpcClient, @NonNull String method)303             callUnaryRpcMethodSync(@NonNull ChreRpcClient rpcClient, @NonNull String method)
304             throws Exception {
305         Objects.requireNonNull(rpcClient);
306         Objects.requireNonNull(method);
307 
308         Empty request = Empty.newBuilder().build();
309         return callUnaryRpcMethodSync(rpcClient, method, request);
310     }
311 
312     /**
313      * Gathers events that match the eventTypes for each RPC client. This gathers
314      * events until eventCount events are gathered or timeoutInNs nanoseconds has passed.
315      * The host will wait until 2 * timeoutInNs to timeout receiving the response.
316      * The responses will have the same size as the input rpcClients size.
317      *
318      * @param rpcClients      the RPC clients.
319      * @param eventTypes      the types of event to gather.
320      * @param eventCount      the number of events to gather.
321      *
322      * @return                the events future.
323      */
gatherEventsConcurrent( @onNull List<ChreRpcClient> rpcClients, List<Integer> eventTypes, int eventCount, long timeoutInNs)324     public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent(
325             @NonNull List<ChreRpcClient> rpcClients, List<Integer> eventTypes, int eventCount,
326             long timeoutInNs) throws Exception {
327         Objects.requireNonNull(rpcClients);
328 
329         ChreApiTest.GatherEventsInput input = ChreApiTest.GatherEventsInput.newBuilder()
330                 .addAllEventTypes(eventTypes)
331                 .setEventCount(eventCount)
332                 .setTimeoutInNs(timeoutInNs)
333                 .build();
334         return callConcurrentServerStreamingRpcMethodAsync(rpcClients,
335                 "chre.rpc.ChreApiTestService.GatherEvents", input,
336                 TimeUnit.NANOSECONDS.toMillis(2 * timeoutInNs));
337     }
338 
339     /**
340      * Gathers events that match the eventType for each RPC client. This gathers
341      * events until eventCount events are gathered or timeoutInNs nanoseconds has passed.
342      * The host will wait until 2 * timeoutInNs to timeout receiving the response.
343      * The responses will have the same size as the input rpcClients size.
344      *
345      * @param rpcClients      the RPC clients.
346      * @param eventType       the type of event to gather.
347      * @param eventCount      the number of events to gather.
348      *
349      * @return                the events future.
350      */
gatherEventsConcurrent( @onNull List<ChreRpcClient> rpcClients, int eventType, int eventCount, long timeoutInNs)351     public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent(
352             @NonNull List<ChreRpcClient> rpcClients, int eventType, int eventCount,
353             long timeoutInNs) throws Exception {
354         Objects.requireNonNull(rpcClients);
355 
356         return gatherEventsConcurrent(rpcClients, Arrays.asList(eventType),
357                 eventCount, timeoutInNs);
358     }
359 
360     /**
361      * Gathers events that match the eventTypes for the RPC client. This gathers
362      * events until eventCount events are gathered or timeoutInNs nanoseconds has passed.
363      * The host will wait until 2 * timeoutInNs to timeout receiving the response.
364      *
365      * @param rpcClient       the RPC client.
366      * @param eventTypes      the types of event to gather.
367      * @param eventCount      the number of events to gather.
368      *
369      * @return                the events future.
370      */
gatherEvents( @onNull ChreRpcClient rpcClient, List<Integer> eventTypes, int eventCount, long timeoutInNs)371     public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents(
372             @NonNull ChreRpcClient rpcClient, List<Integer> eventTypes, int eventCount,
373                     long timeoutInNs) throws Exception {
374         Objects.requireNonNull(rpcClient);
375 
376         Future<List<List<ChreApiTest.GeneralEventsMessage>>> eventsConcurrentFuture =
377                 gatherEventsConcurrent(Arrays.asList(rpcClient), eventTypes, eventCount,
378                         timeoutInNs);
379         return eventsConcurrentFuture == null ? null : mExecutor.submit(() -> {
380             List<List<ChreApiTest.GeneralEventsMessage>> events =
381                     eventsConcurrentFuture.get(2 * timeoutInNs, TimeUnit.NANOSECONDS);
382             return events == null || events.size() == 0 ? null : events.get(0);
383         });
384     }
385 
386     /**
387      * Gathers events that match the eventType for the RPC client. This gathers
388      * events until eventCount events are gathered or timeoutInNs nanoseconds has passed.
389      * The host will wait until 2 * timeoutInNs to timeout receiving the response.
390      *
391      * @param rpcClient       the RPC client.
392      * @param eventType       the type of event to gather.
393      * @param eventCount      the number of events to gather.
394      *
395      * @return                the events future.
396      */
gatherEvents( @onNull ChreRpcClient rpcClient, int eventType, int eventCount, long timeoutInNs)397     public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents(
398             @NonNull ChreRpcClient rpcClient, int eventType, int eventCount,
399                     long timeoutInNs) throws Exception {
400         Objects.requireNonNull(rpcClient);
401 
402         return gatherEvents(rpcClient, Arrays.asList(eventType), eventCount, timeoutInNs);
403     }
404 
405     /**
406      * Gather and re-merge a single CHRE audio data event.
407      */
gatherAudioDataEvent( @onNull ChreRpcClient rpcClient, int audioEventType, int eventCount, long timeoutInNs)408     public ChreApiTest.ChreAudioDataEvent gatherAudioDataEvent(
409             @NonNull ChreRpcClient rpcClient, int audioEventType,
410                     int eventCount, long timeoutInNs) throws Exception {
411         Objects.requireNonNull(rpcClient);
412 
413         Future<List<ChreApiTest.GeneralEventsMessage>> audioEventsFuture =
414                 gatherEvents(rpcClient, audioEventType, eventCount,
415                              timeoutInNs);
416         List<ChreApiTest.GeneralEventsMessage> audioEvents =
417                 audioEventsFuture.get(2 * timeoutInNs, TimeUnit.NANOSECONDS);
418         return processAudioDataEvents(audioEvents);
419     }
420 
421     /**
422      * Re-merge a single CHRE audio data event.
423      */
processAudioDataEvents( @onNull List<ChreApiTest.GeneralEventsMessage> audioEvents)424     public ChreApiTest.ChreAudioDataEvent processAudioDataEvents(
425             @NonNull List<ChreApiTest.GeneralEventsMessage> audioEvents) throws Exception {
426         Objects.requireNonNull(audioEvents);
427         // Assert audioEvents isn't empty
428         if (audioEvents.size() == 0) {
429             return null;
430         }
431 
432         ChreApiTest.ChreAudioDataMetadata metadata =
433                 audioEvents.get(0).getChreAudioDataMetadata();
434         // 8-bit format == 0
435         // 16-bit format == 1
436         int bufferSize = metadata.getSampleCount() * (metadata.getFormat() + 1);
437         ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
438         ByteString sampleBytes;
439         boolean status = true;
440 
441         for (int i = 1; i < audioEvents.size() && status; ++i) {
442             ChreApiTest.ChreAudioDataSamples samples = audioEvents.get(i).getChreAudioDataSamples();
443             // assert samples sent/received in order
444             if (samples.getId() != (i - 1)) {
445                 status = false;
446             } else {
447                 buffer.put(samples.getSamples().toByteArray());
448             }
449         }
450 
451         // Remove leading zeros before creating the full event
452         buffer.rewind();
453         sampleBytes = removeLeadingZerosFromAudio(buffer, MAX_LEADING_ZEROS_TO_REMOVE,
454                                                   metadata.getFormat());
455         if (sampleBytes == null) {
456             status = false;
457         }
458 
459         ChreApiTest.ChreAudioDataEvent audioEvent =
460                 ChreApiTest.ChreAudioDataEvent.newBuilder()
461                         .setStatus(status)
462                         .setVersion(metadata.getVersion())
463                         .setReserved(metadata.getReserved())
464                         .setHandle(metadata.getHandle())
465                         .setTimestamp(metadata.getTimestamp())
466                         .setSampleRate(metadata.getSampleRate())
467                         .setSampleCount(metadata.getSampleCount())
468                         .setFormat(metadata.getFormat())
469                         .setSamples(sampleBytes)
470                         .build();
471 
472         return audioEvent;
473     }
474 
475     /**
476      * Gets the RPC service for the CHRE API Test nanoapp.
477      */
getChreApiService()478     public static Service getChreApiService() {
479         return new Service("chre.rpc.ChreApiTestService",
480                 Service.unaryMethod(
481                         "ChreBleGetCapabilities",
482                         Empty.parser(),
483                         ChreApiTest.Capabilities.parser()),
484                 Service.unaryMethod(
485                         "ChreBleGetFilterCapabilities",
486                         Empty.parser(),
487                         ChreApiTest.Capabilities.parser()),
488                 Service.serverStreamingMethod(
489                         "ChreBleStartScanSync",
490                         ChreApiTest.ChreBleStartScanAsyncInput.parser(),
491                         ChreApiTest.GeneralSyncMessage.parser()),
492                 Service.serverStreamingMethod(
493                         "ChreBleStopScanSync",
494                         Empty.parser(),
495                         ChreApiTest.GeneralSyncMessage.parser()),
496                 Service.unaryMethod(
497                         "ChreSensorFindDefault",
498                         ChreApiTest.ChreSensorFindDefaultInput.parser(),
499                         ChreApiTest.ChreSensorFindDefaultOutput.parser()),
500                 Service.unaryMethod(
501                         "ChreGetSensorInfo",
502                         ChreApiTest.ChreHandleInput.parser(),
503                         ChreApiTest.ChreGetSensorInfoOutput.parser()),
504                 Service.unaryMethod(
505                         "ChreGetSensorSamplingStatus",
506                         ChreApiTest.ChreHandleInput.parser(),
507                         ChreApiTest.ChreGetSensorSamplingStatusOutput.parser()),
508                 Service.unaryMethod(
509                         "ChreSensorConfigure",
510                         ChreApiTest.ChreSensorConfigureInput.parser(),
511                         ChreApiTest.Status.parser()),
512                 Service.unaryMethod(
513                         "ChreSensorConfigureModeOnly",
514                         ChreApiTest.ChreSensorConfigureModeOnlyInput.parser(),
515                         ChreApiTest.Status.parser()),
516                 Service.unaryMethod(
517                         "ChreAudioGetSource",
518                         ChreApiTest.ChreHandleInput.parser(),
519                         ChreApiTest.ChreAudioGetSourceOutput.parser()),
520                 Service.unaryMethod(
521                         "ChreAudioConfigureSource",
522                         ChreApiTest.ChreHandleInput.parser(),
523                         ChreApiTest.Status.parser()),
524                 Service.unaryMethod(
525                         "ChreAudioGetStatus",
526                         ChreApiTest.ChreHandleInput.parser(),
527                         ChreApiTest.ChreAudioGetStatusOutput.parser()),
528                 Service.unaryMethod(
529                         "ChreConfigureHostEndpointNotifications",
530                         ChreApiTest.ChreConfigureHostEndpointNotificationsInput.parser(),
531                         ChreApiTest.Status.parser()),
532                 Service.unaryMethod(
533                         "ChreGetHostEndpointInfo",
534                         ChreApiTest.ChreGetHostEndpointInfoInput.parser(),
535                         ChreApiTest.ChreGetHostEndpointInfoOutput.parser()),
536                 Service.serverStreamingMethod(
537                         "GatherEvents",
538                         ChreApiTest.GatherEventsInput.parser(),
539                         ChreApiTest.GeneralEventsMessage.parser()));
540     }
541 
542     /**
543      * Writes data out to permanent storage.
544      *
545      * @param data      Byte array holding the data to write out to file
546      * @param filename  Filename for the created file
547      * @param context   Current target context
548      */
writeDataToFile(byte[] data, String filename, Context context)549     public static void writeDataToFile(byte[] data, String filename,
550                 Context context) throws Exception {
551         File file = new File(context.getFilesDir(), filename);
552         ByteSink sink = Files.asByteSink(file);
553         sink.write(data);
554     }
555 
556     /**
557      * Removes leading 0 samples from an audio data packet
558      *
559      * @param buffer    ByteBuffer containing all the data
560      * @param limit     Max amount of samples to remove
561      * @param format    Audio data format from metadata
562      *
563      * @return          ByteString with the leading zero samples removed
564      */
removeLeadingZerosFromAudio(ByteBuffer buffer, int limit, int format)565     private ByteString removeLeadingZerosFromAudio(ByteBuffer buffer, int limit, int format) {
566         ByteString sampleString;
567 
568         if (format != CHRE_AUDIO_DATA_FORMAT_8_BIT && format != CHRE_AUDIO_DATA_FORMAT_16_BIT) {
569             return null;
570         }
571 
572         for (int i = 0; i < limit; ++i) {
573             int s;
574             if (format == CHRE_AUDIO_DATA_FORMAT_8_BIT) {
575                 s = buffer.get();
576             } else {
577                 s = buffer.getShort();
578             }
579 
580             if (s != 0) {
581                 break;
582             }
583         }
584 
585         // move back to the first non-zero sample
586         if (format == CHRE_AUDIO_DATA_FORMAT_8_BIT) {
587             buffer.position(buffer.position() - 1);
588         } else {
589             buffer.position(buffer.position() - 2);
590         }
591         sampleString = ByteString.copyFrom(buffer.slice());
592 
593         return sampleString;
594     }
595 
596     /**
597      * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on
598      * multiple RPC clients. This returns a Future for the result. The responses will have the same
599      * size as the input rpcClients size.
600      *
601      * @param <RequestType>   the type of the request (proto generated type).
602      * @param <ResponseType>  the type of the response (proto generated type).
603      * @param rpcClients      the RPC clients.
604      * @param method          the fully-qualified method name.
605      * @param requests        the list of requests.
606      * @param timeoutInMs     the timeout in milliseconds.
607      *
608      * @return                the Future for the response for null if there was an error.
609      */
610     private <RequestType extends MessageLite, ResponseType extends MessageLite>
callConcurrentServerStreamingRpcMethodAsync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull List<RequestType> requests, long timeoutInMs)611             Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync(
612                     @NonNull List<ChreRpcClient> rpcClients,
613                     @NonNull String method,
614                     @NonNull List<RequestType> requests,
615                     long timeoutInMs) throws Exception {
616         Objects.requireNonNull(rpcClients);
617         Objects.requireNonNull(method);
618         Objects.requireNonNull(requests);
619         if (rpcClients.size() != requests.size()) {
620             return null;
621         }
622 
623         List<ServerStreamingFuture> responseFutures = new ArrayList<ServerStreamingFuture>();
624         synchronized (mNanoappStreamingMessages) {
625             if (mActiveServerStreamingRpc) {
626                 return null;
627             }
628 
629             Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator();
630             Iterator<RequestType> requestsIter = requests.iterator();
631             while (rpcClientsIter.hasNext() && requestsIter.hasNext()) {
632                 ChreRpcClient rpcClient = rpcClientsIter.next();
633                 RequestType request = requestsIter.next();
634                 MethodClient methodClient = rpcClient.getMethodClient(method);
635                 ServerStreamingFuture responseFuture = methodClient.invokeServerStreamingFuture(
636                         request,
637                         (ResponseType response) -> {
638                             synchronized (mNanoappStreamingMessages) {
639                                 mNanoappStreamingMessages.putIfAbsent(rpcClient,
640                                         new ArrayList<MessageLite>());
641                                 mNanoappStreamingMessages.get(rpcClient).add(response);
642                             }
643                         });
644                 responseFutures.add(responseFuture);
645             }
646             mActiveServerStreamingRpc = true;
647         }
648 
649         final List<ChreRpcClient> rpcClientsFinal = rpcClients;
650         Future<List<List<ResponseType>>> responseFuture = mExecutor.submit(() -> {
651             boolean success = true;
652             long endTimeInMs = System.currentTimeMillis() + timeoutInMs;
653             for (ServerStreamingFuture future: responseFutures) {
654                 try {
655                     future.get(Math.max(0, endTimeInMs - System.currentTimeMillis()),
656                             TimeUnit.MILLISECONDS);
657                 } catch (Exception exception) {
658                     success = false;
659                 }
660             }
661 
662             synchronized (mNanoappStreamingMessages) {
663                 List<List<ResponseType>> responses = null;
664                 if (success) {
665                     responses = new ArrayList<List<ResponseType>>();
666                     for (ChreRpcClient rpcClient: rpcClientsFinal) {
667                         List<MessageLite> messages = mNanoappStreamingMessages.get(rpcClient);
668                         List<ResponseType> responseList = new ArrayList<ResponseType>();
669                         if (messages != null) {
670                             // Only needed to cast the type.
671                             for (MessageLite message: messages) {
672                                 responseList.add((ResponseType) message);
673                             }
674                         }
675 
676                         responses.add(responseList);
677                     }
678                 }
679 
680                 mNanoappStreamingMessages.clear();
681                 mActiveServerStreamingRpc = false;
682                 return responses;
683             }
684         });
685         return responseFuture;
686     }
687 
688     /**
689      * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on
690      * multiple RPC clients. This returns a Future for the result. The responses will have the same
691      * size as the input rpcClients size.
692      *
693      * @param <RequestType>   the type of the request (proto generated type).
694      * @param <ResponseType>  the type of the response (proto generated type).
695      * @param rpcClients      the RPC clients.
696      * @param method          the fully-qualified method name.
697      * @param request         the request.
698      * @param timeoutInMs     the timeout in milliseconds.
699      *
700      * @return                the Future for the response for null if there was an error.
701      */
702     private <RequestType extends MessageLite, ResponseType extends MessageLite>
callConcurrentServerStreamingRpcMethodAsync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request, long timeoutInMs)703             Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync(
704                     @NonNull List<ChreRpcClient> rpcClients,
705                     @NonNull String method,
706                     @NonNull RequestType request,
707                     long timeoutInMs) throws Exception {
708         Objects.requireNonNull(rpcClients);
709         Objects.requireNonNull(method);
710         Objects.requireNonNull(request);
711 
712         ArrayList<RequestType> requests = new ArrayList<RequestType>();
713         for (int i = 0; i < rpcClients.size(); ++i) {
714             requests.add(request);
715         }
716         return callConcurrentServerStreamingRpcMethodAsync(rpcClients, method,
717                 requests, timeoutInMs);
718     }
719 }
720