1 // Copyright 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "PixelStreamManager.h"
15
16 #include <android-base/logging.h>
17 #include <vndk/hardware_buffer.h>
18
19 #include <algorithm>
20 #include <mutex>
21 #include <thread>
22
23 #include "PixelFormatUtils.h"
24
25 namespace android {
26 namespace automotive {
27 namespace computepipe {
28 namespace runner {
29 namespace stream_manager {
30
PixelMemHandle(int bufferId,int streamId,int additionalUsageFlags)31 PixelMemHandle::PixelMemHandle(int bufferId, int streamId, int additionalUsageFlags)
32 : mBufferId(bufferId),
33 mStreamId(streamId),
34 mBuffer(nullptr),
35 mUsage(AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN | additionalUsageFlags) {
36 }
37
~PixelMemHandle()38 PixelMemHandle::~PixelMemHandle() {
39 if (mBuffer) {
40 AHardwareBuffer_release(mBuffer);
41 }
42 }
43
getStreamId() const44 int PixelMemHandle::getStreamId() const {
45 return mStreamId;
46 }
47
getType() const48 proto::PacketType PixelMemHandle::getType() const {
49 return proto::PacketType::PIXEL_DATA;
50 }
getTimeStamp() const51 uint64_t PixelMemHandle::getTimeStamp() const {
52 return mTimestamp;
53 }
54
getSize() const55 uint32_t PixelMemHandle::getSize() const {
56 return 0;
57 }
58
getData() const59 const char* PixelMemHandle::getData() const {
60 return nullptr;
61 }
62
getHardwareBuffer() const63 AHardwareBuffer* PixelMemHandle::getHardwareBuffer() const {
64 return mBuffer;
65 }
66
67 /* Sets frame info */
setFrameData(uint64_t timestamp,const InputFrame & inputFrame)68 Status PixelMemHandle::setFrameData(uint64_t timestamp, const InputFrame& inputFrame) {
69 // Allocate a new buffer if it is currently null.
70 FrameInfo frameInfo = inputFrame.getFrameInfo();
71 if (mBuffer == nullptr) {
72 mDesc.format = PixelFormatToHardwareBufferFormat(frameInfo.format);
73 mDesc.height = frameInfo.height;
74 mDesc.width = frameInfo.width;
75 mDesc.layers = 1;
76 mDesc.rfu0 = 0;
77 mDesc.rfu1 = 0;
78 mDesc.stride = frameInfo.stride;
79 mDesc.usage = mUsage;
80 int err = AHardwareBuffer_allocate(&mDesc, &mBuffer);
81
82 if (err != 0 || mBuffer == nullptr) {
83 LOG(ERROR) << "Failed to allocate hardware buffer with error " << err;
84 return Status::NO_MEMORY;
85 }
86
87 // Update mDesc with the actual descriptor with which the buffer was created. The actual
88 // stride could be different from the specified stride.
89 AHardwareBuffer_describe(mBuffer, &mDesc);
90 }
91
92 // Verifies that the input frame data has the same type as the allocated buffer.
93 if (frameInfo.width != mDesc.width || frameInfo.height != mDesc.height ||
94 PixelFormatToHardwareBufferFormat(frameInfo.format) != mDesc.format) {
95 LOG(ERROR) << "Variable image sizes from the same stream id is not supported.";
96 return Status::INVALID_ARGUMENT;
97 }
98
99 // Locks the frame for copying the input frame data.
100 void* mappedBuffer = nullptr;
101 int err = AHardwareBuffer_lock(mBuffer, AHARDWAREBUFFER_USAGE_CPU_WRITE_OFTEN, -1, nullptr,
102 &mappedBuffer);
103 if (err != 0 || mappedBuffer == nullptr) {
104 LOG(ERROR) << "Unable to lock a realased hardware buffer.";
105 return Status::INTERNAL_ERROR;
106 }
107
108 // Copies the input frame data.
109 int bytesPerPixel = numBytesPerPixel(static_cast<AHardwareBuffer_Format>(mDesc.format));
110 // The stride for hardware buffer is specified in pixels while the stride
111 // for InputFrame data structure is specified in bytes.
112 if (mDesc.stride * bytesPerPixel == frameInfo.stride) {
113 memcpy(mappedBuffer, inputFrame.getFramePtr(), mDesc.stride * mDesc.height * bytesPerPixel);
114 } else {
115 for (int y = 0; y < frameInfo.height; y++) {
116 memcpy((uint8_t*)mappedBuffer + mDesc.stride * y * bytesPerPixel,
117 inputFrame.getFramePtr() + y * frameInfo.stride,
118 std::min(frameInfo.stride, mDesc.stride * bytesPerPixel));
119 }
120 }
121
122 AHardwareBuffer_unlock(mBuffer, nullptr);
123 mTimestamp = timestamp;
124
125 return Status::SUCCESS;
126 }
127
getBufferId() const128 int PixelMemHandle::getBufferId() const {
129 return mBufferId;
130 }
131
setEngineInterface(std::shared_ptr<StreamEngineInterface> engine)132 void PixelStreamManager::setEngineInterface(std::shared_ptr<StreamEngineInterface> engine) {
133 std::lock_guard lock(mLock);
134 mEngine = engine;
135 }
136
setMaxInFlightPackets(uint32_t maxPackets)137 Status PixelStreamManager::setMaxInFlightPackets(uint32_t maxPackets) {
138 std::lock_guard lock(mLock);
139 if (mBuffersInUse.size() > maxPackets) {
140 LOG(ERROR) << "Cannot set max in flight packets after graph has already started.";
141 return Status::ILLEGAL_STATE;
142 }
143
144 mMaxInFlightPackets = maxPackets;
145 std::lock_guard stateLock(mStateLock);
146 mState = CONFIG_DONE;
147 return Status::SUCCESS;
148 }
149
freePacket(int bufferId)150 Status PixelStreamManager::freePacket(int bufferId) {
151 std::lock_guard lock(mLock);
152
153 auto it = mBuffersInUse.find(bufferId);
154
155 if (it == mBuffersInUse.end()) {
156 std::lock_guard stateLock(mStateLock);
157 // If the graph has already been stopped, we free the buffers
158 // asynchronously, so return SUCCESS if freePacket is called later.
159 if (mState == STOPPED) {
160 return Status::SUCCESS;
161 }
162
163 LOG(ERROR) << "Unable to find the mem handle. Duplicate release may possible have been "
164 "called";
165 return Status::INVALID_ARGUMENT;
166 }
167
168 it->second.outstandingRefCount -= 1;
169 if (it->second.outstandingRefCount == 0) {
170 mBuffersReady.push_back(it->second.handle);
171 mBuffersInUse.erase(it);
172 }
173 return Status::SUCCESS;
174 }
175
freeAllPackets()176 void PixelStreamManager::freeAllPackets() {
177 std::lock_guard lock(mLock);
178
179 for (auto [bufferId, buffer] : mBuffersInUse) {
180 mBuffersReady.push_back(buffer.handle);
181 }
182 mBuffersInUse.clear();
183 }
184
queuePacket(const char *,const uint32_t,uint64_t)185 Status PixelStreamManager::queuePacket(const char* /*data*/, const uint32_t /*size*/,
186 uint64_t /*timestamp*/) {
187 LOG(ERROR) << "Trying to queue a semantic packet to a pixel stream manager";
188 return Status::ILLEGAL_STATE;
189 }
190
queuePacket(const InputFrame & frame,uint64_t timestamp)191 Status PixelStreamManager::queuePacket(const InputFrame& frame, uint64_t timestamp) {
192 std::lock_guard lock(mLock);
193
194 // State has to be running for the callback to go back.
195 {
196 std::lock_guard stateLock(mStateLock);
197 if (mState != RUNNING) {
198 LOG(ERROR) << "Packet cannot be queued when state is not RUNNING. Current state is"
199 << mState;
200 return Status::ILLEGAL_STATE;
201 }
202 }
203
204 if (mEngine == nullptr) {
205 LOG(ERROR) << "Stream to engine interface is not set";
206 return Status::ILLEGAL_STATE;
207 }
208
209 if (mBuffersInUse.size() >= mMaxInFlightPackets) {
210 LOG(INFO) << "Too many frames in flight. Skipping frame at timestamp " << timestamp;
211 return Status::SUCCESS;
212 }
213
214 // A unique id per buffer is maintained by incrementing the unique id from the previously
215 // created buffer. The unique id is therefore the number of buffers already created.
216 if (mBuffersReady.empty()) {
217 mBuffersReady.push_back(std::make_shared<PixelMemHandle>(mBuffersInUse.size(), mStreamId));
218 }
219
220 // The previously used buffer is pushed to the back of the vector. Picking the last used buffer
221 // may be more cache efficient if accessing through CPU, so we use that strategy here.
222 std::shared_ptr<PixelMemHandle> memHandle = mBuffersReady[mBuffersReady.size() - 1];
223 mBuffersReady.resize(mBuffersReady.size() - 1);
224
225 BufferMetadata bufferMetadata;
226 bufferMetadata.outstandingRefCount = 1;
227 bufferMetadata.handle = memHandle;
228
229 mBuffersInUse.emplace(memHandle->getBufferId(), bufferMetadata);
230
231 Status status = memHandle->setFrameData(timestamp, frame);
232 if (status != Status::SUCCESS) {
233 LOG(ERROR) << "Setting frame data failed with error code " << status;
234 return status;
235 }
236
237 // Dispatch packet to the engine asynchronously in order to avoid circularly
238 // waiting for each others' locks.
239 std::thread t([this, memHandle]() {
240 Status status = mEngine->dispatchPacket(memHandle);
241 if (status != Status::SUCCESS) {
242 mEngine->notifyError(std::string(__func__) + ":" + std::to_string(__LINE__) +
243 " Failed to dispatch packet");
244 }
245 });
246 t.detach();
247 return Status::SUCCESS;
248 }
249
handleExecutionPhase(const RunnerEvent & e)250 Status PixelStreamManager::handleExecutionPhase(const RunnerEvent& e) {
251 std::lock_guard<std::mutex> lock(mStateLock);
252 if (mState == CONFIG_DONE && e.isPhaseEntry()) {
253 mState = RUNNING;
254 return Status::SUCCESS;
255 }
256 if (mState == RESET) {
257 // Cannot get to running phase from reset state without config phase
258 return Status::ILLEGAL_STATE;
259 }
260 if (mState == RUNNING && e.isAborted()) {
261 // Transition back to config completed
262 mState = CONFIG_DONE;
263 return Status::SUCCESS;
264 }
265 if (mState == RUNNING) {
266 return Status::ILLEGAL_STATE;
267 }
268 return Status::SUCCESS;
269 }
270
handleStopWithFlushPhase(const RunnerEvent & e)271 Status PixelStreamManager::handleStopWithFlushPhase(const RunnerEvent& e) {
272 return handleStopImmediatePhase(e);
273 }
274
handleStopImmediatePhase(const RunnerEvent & e)275 Status PixelStreamManager::handleStopImmediatePhase(const RunnerEvent& e) {
276 std::lock_guard<std::mutex> lock(mStateLock);
277 if (mState == CONFIG_DONE || mState == RESET) {
278 return ILLEGAL_STATE;
279 }
280 /* Cannot have stop completed if we never entered stop state */
281 if (mState == RUNNING && (e.isAborted() || e.isTransitionComplete())) {
282 return ILLEGAL_STATE;
283 }
284 /* We are being asked to stop */
285 if (mState == RUNNING && e.isPhaseEntry()) {
286 mState = STOPPED;
287 std::thread t([this]() {
288 freeAllPackets();
289 mEngine->notifyEndOfStream();
290 });
291 t.detach();
292 return SUCCESS;
293 }
294 /* Other Components have stopped, we can transition back to CONFIG_DONE */
295 if (mState == STOPPED && e.isTransitionComplete()) {
296 mState = CONFIG_DONE;
297 return SUCCESS;
298 }
299 /* We were stopped, but stop was aborted. */
300 if (mState == STOPPED && e.isAborted()) {
301 mState = RUNNING;
302 return SUCCESS;
303 }
304 return SUCCESS;
305 }
306
clonePacket(std::shared_ptr<MemHandle> handle)307 std::shared_ptr<MemHandle> PixelStreamManager::clonePacket(std::shared_ptr<MemHandle> handle) {
308 if (!handle) {
309 LOG(ERROR) << "PixelStreamManager - Received null memhandle.";
310 return nullptr;
311 }
312
313 std::lock_guard<std::mutex> lock(mLock);
314 int bufferId = handle->getBufferId();
315 auto it = mBuffersInUse.find(bufferId);
316 if (it == mBuffersInUse.end()) {
317 LOG(ERROR) << "PixelStreamManager - Attempting to clone an already freed packet.";
318 return nullptr;
319 }
320 it->second.outstandingRefCount += 1;
321 return handle;
322 }
323
PixelStreamManager(std::string name,int streamId)324 PixelStreamManager::PixelStreamManager(std::string name, int streamId)
325 : StreamManager(name, proto::PacketType::PIXEL_DATA), mStreamId(streamId) {
326 }
327
328 } // namespace stream_manager
329 } // namespace runner
330 } // namespace computepipe
331 } // namespace automotive
332 } // namespace android
333