1 /*
2  * Copyright 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.camera.camera2.pipe.internal
18 
19 import androidx.annotation.GuardedBy
20 import androidx.camera.camera2.pipe.Frame
21 import androidx.camera.camera2.pipe.FrameCapture
22 import androidx.camera.camera2.pipe.OutputStatus
23 import androidx.camera.camera2.pipe.Request
24 import androidx.camera.camera2.pipe.config.CameraGraphScope
25 import androidx.camera.camera2.pipe.internal.OutputResult.Companion.completeWithFailure
26 import androidx.camera.camera2.pipe.internal.OutputResult.Companion.completeWithOutput
27 import androidx.camera.camera2.pipe.internal.OutputResult.Companion.outputOrNull
28 import androidx.camera.camera2.pipe.internal.OutputResult.Companion.outputStatus
29 import javax.inject.Inject
30 import kotlinx.atomicfu.atomic
31 import kotlinx.coroutines.CompletableDeferred
32 import kotlinx.coroutines.ExperimentalCoroutinesApi
33 
/**
 * FrameCaptureQueue manages the list of requests that are expected to produce a [Frame]. Each
 * enqueued [Request] is represented by a [FrameCaptureImpl] placeholder, which can later be taken
 * back out of the queue with [remove] and completed when the [Frame] that matches the [Request]
 * is started.
 *
 * All access to the underlying queue happens while holding an internal lock. Closing the queue
 * fails every placeholder that is still pending with [OutputStatus.ERROR_OUTPUT_ABORTED].
 */
@CameraGraphScope
internal class FrameCaptureQueue @Inject constructor() : AutoCloseable {
    private val lock = Any()

    @GuardedBy("lock") private val queue = ArrayDeque<FrameCaptureImpl>()

    @GuardedBy("lock") private var closed = false

    /**
     * Remove and return the first pending [FrameCaptureImpl] whose request equals [request].
     *
     * @return the matching placeholder, or `null` if there is no match or this queue is closed.
     */
    fun remove(request: Request): FrameCaptureImpl? =
        synchronized(lock) {
            if (closed) return null

            // Locate and remove the match in a single pass over the queue.
            val index = queue.indexOfFirst { it.request == request }
            if (index < 0) null else queue.removeAt(index)
        }

    /**
     * Tell the [FrameDistributor] that a specific request will be submitted to the camera and
     * create a placeholder that will be completed when that specific request starts exposing.
     *
     * If this queue has already been closed, the returned placeholder is closed immediately and
     * is not added to the queue.
     */
    fun enqueue(request: Request): FrameCaptureImpl =
        synchronized(lock) {
            FrameCaptureImpl(request).also {
                if (!closed) {
                    queue.add(it)
                } else {
                    it.close()
                }
            }
        }

    /**
     * Tell the [FrameDistributor] that a specific list of requests will be submitted to the camera
     * and to create placeholders.
     *
     * The returned list has one placeholder per entry in [requests], in the same order. If this
     * queue has already been closed, every returned placeholder is closed immediately and none of
     * them are added to the queue.
     */
    fun enqueue(requests: List<Request>): List<FrameCapture> =
        synchronized(lock) {
            requests
                .map { FrameCaptureImpl(it) }
                .also {
                    if (!closed) {
                        queue.addAll(it)
                    } else {
                        for (result in it) {
                            result.close()
                        }
                    }
                }
        }

    /**
     * Close this queue and fail every placeholder that is still pending with
     * [OutputStatus.ERROR_OUTPUT_ABORTED]. Calling this more than once has no further effect.
     */
    override fun close() {
        // Snapshot and clear the queue while holding the lock. The queue must not be iterated
        // outside of the lock: FrameCaptureImpl.close() removes itself from the queue without
        // checking whether the queue is closed, so a listener callback (or another thread)
        // closing a capture while we iterate would modify the queue underneath the iterator.
        val aborted: List<FrameCaptureImpl> =
            synchronized(lock) {
                if (closed) return
                closed = true
                queue.toList().also { queue.clear() }
            }

        // Listeners are notified outside of the lock, using the private snapshot.
        for (pendingCapture in aborted) {
            // Any pending frame in the queue is guaranteed to not hold a real result.
            pendingCapture.completeWithFailure(OutputStatus.ERROR_OUTPUT_ABORTED)
        }
    }

