/*
 * Copyright 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.camera2.pipe.graph

import androidx.annotation.GuardedBy
import androidx.camera.camera2.pipe.CameraGraphId
import androidx.camera.camera2.pipe.Request
import androidx.camera.camera2.pipe.core.Log
import androidx.camera.camera2.pipe.core.ProcessingQueue
import androidx.camera.camera2.pipe.core.ProcessingQueue.Companion.processIn
import androidx.camera.camera2.pipe.putAllMetadata
import java.io.Closeable
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
 * GraphLoop is a thread-safe class that handles incoming state changes and requests and executes
 * them, in order, on a dispatcher. In addition, this implementation handles several optimizations
 * that enable requests to be deterministically skipped or aborted, and is responsible for the
 * cleanup of pending requests during shutdown.
 *
 * Public setters and methods translate each call into a [GraphCommand] that is emitted to an
 * internal [ProcessingQueue]; the queue hands the pending commands to [process], which selects and
 * executes one command per invocation.
 *
 * @param cameraGraphId identifier used only for [toString].
 * @param defaultParameters passed unchanged to every `GraphRequestProcessor.submit` call.
 * @param requiredParameters passed to every submit call; merged last when combined with 3A or
 *   trigger parameters.
 * @param graphListeners listeners passed to every submit call and notified first when requests are
 *   aborted by this class.
 * @param listeners callbacks notified of stop / shutdown transitions, see [Listener].
 * @param shutdownScope scope used to shut down request processors outside of the graph loop.
 * @param dispatcher dispatcher the command processing scope is created on.
 */
