• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include <inttypes.h>
18 
19 #define LOG_TAG "StreamSplitter"
20 #define ATRACE_TAG ATRACE_TAG_GRAPHICS
21 //#define LOG_NDEBUG 0
22 
23 #include <gui/IGraphicBufferConsumer.h>
24 #include <gui/IGraphicBufferProducer.h>
25 #include <gui/StreamSplitter.h>
26 
27 #include <ui/GraphicBuffer.h>
28 
29 #include <binder/ProcessState.h>
30 
31 #include <utils/Trace.h>
32 
33 namespace android {
34 
createSplitter(const sp<IGraphicBufferConsumer> & inputQueue,sp<StreamSplitter> * outSplitter)35 status_t StreamSplitter::createSplitter(
36         const sp<IGraphicBufferConsumer>& inputQueue,
37         sp<StreamSplitter>* outSplitter) {
38     if (inputQueue == NULL) {
39         ALOGE("createSplitter: inputQueue must not be NULL");
40         return BAD_VALUE;
41     }
42     if (outSplitter == NULL) {
43         ALOGE("createSplitter: outSplitter must not be NULL");
44         return BAD_VALUE;
45     }
46 
47     sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
48     status_t status = splitter->mInput->consumerConnect(splitter, false);
49     if (status == NO_ERROR) {
50         splitter->mInput->setConsumerName(String8("StreamSplitter"));
51         *outSplitter = splitter;
52     }
53     return status;
54 }
55 
StreamSplitter(const sp<IGraphicBufferConsumer> & inputQueue)56 StreamSplitter::StreamSplitter(const sp<IGraphicBufferConsumer>& inputQueue)
57       : mIsAbandoned(false), mMutex(), mReleaseCondition(),
58         mOutstandingBuffers(0), mInput(inputQueue), mOutputs(), mBuffers() {}
59 
~StreamSplitter()60 StreamSplitter::~StreamSplitter() {
61     mInput->consumerDisconnect();
62     Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
63     for (; output != mOutputs.end(); ++output) {
64         (*output)->disconnect(NATIVE_WINDOW_API_CPU);
65     }
66 
67     if (mBuffers.size() > 0) {
68         ALOGE("%zu buffers still being tracked", mBuffers.size());
69     }
70 }
71 
addOutput(const sp<IGraphicBufferProducer> & outputQueue)72 status_t StreamSplitter::addOutput(
73         const sp<IGraphicBufferProducer>& outputQueue) {
74     if (outputQueue == NULL) {
75         ALOGE("addOutput: outputQueue must not be NULL");
76         return BAD_VALUE;
77     }
78 
79     Mutex::Autolock lock(mMutex);
80 
81     IGraphicBufferProducer::QueueBufferOutput queueBufferOutput;
82     sp<OutputListener> listener(new OutputListener(this, outputQueue));
83     outputQueue->asBinder()->linkToDeath(listener);
84     status_t status = outputQueue->connect(listener, NATIVE_WINDOW_API_CPU,
85             /* producerControlledByApp */ false, &queueBufferOutput);
86     if (status != NO_ERROR) {
87         ALOGE("addOutput: failed to connect (%d)", status);
88         return status;
89     }
90 
91     mOutputs.push_back(outputQueue);
92 
93     return NO_ERROR;
94 }
95 
setName(const String8 & name)96 void StreamSplitter::setName(const String8 &name) {
97     Mutex::Autolock lock(mMutex);
98     mInput->setConsumerName(name);
99 }
100 
onFrameAvailable(const BufferItem &)101 void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
102     ATRACE_CALL();
103     Mutex::Autolock lock(mMutex);
104 
105     // The current policy is that if any one consumer is consuming buffers too
106     // slowly, the splitter will stall the rest of the outputs by not acquiring
107     // any more buffers from the input. This will cause back pressure on the
108     // input queue, slowing down its producer.
109 
110     // If there are too many outstanding buffers, we block until a buffer is
111     // released back to the input in onBufferReleased
112     while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
113         mReleaseCondition.wait(mMutex);
114 
115         // If the splitter is abandoned while we are waiting, the release
116         // condition variable will be broadcast, and we should just return
117         // without attempting to do anything more (since the input queue will
118         // also be abandoned).
119         if (mIsAbandoned) {
120             return;
121         }
122     }
123     ++mOutstandingBuffers;
124 
125     // Acquire and detach the buffer from the input
126     IGraphicBufferConsumer::BufferItem bufferItem;
127     status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
128     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
129             "acquiring buffer from input failed (%d)", status);
130 
131     ALOGV("acquired buffer %#" PRIx64 " from input",
132             bufferItem.mGraphicBuffer->getId());
133 
134     status = mInput->detachBuffer(bufferItem.mBuf);
135     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
136             "detaching buffer from input failed (%d)", status);
137 
138     // Initialize our reference count for this buffer
139     mBuffers.add(bufferItem.mGraphicBuffer->getId(),
140             new BufferTracker(bufferItem.mGraphicBuffer));
141 
142     IGraphicBufferProducer::QueueBufferInput queueInput(
143             bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
144             bufferItem.mCrop, bufferItem.mScalingMode,
145             bufferItem.mTransform, bufferItem.mIsDroppable,
146             bufferItem.mFence);
147 
148     // Attach and queue the buffer to each of the outputs
149     Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
150     for (; output != mOutputs.end(); ++output) {
151         int slot;
152         status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);
153         if (status == NO_INIT) {
154             // If we just discovered that this output has been abandoned, note
155             // that, increment the release count so that we still release this
156             // buffer eventually, and move on to the next output
157             onAbandonedLocked();
158             mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
159                     incrementReleaseCountLocked();
160             continue;
161         } else {
162             LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
163                     "attaching buffer to output failed (%d)", status);
164         }
165 
166         IGraphicBufferProducer::QueueBufferOutput queueOutput;
167         status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
168         if (status == NO_INIT) {
169             // If we just discovered that this output has been abandoned, note
170             // that, increment the release count so that we still release this
171             // buffer eventually, and move on to the next output
172             onAbandonedLocked();
173             mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
174                     incrementReleaseCountLocked();
175             continue;
176         } else {
177             LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
178                     "queueing buffer to output failed (%d)", status);
179         }
180 
181         ALOGV("queued buffer %#" PRIx64 " to output %p",
182                 bufferItem.mGraphicBuffer->getId(), output->get());
183     }
184 }
185 
onBufferReleasedByOutput(const sp<IGraphicBufferProducer> & from)186 void StreamSplitter::onBufferReleasedByOutput(
187         const sp<IGraphicBufferProducer>& from) {
188     ATRACE_CALL();
189     Mutex::Autolock lock(mMutex);
190 
191     sp<GraphicBuffer> buffer;
192     sp<Fence> fence;
193     status_t status = from->detachNextBuffer(&buffer, &fence);
194     if (status == NO_INIT) {
195         // If we just discovered that this output has been abandoned, note that,
196         // but we can't do anything else, since buffer is invalid
197         onAbandonedLocked();
198         return;
199     } else {
200         LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
201                 "detaching buffer from output failed (%d)", status);
202     }
203 
204     ALOGV("detached buffer %#" PRIx64 " from output %p",
205           buffer->getId(), from.get());
206 
207     const sp<BufferTracker>& tracker = mBuffers.editValueFor(buffer->getId());
208 
209     // Merge the release fence of the incoming buffer so that the fence we send
210     // back to the input includes all of the outputs' fences
211     tracker->mergeFence(fence);
212 
213     // Check to see if this is the last outstanding reference to this buffer
214     size_t releaseCount = tracker->incrementReleaseCountLocked();
215     ALOGV("buffer %#" PRIx64 " reference count %zu (of %zu)", buffer->getId(),
216             releaseCount, mOutputs.size());
217     if (releaseCount < mOutputs.size()) {
218         return;
219     }
220 
221     // If we've been abandoned, we can't return the buffer to the input, so just
222     // stop tracking it and move on
223     if (mIsAbandoned) {
224         mBuffers.removeItem(buffer->getId());
225         return;
226     }
227 
228     // Attach and release the buffer back to the input
229     int consumerSlot;
230     status = mInput->attachBuffer(&consumerSlot, tracker->getBuffer());
231     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
232             "attaching buffer to input failed (%d)", status);
233 
234     status = mInput->releaseBuffer(consumerSlot, /* frameNumber */ 0,
235             EGL_NO_DISPLAY, EGL_NO_SYNC_KHR, tracker->getMergedFence());
236     LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
237             "releasing buffer to input failed (%d)", status);
238 
239     ALOGV("released buffer %#" PRIx64 " to input", buffer->getId());
240 
241     // We no longer need to track the buffer once it has been returned to the
242     // input
243     mBuffers.removeItem(buffer->getId());
244 
245     // Notify any waiting onFrameAvailable calls
246     --mOutstandingBuffers;
247     mReleaseCondition.signal();
248 }
249 
onAbandonedLocked()250 void StreamSplitter::onAbandonedLocked() {
251     ALOGE("one of my outputs has abandoned me");
252     if (!mIsAbandoned) {
253         mInput->consumerDisconnect();
254     }
255     mIsAbandoned = true;
256     mReleaseCondition.broadcast();
257 }
258 
OutputListener(const sp<StreamSplitter> & splitter,const sp<IGraphicBufferProducer> & output)259 StreamSplitter::OutputListener::OutputListener(
260         const sp<StreamSplitter>& splitter,
261         const sp<IGraphicBufferProducer>& output)
262       : mSplitter(splitter), mOutput(output) {}
263 
~OutputListener()264 StreamSplitter::OutputListener::~OutputListener() {}
265 
onBufferReleased()266 void StreamSplitter::OutputListener::onBufferReleased() {
267     mSplitter->onBufferReleasedByOutput(mOutput);
268 }
269 
binderDied(const wp<IBinder> &)270 void StreamSplitter::OutputListener::binderDied(const wp<IBinder>& /* who */) {
271     Mutex::Autolock lock(mSplitter->mMutex);
272     mSplitter->onAbandonedLocked();
273 }
274 
BufferTracker(const sp<GraphicBuffer> & buffer)275 StreamSplitter::BufferTracker::BufferTracker(const sp<GraphicBuffer>& buffer)
276       : mBuffer(buffer), mMergedFence(Fence::NO_FENCE), mReleaseCount(0) {}
277 
~BufferTracker()278 StreamSplitter::BufferTracker::~BufferTracker() {}
279 
mergeFence(const sp<Fence> & with)280 void StreamSplitter::BufferTracker::mergeFence(const sp<Fence>& with) {
281     mMergedFence = Fence::merge(String8("StreamSplitter"), mMergedFence, with);
282 }
283 
284 } // namespace android
285