    /**
     * Placeholder for the [Frame] that [request] is expected to produce. It is completed at most
     * once, either with a [Frame] via [completeWith] or with a failure status via
     * [completeWithFailure]. Listeners added before completion are buffered and handed over when
     * the placeholder is completed.
     */
    @OptIn(ExperimentalCoroutinesApi::class)
    internal inner class FrameCaptureImpl(override val request: Request) : FrameCapture {
        private val closed = atomic(false)
        private val result = CompletableDeferred<OutputResult<Frame>>()

        // Listeners registered before completion. Set to null once this capture is completed,
        // which signals addListener to stop buffering.
        @GuardedBy("this")
        private var frameListeners: MutableList<Frame.Listener>? = mutableListOf()

        /**
         * Atomically detach the buffered listeners so that they are delivered exactly once.
         *
         * @return the listeners registered so far, or `null` if they were already detached.
         */
        private fun detachListeners(): List<Frame.Listener>? =
            synchronized(this) { frameListeners.also { frameListeners = null } }

        /**
         * Complete this [FrameCapture] with the provided [Frame].
         *
         * If this capture was already completed, [frame] is closed instead. Otherwise all
         * buffered listeners are attached to [frame].
         */
        fun completeWith(frame: Frame) {
            if (!result.completeWithOutput(frame)) {
                // This capture already has a result, so the frame was not accepted and must be
                // closed here.
                frame.close()
            } else {
                val listeners = detachListeners()
                if (listeners != null) {
                    for (i in listeners.indices) {
                        frame.addListener(listeners[i])
                    }
                }
            }
        }

        /**
         * Cancel this [FrameCapture] with a specific [OutputStatus].
         *
         * This has no effect if this capture was already completed.
         */
        fun completeWithFailure(failureStatus: OutputStatus) {
            if (result.completeWithFailure(failureStatus)) {
                // Ensure listeners always receive the onFrameComplete event, since they will not
                // be attached to a real frame.
                val listeners = detachListeners()
                if (listeners != null) {
                    for (i in listeners.indices) {
                        listeners[i].onFrameComplete()
                    }
                }
            }
        }

        /**
         * Return the result of `tryAcquire()` on the completed [Frame], or `null` if this capture
         * is closed or no frame output is currently available.
         */
        override fun getFrame(): Frame? {
            if (closed.value) return null
            return result.outputOrNull()?.tryAcquire()
        }

        /** [OutputStatus.UNAVAILABLE] once closed, otherwise the status of the current result. */
        override val status: OutputStatus
            get() {
                if (closed.value) return OutputStatus.UNAVAILABLE
                return result.outputStatus()
            }

        /**
         * Suspend until this capture is completed, then return the result of `tryAcquire()` on
         * the output [Frame]. Returns `null` immediately if this capture is already closed, and
         * `null` if it completes without an output frame.
         */
        override suspend fun awaitFrame(): Frame? {
            if (closed.value) return null
            return result.await().output?.tryAcquire()
        }

        /**
         * Register [listener]. Before completion the listener is buffered; after completion it is
         * forwarded to the output [Frame] if there is one, otherwise it is immediately notified
         * via `onFrameComplete()`.
         */
        override fun addListener(listener: Frame.Listener) {
            val success = synchronized(this) { frameListeners?.add(listener) == true }
            // If the list of listeners is null, then we've already completed this deferred output
            // frame.
            if (!success) {
                val frame = result.outputOrNull()
                if (frame != null) {
                    frame.addListener(listener)
                } else {
                    listener.onFrameComplete()
                }
            }
        }

        /**
         * Close this capture. Only the first call has any effect: a still-pending capture is
         * completed with [OutputStatus.UNAVAILABLE], an already delivered [Frame] is closed, and
         * the capture is removed from the enclosing queue.
         */
        override fun close() {
            if (closed.compareAndSet(expect = false, update = true)) {
                // No-op if a result was already delivered; in that case the frame is closed below.
                completeWithFailure(OutputStatus.UNAVAILABLE)
                result.outputOrNull()?.close()

                // Drop this capture from the pending queue if it is still there. If it was
                // already removed, or the queue was closed and cleared, this is a no-op.
                synchronized(lock) { queue.remove(this) }
            }
        }
    }
}
194