1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.android.utils.chre; 18 19 import android.content.Context; 20 21 import androidx.annotation.NonNull; 22 23 import com.google.android.chre.utils.pigweed.ChreRpcClient; 24 import com.google.common.io.ByteSink; 25 import com.google.common.io.Files; 26 import com.google.protobuf.ByteString; 27 import com.google.protobuf.Empty; 28 import com.google.protobuf.MessageLite; 29 30 import java.io.File; 31 import java.nio.ByteBuffer; 32 import java.util.ArrayList; 33 import java.util.Arrays; 34 import java.util.HashMap; 35 import java.util.Iterator; 36 import java.util.List; 37 import java.util.Map; 38 import java.util.Objects; 39 import java.util.concurrent.ExecutorService; 40 import java.util.concurrent.Executors; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.TimeUnit; 43 44 import dev.chre.rpc.proto.ChreApiTest; 45 import dev.pigweed.pw_rpc.Call.ServerStreamingFuture; 46 import dev.pigweed.pw_rpc.Call.UnaryFuture; 47 import dev.pigweed.pw_rpc.MethodClient; 48 import dev.pigweed.pw_rpc.Service; 49 import dev.pigweed.pw_rpc.UnaryResult; 50 51 /** 52 * A set of helper functions for tests that use the CHRE API Test nanoapp. 53 */ 54 public class ChreApiTestUtil { 55 /** 56 * The default timeout for an RPC call in seconds. 57 */ 58 public static final int RPC_TIMEOUT_IN_SECONDS = 5; 59 60 /** 61 * The default timeout for an RPC call in milliseconds. 62 */ 63 public static final int RPC_TIMEOUT_IN_MS = RPC_TIMEOUT_IN_SECONDS * 1000; 64 65 /** 66 * The default timeout for an RPC call in nanosecond. 67 */ 68 public static final long RPC_TIMEOUT_IN_NS = RPC_TIMEOUT_IN_SECONDS * 1000000000L; 69 70 /** 71 * The number of threads for the executor that executes the futures. 72 * We need at least 2 here. One to process the RPCs for server streaming 73 * and one to process events (which has server streaming as a dependent). 74 * 2 is the minimum needed to run smoothly without timeout issues. 75 */ 76 private static final int NUM_THREADS_FOR_EXECUTOR = 2; 77 78 /** 79 * The maximum number of samples to remove from the beginning of an audio 80 * data event. 8000 samples == 500ms. 81 */ 82 private static final int MAX_LEADING_ZEROS_TO_REMOVE = 8000; 83 84 /** 85 * CHRE audio format enum values for 8-bit and 16-bit audio formats. 86 */ 87 private static final int CHRE_AUDIO_DATA_FORMAT_8_BIT = 0; 88 private static final int CHRE_AUDIO_DATA_FORMAT_16_BIT = 1; 89 90 /** 91 * Executor for use with server streaming RPCs. 92 */ 93 private final ExecutorService mExecutor = 94 Executors.newFixedThreadPool(NUM_THREADS_FOR_EXECUTOR); 95 96 /** 97 * Storage for nanoapp streaming messages. This is a map from each RPC client to the 98 * list of messages received. 99 */ 100 private final Map<ChreRpcClient, List<MessageLite>> mNanoappStreamingMessages = 101 new HashMap<ChreRpcClient, List<MessageLite>>(); 102 103 /** 104 * If true, there is an active server streaming RPC ongoing. 105 */ 106 private boolean mActiveServerStreamingRpc = false; 107 108 /** 109 * Calls a server streaming RPC method on multiple RPC clients. The RPC will be initiated for 110 * each client, then we will give each client a maximum of RPC_TIMEOUT_IN_SECONDS seconds of 111 * timeout, getting the futures in sequential order. The responses will have the same size 112 * as the input rpcClients size. 113 * 114 * @param <RequestType> the type of the request (proto generated type). 115 * @param <ResponseType> the type of the response (proto generated type). 116 * @param rpcClients the RPC clients. 117 * @param method the fully-qualified method name. 118 * @param request the request. 119 * 120 * @return the proto responses or null if there was an error. 121 */ 122 public <RequestType extends MessageLite, ResponseType extends MessageLite> callConcurrentServerStreamingRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request)123 List<List<ResponseType>> callConcurrentServerStreamingRpcMethodSync( 124 @NonNull List<ChreRpcClient> rpcClients, 125 @NonNull String method, 126 @NonNull RequestType request) throws Exception { 127 Objects.requireNonNull(rpcClients); 128 Objects.requireNonNull(method); 129 Objects.requireNonNull(request); 130 131 Future<List<List<ResponseType>>> responseFuture = 132 callConcurrentServerStreamingRpcMethodAsync(rpcClients, method, request, 133 RPC_TIMEOUT_IN_MS); 134 return responseFuture == null 135 ? null 136 : responseFuture.get(RPC_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); 137 } 138 139 /** 140 * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout. 141 * 142 * @param <RequestType> the type of the request (proto generated type). 143 * @param <ResponseType> the type of the response (proto generated type). 144 * @param rpcClient the RPC client. 145 * @param method the fully-qualified method name. 146 * @param request the request. 147 * 148 * @return the proto response or null if there was an error. 149 */ 150 public <RequestType extends MessageLite, ResponseType extends MessageLite> List<ResponseType> callServerStreamingRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method, @NonNull RequestType request)151 callServerStreamingRpcMethodSync( 152 @NonNull ChreRpcClient rpcClient, 153 @NonNull String method, 154 @NonNull RequestType request) throws Exception { 155 Objects.requireNonNull(rpcClient); 156 Objects.requireNonNull(method); 157 Objects.requireNonNull(request); 158 159 List<List<ResponseType>> responses = callConcurrentServerStreamingRpcMethodSync( 160 Arrays.asList(rpcClient), 161 method, 162 request); 163 return responses == null || responses.isEmpty() ? null : responses.get(0); 164 } 165 166 /** 167 * Calls a server streaming RPC method with RPC_TIMEOUT_IN_SECONDS seconds of 168 * timeout with an empty request. 169 * 170 * @param <ResponseType> the type of the response (proto generated type). 171 * @param rpcClient the RPC client. 172 * @param method the fully-qualified method name. 173 * 174 * @return the proto response or null if there was an error. 175 */ 176 public <ResponseType extends MessageLite> List<ResponseType> callServerStreamingRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method)177 callServerStreamingRpcMethodSync( 178 @NonNull ChreRpcClient rpcClient, 179 @NonNull String method) throws Exception { 180 Objects.requireNonNull(rpcClient); 181 Objects.requireNonNull(method); 182 183 Empty request = Empty.newBuilder().build(); 184 return callServerStreamingRpcMethodSync(rpcClient, method, request); 185 } 186 187 /** 188 * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent 189 * instances of the ChreApiTest nanoapp. 190 * 191 * @param <RequestType> the type of the request (proto generated type). 192 * @param <ResponseType> the type of the response (proto generated type). 193 * @param rpcClients the RPC clients corresponding to the instances of the 194 * ChreApiTest nanoapp. 195 * @param method the fully-qualified method name. 196 * @param requests the list of requests. 197 * 198 * @return the proto response or null if there was an error. 199 */ 200 public static <RequestType extends MessageLite, ResponseType extends MessageLite> callConcurrentUnaryRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull List<RequestType> requests)201 List<ResponseType> callConcurrentUnaryRpcMethodSync( 202 @NonNull List<ChreRpcClient> rpcClients, 203 @NonNull String method, 204 @NonNull List<RequestType> requests) throws Exception { 205 Objects.requireNonNull(rpcClients); 206 Objects.requireNonNull(method); 207 Objects.requireNonNull(requests); 208 if (rpcClients.size() != requests.size()) { 209 return null; 210 } 211 212 List<UnaryFuture<ResponseType>> responseFutures = 213 new ArrayList<UnaryFuture<ResponseType>>(); 214 Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator(); 215 Iterator<RequestType> requestsIter = requests.iterator(); 216 while (rpcClientsIter.hasNext() && requestsIter.hasNext()) { 217 ChreRpcClient rpcClient = rpcClientsIter.next(); 218 RequestType request = requestsIter.next(); 219 MethodClient methodClient = rpcClient.getMethodClient(method); 220 responseFutures.add(methodClient.invokeUnaryFuture(request)); 221 } 222 223 List<ResponseType> responses = new ArrayList<ResponseType>(); 224 boolean success = true; 225 long endTimeInMs = System.currentTimeMillis() + RPC_TIMEOUT_IN_MS; 226 for (UnaryFuture<ResponseType> responseFuture: responseFutures) { 227 try { 228 UnaryResult<ResponseType> responseResult = responseFuture.get( 229 Math.max(0, endTimeInMs - System.currentTimeMillis()), 230 TimeUnit.MILLISECONDS); 231 responses.add(responseResult.response()); 232 } catch (Exception exception) { 233 success = false; 234 } 235 } 236 return success ? responses : null; 237 } 238 239 /** 240 * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout for concurrent 241 * instances of the ChreApiTest nanoapp. 242 * 243 * @param <RequestType> the type of the request (proto generated type). 244 * @param <ResponseType> the type of the response (proto generated type). 245 * @param rpcClients the RPC clients corresponding to the instances of the 246 * ChreApiTest nanoapp. 247 * @param method the fully-qualified method name. 248 * @param request the request. 249 * 250 * @return the proto response or null if there was an error. 251 */ 252 public static <RequestType extends MessageLite, ResponseType extends MessageLite> callConcurrentUnaryRpcMethodSync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request)253 List<ResponseType> callConcurrentUnaryRpcMethodSync( 254 @NonNull List<ChreRpcClient> rpcClients, 255 @NonNull String method, 256 @NonNull RequestType request) throws Exception { 257 Objects.requireNonNull(rpcClients); 258 Objects.requireNonNull(method); 259 Objects.requireNonNull(request); 260 261 List<RequestType> requests = new ArrayList<RequestType>(); 262 for (int i = 0; i < rpcClients.size(); ++i) { 263 requests.add(request); 264 } 265 return callConcurrentUnaryRpcMethodSync(rpcClients, method, requests); 266 } 267 268 /** 269 * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout. 270 * 271 * @param <RequestType> the type of the request (proto generated type). 272 * @param <ResponseType> the type of the response (proto generated type). 273 * @param rpcClient the RPC client. 274 * @param method the fully-qualified method name. 275 * @param request the request. 276 * 277 * @return the proto response or null if there was an error. 278 */ 279 public static <RequestType extends MessageLite, ResponseType extends MessageLite> ResponseType callUnaryRpcMethodSync( @onNull ChreRpcClient rpcClient, @NonNull String method, @NonNull RequestType request)280 callUnaryRpcMethodSync( 281 @NonNull ChreRpcClient rpcClient, 282 @NonNull String method, 283 @NonNull RequestType request) throws Exception { 284 Objects.requireNonNull(rpcClient); 285 Objects.requireNonNull(method); 286 Objects.requireNonNull(request); 287 288 List<ResponseType> responses = callConcurrentUnaryRpcMethodSync(Arrays.asList(rpcClient), 289 method, request); 290 return responses == null || responses.isEmpty() ? null : responses.get(0); 291 } 292 293 /** 294 * Calls an RPC method with RPC_TIMEOUT_IN_SECONDS seconds of timeout with an empty request. 295 * 296 * @param <ResponseType> the type of the response (proto generated type). 297 * @param rpcClient the RPC client. 298 * @param method the fully-qualified method name. 299 * 300 * @return the proto response or null if there was an error. 301 */ 302 public static <ResponseType extends MessageLite> ResponseType callUnaryRpcMethodSync(@onNull ChreRpcClient rpcClient, @NonNull String method)303 callUnaryRpcMethodSync(@NonNull ChreRpcClient rpcClient, @NonNull String method) 304 throws Exception { 305 Objects.requireNonNull(rpcClient); 306 Objects.requireNonNull(method); 307 308 Empty request = Empty.newBuilder().build(); 309 return callUnaryRpcMethodSync(rpcClient, method, request); 310 } 311 312 /** 313 * Gathers events that match the eventTypes for each RPC client. This gathers 314 * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. 315 * The host will wait until 2 * timeoutInNs to timeout receiving the response. 316 * The responses will have the same size as the input rpcClients size. 317 * 318 * @param rpcClients the RPC clients. 319 * @param eventTypes the types of event to gather. 320 * @param eventCount the number of events to gather. 321 * 322 * @return the events future. 323 */ gatherEventsConcurrent( @onNull List<ChreRpcClient> rpcClients, List<Integer> eventTypes, int eventCount, long timeoutInNs)324 public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent( 325 @NonNull List<ChreRpcClient> rpcClients, List<Integer> eventTypes, int eventCount, 326 long timeoutInNs) throws Exception { 327 Objects.requireNonNull(rpcClients); 328 329 ChreApiTest.GatherEventsInput input = ChreApiTest.GatherEventsInput.newBuilder() 330 .addAllEventTypes(eventTypes) 331 .setEventCount(eventCount) 332 .setTimeoutInNs(timeoutInNs) 333 .build(); 334 return callConcurrentServerStreamingRpcMethodAsync(rpcClients, 335 "chre.rpc.ChreApiTestService.GatherEvents", input, 336 TimeUnit.NANOSECONDS.toMillis(2 * timeoutInNs)); 337 } 338 339 /** 340 * Gathers events that match the eventType for each RPC client. This gathers 341 * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. 342 * The host will wait until 2 * timeoutInNs to timeout receiving the response. 343 * The responses will have the same size as the input rpcClients size. 344 * 345 * @param rpcClients the RPC clients. 346 * @param eventType the type of event to gather. 347 * @param eventCount the number of events to gather. 348 * 349 * @return the events future. 350 */ gatherEventsConcurrent( @onNull List<ChreRpcClient> rpcClients, int eventType, int eventCount, long timeoutInNs)351 public Future<List<List<ChreApiTest.GeneralEventsMessage>>> gatherEventsConcurrent( 352 @NonNull List<ChreRpcClient> rpcClients, int eventType, int eventCount, 353 long timeoutInNs) throws Exception { 354 Objects.requireNonNull(rpcClients); 355 356 return gatherEventsConcurrent(rpcClients, Arrays.asList(eventType), 357 eventCount, timeoutInNs); 358 } 359 360 /** 361 * Gathers events that match the eventTypes for the RPC client. This gathers 362 * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. 363 * The host will wait until 2 * timeoutInNs to timeout receiving the response. 364 * 365 * @param rpcClient the RPC client. 366 * @param eventTypes the types of event to gather. 367 * @param eventCount the number of events to gather. 368 * 369 * @return the events future. 370 */ gatherEvents( @onNull ChreRpcClient rpcClient, List<Integer> eventTypes, int eventCount, long timeoutInNs)371 public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents( 372 @NonNull ChreRpcClient rpcClient, List<Integer> eventTypes, int eventCount, 373 long timeoutInNs) throws Exception { 374 Objects.requireNonNull(rpcClient); 375 376 Future<List<List<ChreApiTest.GeneralEventsMessage>>> eventsConcurrentFuture = 377 gatherEventsConcurrent(Arrays.asList(rpcClient), eventTypes, eventCount, 378 timeoutInNs); 379 return eventsConcurrentFuture == null ? null : mExecutor.submit(() -> { 380 List<List<ChreApiTest.GeneralEventsMessage>> events = 381 eventsConcurrentFuture.get(2 * timeoutInNs, TimeUnit.NANOSECONDS); 382 return events == null || events.size() == 0 ? null : events.get(0); 383 }); 384 } 385 386 /** 387 * Gathers events that match the eventType for the RPC client. This gathers 388 * events until eventCount events are gathered or timeoutInNs nanoseconds has passed. 389 * The host will wait until 2 * timeoutInNs to timeout receiving the response. 390 * 391 * @param rpcClient the RPC client. 392 * @param eventType the type of event to gather. 393 * @param eventCount the number of events to gather. 394 * 395 * @return the events future. 396 */ gatherEvents( @onNull ChreRpcClient rpcClient, int eventType, int eventCount, long timeoutInNs)397 public Future<List<ChreApiTest.GeneralEventsMessage>> gatherEvents( 398 @NonNull ChreRpcClient rpcClient, int eventType, int eventCount, 399 long timeoutInNs) throws Exception { 400 Objects.requireNonNull(rpcClient); 401 402 return gatherEvents(rpcClient, Arrays.asList(eventType), eventCount, timeoutInNs); 403 } 404 405 /** 406 * Gather and re-merge a single CHRE audio data event. 407 */ gatherAudioDataEvent( @onNull ChreRpcClient rpcClient, int audioEventType, int eventCount, long timeoutInNs)408 public ChreApiTest.ChreAudioDataEvent gatherAudioDataEvent( 409 @NonNull ChreRpcClient rpcClient, int audioEventType, 410 int eventCount, long timeoutInNs) throws Exception { 411 Objects.requireNonNull(rpcClient); 412 413 Future<List<ChreApiTest.GeneralEventsMessage>> audioEventsFuture = 414 gatherEvents(rpcClient, audioEventType, eventCount, 415 timeoutInNs); 416 List<ChreApiTest.GeneralEventsMessage> audioEvents = 417 audioEventsFuture.get(2 * timeoutInNs, TimeUnit.NANOSECONDS); 418 return processAudioDataEvents(audioEvents); 419 } 420 421 /** 422 * Re-merge a single CHRE audio data event. 423 */ processAudioDataEvents( @onNull List<ChreApiTest.GeneralEventsMessage> audioEvents)424 public ChreApiTest.ChreAudioDataEvent processAudioDataEvents( 425 @NonNull List<ChreApiTest.GeneralEventsMessage> audioEvents) throws Exception { 426 Objects.requireNonNull(audioEvents); 427 // Assert audioEvents isn't empty 428 if (audioEvents.size() == 0) { 429 return null; 430 } 431 432 ChreApiTest.ChreAudioDataMetadata metadata = 433 audioEvents.get(0).getChreAudioDataMetadata(); 434 // 8-bit format == 0 435 // 16-bit format == 1 436 int bufferSize = metadata.getSampleCount() * (metadata.getFormat() + 1); 437 ByteBuffer buffer = ByteBuffer.allocate(bufferSize); 438 ByteString sampleBytes; 439 boolean status = true; 440 441 for (int i = 1; i < audioEvents.size() && status; ++i) { 442 ChreApiTest.ChreAudioDataSamples samples = audioEvents.get(i).getChreAudioDataSamples(); 443 // assert samples sent/received in order 444 if (samples.getId() != (i - 1)) { 445 status = false; 446 } else { 447 buffer.put(samples.getSamples().toByteArray()); 448 } 449 } 450 451 // Remove leading zeros before creating the full event 452 buffer.rewind(); 453 sampleBytes = removeLeadingZerosFromAudio(buffer, MAX_LEADING_ZEROS_TO_REMOVE, 454 metadata.getFormat()); 455 if (sampleBytes == null) { 456 status = false; 457 } 458 459 ChreApiTest.ChreAudioDataEvent audioEvent = 460 ChreApiTest.ChreAudioDataEvent.newBuilder() 461 .setStatus(status) 462 .setVersion(metadata.getVersion()) 463 .setReserved(metadata.getReserved()) 464 .setHandle(metadata.getHandle()) 465 .setTimestamp(metadata.getTimestamp()) 466 .setSampleRate(metadata.getSampleRate()) 467 .setSampleCount(metadata.getSampleCount()) 468 .setFormat(metadata.getFormat()) 469 .setSamples(sampleBytes) 470 .build(); 471 472 return audioEvent; 473 } 474 475 /** 476 * Gets the RPC service for the CHRE API Test nanoapp. 477 */ getChreApiService()478 public static Service getChreApiService() { 479 return new Service("chre.rpc.ChreApiTestService", 480 Service.unaryMethod( 481 "ChreBleGetCapabilities", 482 Empty.parser(), 483 ChreApiTest.Capabilities.parser()), 484 Service.unaryMethod( 485 "ChreBleGetFilterCapabilities", 486 Empty.parser(), 487 ChreApiTest.Capabilities.parser()), 488 Service.serverStreamingMethod( 489 "ChreBleStartScanSync", 490 ChreApiTest.ChreBleStartScanAsyncInput.parser(), 491 ChreApiTest.GeneralSyncMessage.parser()), 492 Service.serverStreamingMethod( 493 "ChreBleStopScanSync", 494 Empty.parser(), 495 ChreApiTest.GeneralSyncMessage.parser()), 496 Service.unaryMethod( 497 "ChreSensorFindDefault", 498 ChreApiTest.ChreSensorFindDefaultInput.parser(), 499 ChreApiTest.ChreSensorFindDefaultOutput.parser()), 500 Service.unaryMethod( 501 "ChreGetSensorInfo", 502 ChreApiTest.ChreHandleInput.parser(), 503 ChreApiTest.ChreGetSensorInfoOutput.parser()), 504 Service.unaryMethod( 505 "ChreGetSensorSamplingStatus", 506 ChreApiTest.ChreHandleInput.parser(), 507 ChreApiTest.ChreGetSensorSamplingStatusOutput.parser()), 508 Service.unaryMethod( 509 "ChreSensorConfigure", 510 ChreApiTest.ChreSensorConfigureInput.parser(), 511 ChreApiTest.Status.parser()), 512 Service.unaryMethod( 513 "ChreSensorConfigureModeOnly", 514 ChreApiTest.ChreSensorConfigureModeOnlyInput.parser(), 515 ChreApiTest.Status.parser()), 516 Service.unaryMethod( 517 "ChreAudioGetSource", 518 ChreApiTest.ChreHandleInput.parser(), 519 ChreApiTest.ChreAudioGetSourceOutput.parser()), 520 Service.unaryMethod( 521 "ChreAudioConfigureSource", 522 ChreApiTest.ChreHandleInput.parser(), 523 ChreApiTest.Status.parser()), 524 Service.unaryMethod( 525 "ChreAudioGetStatus", 526 ChreApiTest.ChreHandleInput.parser(), 527 ChreApiTest.ChreAudioGetStatusOutput.parser()), 528 Service.unaryMethod( 529 "ChreConfigureHostEndpointNotifications", 530 ChreApiTest.ChreConfigureHostEndpointNotificationsInput.parser(), 531 ChreApiTest.Status.parser()), 532 Service.unaryMethod( 533 "ChreGetHostEndpointInfo", 534 ChreApiTest.ChreGetHostEndpointInfoInput.parser(), 535 ChreApiTest.ChreGetHostEndpointInfoOutput.parser()), 536 Service.serverStreamingMethod( 537 "GatherEvents", 538 ChreApiTest.GatherEventsInput.parser(), 539 ChreApiTest.GeneralEventsMessage.parser())); 540 } 541 542 /** 543 * Writes data out to permanent storage. 544 * 545 * @param data Byte array holding the data to write out to file 546 * @param filename Filename for the created file 547 * @param context Current target context 548 */ writeDataToFile(byte[] data, String filename, Context context)549 public static void writeDataToFile(byte[] data, String filename, 550 Context context) throws Exception { 551 File file = new File(context.getFilesDir(), filename); 552 ByteSink sink = Files.asByteSink(file); 553 sink.write(data); 554 } 555 556 /** 557 * Removes leading 0 samples from an audio data packet 558 * 559 * @param buffer ByteBuffer containing all the data 560 * @param limit Max amount of samples to remove 561 * @param format Audio data format from metadata 562 * 563 * @return ByteString with the leading zero samples removed 564 */ removeLeadingZerosFromAudio(ByteBuffer buffer, int limit, int format)565 private ByteString removeLeadingZerosFromAudio(ByteBuffer buffer, int limit, int format) { 566 ByteString sampleString; 567 568 if (format != CHRE_AUDIO_DATA_FORMAT_8_BIT && format != CHRE_AUDIO_DATA_FORMAT_16_BIT) { 569 return null; 570 } 571 572 for (int i = 0; i < limit; ++i) { 573 int s; 574 if (format == CHRE_AUDIO_DATA_FORMAT_8_BIT) { 575 s = buffer.get(); 576 } else { 577 s = buffer.getShort(); 578 } 579 580 if (s != 0) { 581 break; 582 } 583 } 584 585 // move back to the first non-zero sample 586 if (format == CHRE_AUDIO_DATA_FORMAT_8_BIT) { 587 buffer.position(buffer.position() - 1); 588 } else { 589 buffer.position(buffer.position() - 2); 590 } 591 sampleString = ByteString.copyFrom(buffer.slice()); 592 593 return sampleString; 594 } 595 596 /** 597 * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on 598 * multiple RPC clients. This returns a Future for the result. The responses will have the same 599 * size as the input rpcClients size. 600 * 601 * @param <RequestType> the type of the request (proto generated type). 602 * @param <ResponseType> the type of the response (proto generated type). 603 * @param rpcClients the RPC clients. 604 * @param method the fully-qualified method name. 605 * @param requests the list of requests. 606 * @param timeoutInMs the timeout in milliseconds. 607 * 608 * @return the Future for the response for null if there was an error. 609 */ 610 private <RequestType extends MessageLite, ResponseType extends MessageLite> callConcurrentServerStreamingRpcMethodAsync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull List<RequestType> requests, long timeoutInMs)611 Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync( 612 @NonNull List<ChreRpcClient> rpcClients, 613 @NonNull String method, 614 @NonNull List<RequestType> requests, 615 long timeoutInMs) throws Exception { 616 Objects.requireNonNull(rpcClients); 617 Objects.requireNonNull(method); 618 Objects.requireNonNull(requests); 619 if (rpcClients.size() != requests.size()) { 620 return null; 621 } 622 623 List<ServerStreamingFuture> responseFutures = new ArrayList<ServerStreamingFuture>(); 624 synchronized (mNanoappStreamingMessages) { 625 if (mActiveServerStreamingRpc) { 626 return null; 627 } 628 629 Iterator<ChreRpcClient> rpcClientsIter = rpcClients.iterator(); 630 Iterator<RequestType> requestsIter = requests.iterator(); 631 while (rpcClientsIter.hasNext() && requestsIter.hasNext()) { 632 ChreRpcClient rpcClient = rpcClientsIter.next(); 633 RequestType request = requestsIter.next(); 634 MethodClient methodClient = rpcClient.getMethodClient(method); 635 ServerStreamingFuture responseFuture = methodClient.invokeServerStreamingFuture( 636 request, 637 (ResponseType response) -> { 638 synchronized (mNanoappStreamingMessages) { 639 mNanoappStreamingMessages.putIfAbsent(rpcClient, 640 new ArrayList<MessageLite>()); 641 mNanoappStreamingMessages.get(rpcClient).add(response); 642 } 643 }); 644 responseFutures.add(responseFuture); 645 } 646 mActiveServerStreamingRpc = true; 647 } 648 649 final List<ChreRpcClient> rpcClientsFinal = rpcClients; 650 Future<List<List<ResponseType>>> responseFuture = mExecutor.submit(() -> { 651 boolean success = true; 652 long endTimeInMs = System.currentTimeMillis() + timeoutInMs; 653 for (ServerStreamingFuture future: responseFutures) { 654 try { 655 future.get(Math.max(0, endTimeInMs - System.currentTimeMillis()), 656 TimeUnit.MILLISECONDS); 657 } catch (Exception exception) { 658 success = false; 659 } 660 } 661 662 synchronized (mNanoappStreamingMessages) { 663 List<List<ResponseType>> responses = null; 664 if (success) { 665 responses = new ArrayList<List<ResponseType>>(); 666 for (ChreRpcClient rpcClient: rpcClientsFinal) { 667 List<MessageLite> messages = mNanoappStreamingMessages.get(rpcClient); 668 List<ResponseType> responseList = new ArrayList<ResponseType>(); 669 if (messages != null) { 670 // Only needed to cast the type. 671 for (MessageLite message: messages) { 672 responseList.add((ResponseType) message); 673 } 674 } 675 676 responses.add(responseList); 677 } 678 } 679 680 mNanoappStreamingMessages.clear(); 681 mActiveServerStreamingRpc = false; 682 return responses; 683 } 684 }); 685 return responseFuture; 686 } 687 688 /** 689 * Calls a server streaming RPC method with timeoutInMs milliseconds of timeout on 690 * multiple RPC clients. This returns a Future for the result. The responses will have the same 691 * size as the input rpcClients size. 692 * 693 * @param <RequestType> the type of the request (proto generated type). 694 * @param <ResponseType> the type of the response (proto generated type). 695 * @param rpcClients the RPC clients. 696 * @param method the fully-qualified method name. 697 * @param request the request. 698 * @param timeoutInMs the timeout in milliseconds. 699 * 700 * @return the Future for the response for null if there was an error. 701 */ 702 private <RequestType extends MessageLite, ResponseType extends MessageLite> callConcurrentServerStreamingRpcMethodAsync( @onNull List<ChreRpcClient> rpcClients, @NonNull String method, @NonNull RequestType request, long timeoutInMs)703 Future<List<List<ResponseType>>> callConcurrentServerStreamingRpcMethodAsync( 704 @NonNull List<ChreRpcClient> rpcClients, 705 @NonNull String method, 706 @NonNull RequestType request, 707 long timeoutInMs) throws Exception { 708 Objects.requireNonNull(rpcClients); 709 Objects.requireNonNull(method); 710 Objects.requireNonNull(request); 711 712 ArrayList<RequestType> requests = new ArrayList<RequestType>(); 713 for (int i = 0; i < rpcClients.size(); ++i) { 714 requests.add(request); 715 } 716 return callConcurrentServerStreamingRpcMethodAsync(rpcClients, method, 717 requests, timeoutInMs); 718 } 719 } 720