internal class GraphLoop(
    private val cameraGraphId: CameraGraphId,
    private val defaultParameters: Map<*, Any?>,
    private val requiredParameters: Map<*, Any?>,
    private val graphListeners: List<Request.Listener>,
    private val listeners: List<Listener>,
    private val shutdownScope: CoroutineScope,
    dispatcher: CoroutineDispatcher
) : Closeable {
    /** Callbacks invoked synchronously on the caller's thread, outside of the internal lock. */
    internal interface Listener {

        /** Invoked every time [repeatingRequest] is set to `null`. */
        fun onStopRepeating()

        /** Invoked when [requestProcessor] is set to `null` while the loop is not closed. */
        fun onGraphStopped()

        /** Invoked once, by the first call to [close]. */
        fun onGraphShutdown()
    }

    private val graphLoopScope = CoroutineScope(dispatcher.plus(CoroutineName("CXCP-GraphLoop")))
    private val processingQueue =
        ProcessingQueue(onUnprocessedElements = ::finalizeUnprocessedCommands, process = ::process)
            .processIn(graphLoopScope)

    private val lock = Any()

    @Volatile private var closed = false
    @GuardedBy("lock") private var _requestProcessor: GraphRequestProcessor? = null
    @GuardedBy("lock") private var _repeatingRequest: Request? = null
    @GuardedBy("lock") private var _graphParameters: Map<*, Any?> = emptyMap<Any, Any?>()
    @GuardedBy("lock") private var _graph3AParameters: Map<*, Any?> = emptyMap<Any, Any?>()

    /**
     * The request processor that commands should be executed against.
     *
     * Setting a new (non-identical) value enqueues a [GraphCommand.RequestProcessor] carrying both
     * the previous and the new processor. If the loop is already closed, the new value is never
     * stored: it is shut down on [shutdownScope] and the setter returns without notifying
     * listeners. Otherwise, setting `null` notifies [Listener.onGraphStopped], even when the value
     * was already `null`.
     */
    var requestProcessor: GraphRequestProcessor?
        get() = synchronized(lock) { _requestProcessor }
        set(value) {
            synchronized(lock) {
                val previous = _requestProcessor
                _requestProcessor = value

                if (closed) {
                    _requestProcessor = null
                    if (value != null) {
                        shutdownScope.launch { value.shutdown() }
                    }
                    // Non-local return: exits the setter, skipping the listener loop below.
                    return
                }

                // Ignore duplicate calls to set with the same value.
                if (previous === value) {
                    return@synchronized
                }
                processingQueue.tryEmit(GraphCommand.RequestProcessor(previous, value))
            }

            if (value == null) {
                for (i in listeners.indices) {
                    listeners[i].onGraphStopped()
                }
            }
        }

    /**
     * The most recently requested repeating request.
     *
     * A non-null value enqueues a [GraphCommand.Repeat]; a transition to `null` enqueues a
     * [GraphCommand.Stop]. Setting `null` always notifies [Listener.onStopRepeating], even when no
     * `Stop` command is enqueued because the value was already `null`.
     */
    var repeatingRequest: Request?
        get() = synchronized(lock) { _repeatingRequest }
        set(value) {
            synchronized(lock) {
                val previous = _repeatingRequest
                _repeatingRequest = value

                // Ignore duplicate calls to set null, this avoids multiple stopRepeating calls from
                // being invoked.
                if (previous == null && value == null) {
                    return@synchronized
                }

                if (value != null) {
                    processingQueue.tryEmit(GraphCommand.Repeat(value))
                } else {
                    // If the repeating request is set to null, stop repeating.
                    processingQueue.tryEmit(GraphCommand.Stop)
                }
            }
            if (value == null) {
                for (i in listeners.indices) {
                    listeners[i].onStopRepeating()
                }
            }
        }

    /**
     * Graph-level parameters. Every assignment enqueues a [GraphCommand.Parameters] that pairs the
     * new value with the current [graph3AParameters].
     */
    var graphParameters: Map<*, Any?>
        get() = synchronized(lock) { _graphParameters }
        set(value) =
            synchronized(lock) {
                _graphParameters = value
                processingQueue.tryEmit(GraphCommand.Parameters(value, _graph3AParameters))
            }

    /**
     * Graph-level 3A parameters. Every assignment enqueues a [GraphCommand.Parameters] that pairs
     * the current [graphParameters] with the new value.
     */
    var graph3AParameters: Map<*, Any?>
        get() = synchronized(lock) { _graph3AParameters }
        set(value) =
            synchronized(lock) {
                _graph3AParameters = value
                processingQueue.tryEmit(GraphCommand.Parameters(_graphParameters, value))
            }

    private val _captureProcessingEnabled = atomic(true)

    /**
     * Whether capture and trigger commands may currently be submitted. Setting this to `true`
     * calls [invalidate] so that pending commands are re-evaluated.
     */
    var captureProcessingEnabled: Boolean
        get() = _captureProcessingEnabled.value
        set(value) {
            _captureProcessingEnabled.value = value
            if (value) {
                invalidate()
            }
        }

    /** Enqueue a single capture [request]. See [submit]. */
    fun submit(request: Request): Boolean = submit(listOf(request))

    /**
     * Enqueue [requests] as a single capture command.
     *
     * @return `true` if the command was accepted by the queue. If it was rejected, `onAborted` is
     *   invoked for every request and `false` is returned.
     */
    fun submit(requests: List<Request>): Boolean {
        if (!processingQueue.tryEmit(GraphCommand.Capture(requests))) {
            abortRequests(requests)
            return false
        }
        return true
    }

    /**
     * Enqueue a one-time submission of the current repeating request with additional
     * [parameters].
     *
     * @return `true` if the command was accepted by the queue.
     * @throws IllegalStateException if [repeatingRequest] is currently `null`.
     */
    fun trigger(parameters: Map<*, Any?>): Boolean {
        check(repeatingRequest != null) {
            "Cannot submit parameters without an active repeating request!"
        }
        return processingQueue.tryEmit(GraphCommand.Trigger(parameters))
    }

    /** Enqueue a [GraphCommand.Abort]. */
    fun abort() {
        processingQueue.tryEmit(GraphCommand.Abort)
    }

    /** Enqueue a [GraphCommand.Invalidate], which has no effect other than being removed. */
    fun invalidate() {
        processingQueue.tryEmit(GraphCommand.Invalidate)
    }

    /**
     * Close this loop. Only the first call has any effect: it shuts down the current request
     * processor on [shutdownScope], enqueues a [GraphCommand.Shutdown], and then notifies
     * [Listener.onGraphShutdown].
     */
    override fun close() {
        synchronized(lock) {
            if (closed) return
            closed = true

            _requestProcessor?.let { processor -> shutdownScope.launch { processor.shutdown() } }
            _requestProcessor = null

            // Shutdown Process - This will occur when the CameraGraph is closed:
            // 1. Clear the _requestProcessor reference. This stops enqueued requests from being
            //    processed, since they use the current requestProcessor instance.
            // 2. Emit a Shutdown call. This will clear or abort any previous requests and will
            //    close the request processor before cancelling the scope.
            processingQueue.tryEmit(GraphCommand.Shutdown)
        }

        // Invoke shutdown listeners. There is a small chance that additional elements will be
        // canceled or released after this point due to unprocessed elements in the queue.
        for (i in listeners.indices) {
            listeners[i].onGraphShutdown()
        }
    }

    // State as seen by the command loop. These fields are not guarded by `lock`; within this class
    // they are only touched by `process` and the private functions it calls.
    // NOTE(review): this relies on ProcessingQueue never invoking `process` concurrently - confirm.
    private var currentRepeatingRequest: Request? = null
    private var currentGraphParameters: Map<*, Any?> = emptyMap<Any, Any?>()
    private var currentGraph3AParameters: Map<*, Any?> = emptyMap<Any, Any?>()
    private var currentRequiredParameters: Map<*, Any?> = requiredParameters
    private var currentRequestProcessor: GraphRequestProcessor? = null

    /**
     * Select one command from [commands] and execute it. Handlers remove the commands they consume
     * (and any commands they make redundant) from the list; commands that could not be executed
     * are left in place.
     */
    private suspend fun process(commands: MutableList<GraphCommand>) {
        // The GraphLoop is responsible for bridging the core interactions with a camera so that
        // ordering (and thus deterministic execution) is preserved, while also optimizing away
        // unnecessary operations in real time.
        //
        // Unlike the consumer of a Flow, these optimizations require access to the full state of
        // the command queue in order to evaluate what operations are redundant and may be safely
        // skipped without altering the guarantees provided by the API surface.
        //
        // In general, this function must execute as fast as possible (but is allowed to suspend).
        // After returning, the function may be re-invoked if:
        //
        // 1. The number of items `commands` is different, and non-zero
        // 2. New items were added to the queue while process was executing.
        //
        // To keep things organized, commands are split into individual functions.

        val idx = selectGraphCommand(commands)

        // Process the selected command
        when (val command = commands[idx]) {
            GraphCommand.Invalidate -> commands.removeAt(idx)
            GraphCommand.Shutdown -> processShutdown(commands)
            GraphCommand.Abort -> processAbort(commands, idx)
            GraphCommand.Stop -> processStop(commands, idx)
            is GraphCommand.RequestProcessor -> processRequestProcessor(commands, idx, command)
            is GraphCommand.Capture -> processCapture(commands, idx, command)
            is GraphCommand.Trigger -> processTrigger(commands, idx, command)
            is GraphCommand.Parameters -> processParameters(commands, idx, command)
            is GraphCommand.Repeat -> processRepeat(commands, idx)
        }
    }

    /**
     * Pick the index of the command in [commands] that should be executed next.
     *
     * Priority, highest first: the most recent Abort / Invalidate / Stop / Shutdown; the most
     * recent RequestProcessor; the most recent Parameters within the leading run of Parameters and
     * Repeat commands; the first Capture or Trigger (only when there is a current repeating
     * request and capture processing is enabled); the most recent Repeat within the leading run of
     * Repeat commands; otherwise index 0.
     */
    private fun selectGraphCommand(commands: MutableList<GraphCommand>): Int {
        // This function will never be invoked with an empty command list.
        if (commands.size == 1) return 0

        // First, pick "interrupt commands". These are prioritized because they tend to remove other
        // commands, or are guaranteed to be a NoOp (Invalidate). Because of this, pick the most
        // recent interrupt command in the command list.
        //
        // RequestProcessor commands are special - The most recent one should always be selected,
        // but it should be lower priority than Abort / Stop / Shutdown (and Invalidate). To avoid
        // looping over it twice, track the first instance that is encountered, and return it if one
        // is found and no other interrupt commands have been found.
        var latestRequestProcessorCommand = -1
        for (i in commands.indices.reversed()) {
            when (commands[i]) {
                GraphCommand.Abort,
                GraphCommand.Invalidate,
                GraphCommand.Stop,
                GraphCommand.Shutdown -> {
                    return i
                }
                is GraphCommand.RequestProcessor -> {
                    // Iterating in reverse: the first hit is the most recent command.
                    if (latestRequestProcessorCommand < 0) {
                        latestRequestProcessorCommand = i
                    }
                }
                else -> continue
            }
        }

        if (latestRequestProcessorCommand >= 0) {
            return latestRequestProcessorCommand
        }

        // If there are no interrupt commands, prioritize commands that update parameters.
        //
        // However - we must maintain ordering to avoid skipping over trigger and capture commands.
        // We can skip over StartRepeating calls, but not SubmitCapture or SubmitParameter calls.
        // To do this, we iterate through the commands in order until we hit a non-Parameter or a
        // non-Repeat command. We then return the most-recent parameter command to execute.
        var latestParameterCommand = -1
        for (i in commands.indices) {
            when (commands[i]) {
                is GraphCommand.Parameters -> latestParameterCommand = i
                is GraphCommand.Repeat -> continue
                else -> break
            }
        }
        if (latestParameterCommand >= 0) {
            return latestParameterCommand
        }

        // If the current repeating request is valid, and captureProcessing is enabled, prioritize
        // capture and trigger commands.
        if (currentRepeatingRequest != null && captureProcessingEnabled) {
            // Pick the first Capture or Trigger command
            for (i in commands.indices) {
                when (commands[i]) {
                    is GraphCommand.Capture,
                    is GraphCommand.Trigger -> return i
                    else -> continue
                }
            }
        }

        // Pick the most recent Repeat command without skipping over Capture/Triggers
        var latestRepeatingCommand = -1
        for (i in commands.indices) {
            when (commands[i]) {
                is GraphCommand.Repeat -> latestRepeatingCommand = i
                else -> break
            }
        }
        if (latestRepeatingCommand >= 0) {
            return latestRepeatingCommand
        }

        // Pick the next command in order.
        return 0
    }

    /**
     * Attempt to submit the capture [command] at [idx]. On success the command is removed. On
     * failure (or when capture processing is disabled) the command stays in the queue and, if
     * [repeatAllowed] and a command precedes it, the preceding Repeat commands are attempted.
     *
     * @param repeatAllowed `false` when invoked from [processRepeat], to avoid mutual recursion.
     */
    private fun processCapture(
        commands: MutableList<GraphCommand>,
        idx: Int,
        command: GraphCommand.Capture,
        repeatAllowed: Boolean = true
    ) {
        if (captureProcessingEnabled) {
            if (buildAndSubmit(isRepeating = false, requests = command.requests)) {
                commands.removeAt(idx)
                return
            }
        }

        // If captureProcessing failed, or if we cannot currently issue captures, check to see if
        // there are prior repeating requests that we should attempt.
        if (repeatAllowed && idx > 0) {
            val previousCommand = commands[idx - 1]
            // The previous command, if it exists (idx > 0), must always be a Repeat command as
            // other commands must always be prioritized.
            check(previousCommand is GraphCommand.Repeat)
            processRepeat(commands, idx - 1, captureAllowed = false)
        }
    }

    /**
     * Attempt to submit the trigger [command] at [idx]. A trigger at the head of the queue with no
     * current repeating request is dropped. On success the command is removed; otherwise it stays
     * in the queue and any preceding Repeat commands are attempted.
     */
    private fun processTrigger(
        commands: MutableList<GraphCommand>,
        idx: Int,
        command: GraphCommand.Trigger
    ) {
        // Trigger commands take an existing repeating request, add some one-time parameters to it,
        // and then submit it exactly once.
        val repeatingRequest = currentRepeatingRequest
        if (repeatingRequest == null && idx == 0) {
            // Nothing to trigger against, and no earlier Repeat command that could provide one.
            commands.removeAt(idx)
            return
        }

        // If capture processing is enabled, and there is a non-null repeating request, attempt to
        // submit the trigger.
        if (captureProcessingEnabled && repeatingRequest != null) {
            if (
                buildAndSubmit(
                    isRepeating = false,
                    requests = listOf(repeatingRequest),
                    oneTimeRequiredParameters = command.triggerParameters
                )
            ) {
                commands.removeAt(idx)
                return
            }
        }

        // If processTrigger failed, or if we cannot currently issue captures, check to see if
        // there are prior repeating requests that we should attempt.
        if (idx > 0) {
            val previousCommand = commands[idx - 1]
            check(previousCommand is GraphCommand.Repeat)
            processRepeat(commands, idx - 1, captureAllowed = false)
        }
    }

    /**
     * Attempt to start repeating with the Repeat command at [idx], falling back to earlier Repeat
     * commands. On the first success, that command and every earlier Repeat command are removed
     * and the submitted request becomes the current repeating request.
     *
     * @param captureAllowed `false` when invoked from [processCapture] or [processTrigger], to
     *   avoid mutual recursion.
     */
    private fun processRepeat(
        commands: MutableList<GraphCommand>,
        idx: Int,
        captureAllowed: Boolean = true
    ) {
        // Attempt to issue the repeating request at idx.
        // 1. If that fails - move backwards through the list, attempting each repeating command in
        //    order.
        // 2. If submitting a repeating request from the command queue fails, attempt to submit the
        //    next command, if it is a trigger or a capture.
        for (i in idx downTo 0) {
            val command = commands[i]
            if (
                command is GraphCommand.Repeat &&
                    buildAndSubmit(isRepeating = true, requests = listOf(command.request))
            ) {
                currentRepeatingRequest = command.request
                commands.removeAt(i)
                commands.removeUpTo(i) { it is GraphCommand.Repeat }
                return
            }
        }

        // Repeating request failed, and there is a command in the queue after idx, and we are
        // allowed to attempt capture (Capture can invoke processRepeat, and this avoids loops)
        if (captureAllowed && idx + 1 < commands.size) {
            when (val nextCommand = commands[idx + 1]) {
                is GraphCommand.Capture ->
                    processCapture(commands, idx + 1, nextCommand, repeatAllowed = false)
                is GraphCommand.Trigger -> processTrigger(commands, idx + 1, nextCommand)
                else -> Unit
            }
        }
    }

    /**
     * Apply the Parameters [command] at [idx]: update the current graph / 3A / required
     * parameters, remove this command and every earlier Parameters command, and re-issue the
     * current repeating request (if any) so that it picks up the new values.
     */
    private fun processParameters(
        commands: MutableList<GraphCommand>,
        idx: Int,
        command: GraphCommand.Parameters
    ) {
        currentGraphParameters = command.graphParameters
        currentGraph3AParameters = command.graph3AParameters
        currentRequiredParameters =
            if (command.graph3AParameters.isEmpty()) {
                requiredParameters
            } else {
                // requiredParameters are put last, after the 3A parameters.
                buildMap {
                    putAllMetadata(command.graph3AParameters)
                    putAllMetadata(requiredParameters)
                }
            }

        commands.removeAt(idx)
        commands.removeUpTo(idx) { it is GraphCommand.Parameters }
        reissueRepeatingRequest()
    }

    /**
     * Apply the RequestProcessor [command] at [idx]: shut down and remove every earlier
     * RequestProcessor command, shut down the processor being replaced, switch to the new
     * processor, and try to restart the current repeating request on it.
     */
    private suspend fun processRequestProcessor(
        commands: MutableList<GraphCommand>,
        idx: Int,
        command: GraphCommand.RequestProcessor
    ) {
        var commandsRemoved = 1
        commands.removeAt(idx)
        commands.removeUpTo(idx) {
            when (it) {
                is GraphCommand.RequestProcessor -> {
                    // Superseded before it was ever applied: neither processor will be used.
                    it.old?.shutdown()
                    it.new?.shutdown()
                    commandsRemoved++
                    true
                }
                else -> false
            }
        }

        command.old?.shutdown()
        currentRequestProcessor = command.new

        // If we have a previously submitted repeating request, attempt to resubmit it. If that
        // fails, add it to the beginning of the queue.
        if (!reissueRepeatingRequest()) {
            currentRepeatingRequest?.let {
                commands.add(0, GraphCommand.Repeat(it))

                // Edge case: The graphProcessor may not re-attempt unless the number of items in
                // `commands` has changed.
                if (commandsRemoved == 1) {
                    commands.add(GraphCommand.Invalidate)
                }
            }
            currentRepeatingRequest = null
        }
    }

    /** Apply the Stop command at [idx]. */
    private fun processStop(commands: MutableList<GraphCommand>, idx: Int) {
        // stopRepeating causes the current repeating request to stop, but does not affect capture
        // commands. Invoke stopRepeating on the current RequestProcessor and clear the current
        // repeating request.
        currentRequestProcessor?.stopRepeating()
        currentRepeatingRequest = null

        // Remove all `Stop` and `Repeat` commands prior to the current stop command, since they
        // are no longer relevant.
        commands.removeAt(idx)
        commands.removeUpTo(idx) {
            when (it) {
                GraphCommand.Stop,
                is GraphCommand.Repeat -> true
                else -> false
            }
        }
    }

    /** Apply the Abort command at [idx]. */
    private fun processAbort(commands: MutableList<GraphCommand>, idx: Int) {
        // abortCaptures affects both in-flight captures and the current repeating request.
        // Invoke abortCaptures on the current RequestProcessor, and then clear the current
        // repeating request, any pending Stop/Abort commands, and any pending Capture or Trigger
        // commands.
        currentRequestProcessor?.abortCaptures()
        currentRepeatingRequest = null

        commands.removeAt(idx)
        commands.removeUpTo(idx) {
            when (it) {
                GraphCommand.Stop,
                GraphCommand.Abort,
                is GraphCommand.Repeat,
                is GraphCommand.Trigger -> true
                is GraphCommand.Capture -> {
                    // Make sure listeners for capture events are always triggered.
                    abortRequests(it.requests)
                    true
                }
                else -> false
            }
        }
    }

    /**
     * Apply a Shutdown command: reset the loop state, abort every pending capture, shut down the
     * current and all pending request processors, drop every command and cancel the loop scope.
     */
    private suspend fun processShutdown(commands: MutableList<GraphCommand>) {
        currentRepeatingRequest = null
        currentGraphParameters = emptyMap<Any, Any?>()
        currentGraph3AParameters = emptyMap<Any, Any?>()

        // Abort and remove all commands during shutdown.
        for (idx in commands.indices) {
            val command = commands[idx]
            if (command is GraphCommand.Capture) {
                // Make sure listeners for capture events are always triggered.
                abortRequests(command.requests)
            }
        }

        // Shutdown request processors (Current and pending)
        currentRequestProcessor?.shutdown()
        currentRequestProcessor = null

        for (idx in commands.indices) {
            val command = commands[idx]
            if (command is GraphCommand.RequestProcessor) {
                command.old?.shutdown()
                command.new?.shutdown()
            }
        }
        commands.clear()

        // Cancel the scope. This will trigger the onUnprocessedElements callback after this
        // returns and hits the next suspension point.
        graphLoopScope.cancel()
    }

    /**
     * Attempt to re-issue a previously submitted repeating request, likely with new parameters.
     *
     * @return `true` only if there is both a current request processor and a current repeating
     *   request, and the processor accepted the submission. Unlike [buildAndSubmit], a failure is
     *   not logged.
     */
    private fun reissueRepeatingRequest(): Boolean {
        val processor = currentRequestProcessor ?: return false
        val request = currentRepeatingRequest ?: return false
        return processor.submit(
            isRepeating = true,
            requests = listOf(request),
            defaultParameters = defaultParameters,
            graphParameters = currentGraphParameters,
            requiredParameters = currentRequiredParameters,
            listeners = graphListeners,
        )
    }

    /**
     * Invoke the onAborted listener for each request, prioritizing internal listeners over the
     * request-specific listeners.
     */
    private fun abortRequests(requests: List<Request>) {
        // Internal listeners
        for (rIdx in requests.indices) {
            val request = requests[rIdx]
            for (listenerIdx in graphListeners.indices) {
                graphListeners[listenerIdx].onAborted(request)
            }
        }

        // Request-specific listeners
        for (rIdx in requests.indices) {
            val request = requests[rIdx]
            for (listenerIdx in request.listeners.indices) {
                request.listeners[listenerIdx].onAborted(request)
            }
        }
    }

    /**
     * Callback passed to the [ProcessingQueue] as `onUnprocessedElements`: releases the resources
     * held by commands that will never be processed.
     */
    private fun finalizeUnprocessedCommands(unprocessedCommands: List<GraphCommand>) {
        // When the graph loop is shutdown it is possible that additional elements may have been
        // added to the queue. To avoid leaking resources, ensure that capture commands are aborted
        // and that requestProcessor commands shutdown the associated request processor(s).
        for (command in unprocessedCommands) {
            when (command) {
                // Make sure listeners for capture events are always triggered.
                is GraphCommand.Capture -> abortRequests(command.requests)
                is GraphCommand.RequestProcessor -> {
                    shutdownScope.launch(start = CoroutineStart.UNDISPATCHED) {
                        command.old?.shutdown()
                        command.new?.shutdown()
                    }
                }
                else -> Unit
            }
        }
    }

    /**
     * Submit [requests] to the current request processor using the current graph parameters.
     *
     * @param isRepeating forwarded to the processor; also selects the failure log message.
     * @param oneTimeRequiredParameters when non-empty (trigger submissions), the required
     *   parameters for this submission are rebuilt from the current 3A parameters, then these
     *   parameters, then [requiredParameters].
     * @return the result of the processor's `submit`, or `false` if there is no current request
     *   processor. A failed submission is logged as a warning.
     */
    private fun buildAndSubmit(
        isRepeating: Boolean,
        requests: List<Request>,
        oneTimeRequiredParameters: Map<*, Any?> = emptyMap<Any, Any>()
    ): Boolean {
        val processor = currentRequestProcessor ?: return false

        val success =
            processor.submit(
                isRepeating = isRepeating,
                requests = requests,
                defaultParameters = defaultParameters,
                graphParameters = currentGraphParameters,
                requiredParameters =
                    if (oneTimeRequiredParameters.isEmpty()) {
                        currentRequiredParameters
                    } else {
                        buildMap<Any, Any?> {
                            this.putAllMetadata(currentGraph3AParameters)
                            this.putAllMetadata(oneTimeRequiredParameters)
                            this.putAllMetadata(requiredParameters)
                        }
                    },
                listeners = graphListeners
            )

        if (!success) {
            if (isRepeating) {
                Log.warn { "Failed to repeat with ${requests.single()}" }
            } else {
                if (oneTimeRequiredParameters.isEmpty()) {
                    Log.warn { "Failed to submit capture with $requests" }
                } else {
                    Log.warn {
                        "Failed to trigger with ${requests.single()} and $oneTimeRequiredParameters"
                    }
                }
            }
        }

        return success
    }

    override fun toString(): String = "GraphLoop($cameraGraphId)"

    companion object {
        /**
         * Utility function to remove items by index from a `MutableList` up-to (but not including)
         * the provided index without creating an iterator.
         *
         * For example, in a list of `[1, 2, 3, 4, 5]`, calling `removeUpTo(3) { it % 2 == 1 }`
         * will test `[1, 2, 3]`, and remove "1" and "3", modifying the list to have `[2, 4, 5]`.
         */
        private inline fun <T> MutableList<T>.removeUpTo(idx: Int, predicate: (T) -> Boolean) {
            var a = 0
            var b = idx
            while (a < b) {
                if (predicate(this[a])) {
                    this.removeAt(a)
                    b-- // Reduce upper bound
                } else {
                    a++ // Advance lower bound
                }
            }
        }
    }
}

/** Commands that [GraphLoop] enqueues and then executes from its processing queue. */
internal sealed interface GraphCommand {
    /** No-op command; it is simply removed from the queue when selected. */
    object Invalidate : GraphCommand

    /** Abort pending captures, shut down all request processors and cancel the loop. */
    object Shutdown : GraphCommand

    /** Stop the current repeating request; pending captures are not affected. */
    object Stop : GraphCommand

    /** Abort captures on the current processor and discard earlier pending work. */
    object Abort : GraphCommand

    /** Replace the [old] request processor with [new]. */
    class RequestProcessor(val old: GraphRequestProcessor?, val new: GraphRequestProcessor?) :
        GraphCommand

    /** Update the graph parameters and graph 3A parameters. */
    class Parameters(val graphParameters: Map<*, Any?>, val graph3AParameters: Map<*, Any?>) :
        GraphCommand

    /** Start repeating with [request]. */
    class Repeat(val request: Request) : GraphCommand

    /** Submit [requests] as a single capture. */
    class Capture(val requests: List<Request>) : GraphCommand

    /** Submit the current repeating request once with additional [triggerParameters]. */
    class Trigger(val triggerParameters: Map<*, Any?>) : GraphCommand
}