1 //
2 // Copyright © 2019 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "BufferManager.hpp"
7 #include "PacketBuffer.hpp"
8
9 namespace armnn
10 {
11
12 namespace profiling
13 {
14
BufferManager(unsigned int numberOfBuffers,unsigned int maxPacketSize)15 BufferManager::BufferManager(unsigned int numberOfBuffers, unsigned int maxPacketSize)
16 : m_MaxBufferSize(maxPacketSize),
17 m_NumberOfBuffers(numberOfBuffers),
18 m_MaxNumberOfBuffers(numberOfBuffers * 3),
19 m_CurrentNumberOfBuffers(numberOfBuffers)
20 {
21 Initialize();
22 }
23
Reserve(unsigned int requestedSize,unsigned int & reservedSize)24 IPacketBufferPtr BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize)
25 {
26 reservedSize = 0;
27 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
28 if (requestedSize > m_MaxBufferSize)
29 {
30 return nullptr;
31 }
32 availableListLock.lock();
33 if (m_AvailableList.empty())
34 {
35 if (m_CurrentNumberOfBuffers < m_MaxNumberOfBuffers)
36 {
37 // create a temporary overflow/surge buffer and hand it back
38 m_CurrentNumberOfBuffers++;
39 availableListLock.unlock();
40 IPacketBufferPtr buffer = std::make_unique<PacketBuffer>(m_MaxBufferSize);
41 reservedSize = requestedSize;
42 return buffer;
43 }
44 else
45 {
46 // we have totally busted the limit. call a halt to new memory allocations.
47 availableListLock.unlock();
48 return nullptr;
49 }
50 }
51 IPacketBufferPtr buffer = std::move(m_AvailableList.back());
52 m_AvailableList.pop_back();
53 availableListLock.unlock();
54 reservedSize = requestedSize;
55 return buffer;
56 }
57
Commit(IPacketBufferPtr & packetBuffer,unsigned int size,bool notifyConsumer)58 void BufferManager::Commit(IPacketBufferPtr& packetBuffer, unsigned int size, bool notifyConsumer)
59 {
60 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex, std::defer_lock);
61 packetBuffer->Commit(size);
62 readableListLock.lock();
63 m_ReadableList.push(std::move(packetBuffer));
64 readableListLock.unlock();
65
66 if (notifyConsumer)
67 {
68 FlushReadList();
69 }
70 }
71
Initialize()72 void BufferManager::Initialize()
73 {
74 m_AvailableList.reserve(m_NumberOfBuffers);
75 m_CurrentNumberOfBuffers = m_NumberOfBuffers;
76 for (unsigned int i = 0; i < m_NumberOfBuffers; ++i)
77 {
78 IPacketBufferPtr buffer = std::make_unique<PacketBuffer>(m_MaxBufferSize);
79 m_AvailableList.emplace_back(std::move(buffer));
80 }
81 }
82
Release(IPacketBufferPtr & packetBuffer)83 void BufferManager::Release(IPacketBufferPtr& packetBuffer)
84 {
85 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
86 packetBuffer->Release();
87 availableListLock.lock();
88 if (m_AvailableList.size() <= m_NumberOfBuffers)
89 {
90 m_AvailableList.push_back(std::move(packetBuffer));
91 }
92 else
93 {
94 // we have been handed a temporary overflow/surge buffer get rid of it
95 packetBuffer->Destroy();
96 if (m_CurrentNumberOfBuffers > m_NumberOfBuffers)
97 {
98 --m_CurrentNumberOfBuffers;
99 }
100 }
101 availableListLock.unlock();
102 }
103
Reset()104 void BufferManager::Reset()
105 {
106 //This method should only be called once all threads have been joined
107 std::lock_guard<std::mutex> readableListLock(m_ReadableMutex);
108 std::lock_guard<std::mutex> availableListLock(m_AvailableMutex);
109
110 m_AvailableList.clear();
111 std::queue<IPacketBufferPtr>().swap(m_ReadableList);
112
113 Initialize();
114 }
115
GetReadableBuffer()116 IPacketBufferPtr BufferManager::GetReadableBuffer()
117 {
118 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
119 if (!m_ReadableList.empty())
120 {
121 IPacketBufferPtr buffer = std::move(m_ReadableList.front());
122 m_ReadableList.pop();
123 readableListLock.unlock();
124 return buffer;
125 }
126 return nullptr;
127 }
128
MarkRead(IPacketBufferPtr & packetBuffer)129 void BufferManager::MarkRead(IPacketBufferPtr& packetBuffer)
130 {
131 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
132 packetBuffer->MarkRead();
133 availableListLock.lock();
134 if (m_AvailableList.size() <= m_NumberOfBuffers)
135 {
136 m_AvailableList.push_back(std::move(packetBuffer));
137 }
138 else
139 {
140 // we have been handed a temporary overflow/surge buffer get rid of it
141 packetBuffer->Destroy();
142 if (m_CurrentNumberOfBuffers > m_NumberOfBuffers)
143 {
144 --m_CurrentNumberOfBuffers;
145 }
146 }
147 availableListLock.unlock();
148 }
149
SetConsumer(IConsumer * consumer)150 void BufferManager::SetConsumer(IConsumer* consumer)
151 {
152 m_Consumer = consumer;
153 }
154
FlushReadList()155 void BufferManager::FlushReadList()
156 {
157 // notify consumer that packet is ready to read
158 if (m_Consumer != nullptr)
159 {
160 m_Consumer->SetReadyToRead();
161 }
162 }
163
164 } // namespace profiling
165
166 } // namespace armnn
167