1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "FileOnlyProfilingConnection.hpp"
7 #include "PacketVersionResolver.hpp"
8
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 #include <common/include/ProfilingException.hpp>
12
13 #include <algorithm>
14 #include <iostream>
15 #include <thread>
16
17 namespace armnn
18 {
19
20 namespace profiling
21 {
22
GetHeadersAccepted()23 std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
24 {
25 std::vector<uint32_t> headers;
26 headers.push_back(m_MetaDataPacketHeader);
27 return headers;
28 }
29
HandlePacket(const arm::pipe::Packet & packet)30 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
31 {
32 if (packet.GetHeader() != m_MetaDataPacketHeader)
33 {
34 throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
35 }
36 // determine the endianness of the protocol
37 TargetEndianness endianness;
38 if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
39 {
40 endianness = TargetEndianness::BeWire;
41 }
42 else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
43 {
44 endianness = TargetEndianness::LeWire;
45 }
46 else
47 {
48 throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
49 }
50 m_FileOnlyProfilingConnection->SetEndianess(endianness);
51 // send back the acknowledgement
52 std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
53 arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
54 m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
55 }
56
ToUint32(const unsigned char * data,TargetEndianness endianness)57 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
58 {
59 // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
60 // specified endianness.
61 if (endianness == TargetEndianness::BeWire)
62 {
63 return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
64 static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
65 }
66 else
67 {
68 return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
69 static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
70 }
71 }
72
~FileOnlyProfilingConnection()73 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
74 {
75 try
76 {
77 Close();
78 }
79 catch (...)
80 {
81 // do nothing
82 }
83 }
84
IsOpen() const85 bool FileOnlyProfilingConnection::IsOpen() const
86 {
87 // This type of connection is always open.
88 return true;
89 }
90
Close()91 void FileOnlyProfilingConnection::Close()
92 {
93 // Dump any unread packets out of the queue.
94 size_t initialSize = m_PacketQueue.size();
95 for (size_t i = 0; i < initialSize; ++i)
96 {
97 m_PacketQueue.pop();
98 }
99 // dispose of the processing thread
100 m_KeepRunning.store(false);
101 if (m_LocalHandlersThread.joinable())
102 {
103 // make sure the thread wakes up and sees it has to stop
104 m_ConditionPacketReadable.notify_one();
105 m_LocalHandlersThread.join();
106 }
107 }
108
WritePacket(const unsigned char * buffer,uint32_t length)109 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
110 {
111 ARMNN_ASSERT(buffer);
112 arm::pipe::Packet packet = ReceivePacket(buffer, length);
113 ForwardPacketToHandlers(packet);
114 return true;
115 }
116
ReturnPacket(arm::pipe::Packet & packet)117 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
118 {
119 {
120 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
121 m_PacketQueue.push(std::move(packet));
122 }
123 m_ConditionPacketAvailable.notify_one();
124 }
125
ReadPacket(uint32_t timeout)126 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
127 {
128 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
129
130 // Here we are using m_PacketQueue.empty() as a predicate variable
131 // The conditional variable will wait until packetQueue is not empty or until a timeout
132 if (!m_ConditionPacketAvailable.wait_for(lck,
133 std::chrono::milliseconds(timeout),
134 [&]{return !m_PacketQueue.empty();}))
135 {
136 arm::pipe::Packet empty;
137 return empty;
138 }
139
140 arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
141 m_PacketQueue.pop();
142 return returnedPacket;
143 }
144
Fail(const std::string & errorMessage)145 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
146 {
147 Close();
148 throw RuntimeException(errorMessage);
149 }
150
151 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
152 /// a processing thread that will ensure that processing of packets will happen on a separate
153 /// thread from the profiling services send thread and will therefore protect against the
154 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)155 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
156 {
157 m_PacketHandlers.push_back(std::move(localPacketHandler));
158 ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
159 localCopy->SetConnection(this);
160 if (localCopy->GetHeadersAccepted().empty())
161 {
162 //this is a universal handler
163 m_UniversalHandlers.push_back(localCopy);
164 }
165 else
166 {
167 for (uint32_t header : localCopy->GetHeadersAccepted())
168 {
169 auto iter = m_IndexedHandlers.find(header);
170 if (iter == m_IndexedHandlers.end())
171 {
172 std::vector<ILocalPacketHandlerSharedPtr> handlers;
173 handlers.push_back(localCopy);
174 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
175 }
176 else
177 {
178 iter->second.push_back(localCopy);
179 }
180 }
181 }
182 }
183
StartProcessingThread()184 void FileOnlyProfilingConnection::StartProcessingThread()
185 {
186 // check if the thread has already started
187 if (m_IsRunning.load())
188 {
189 return;
190 }
191 // make sure if there was one running before it is joined
192 if (m_LocalHandlersThread.joinable())
193 {
194 m_LocalHandlersThread.join();
195 }
196 m_IsRunning.store(true);
197 m_KeepRunning.store(true);
198 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
199 }
200
ForwardPacketToHandlers(arm::pipe::Packet & packet)201 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
202 {
203 if (m_PacketHandlers.empty())
204 {
205 return;
206 }
207 if (!m_KeepRunning.load())
208 {
209 return;
210 }
211 {
212 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
213 if (!m_KeepRunning.load())
214 {
215 return;
216 }
217 m_ReadableList.push(std::move(packet));
218 }
219 m_ConditionPacketReadable.notify_one();
220 }
221
ServiceLocalHandlers()222 void FileOnlyProfilingConnection::ServiceLocalHandlers()
223 {
224 do
225 {
226 arm::pipe::Packet returnedPacket;
227 bool readPacket = false;
228 { // only lock while we are taking the packet off the incoming list
229 std::unique_lock<std::mutex> lck(m_ReadableMutex);
230 if (m_Timeout < 0)
231 {
232 m_ConditionPacketReadable.wait(lck,
233 [&] { return !m_ReadableList.empty(); });
234 }
235 else
236 {
237 m_ConditionPacketReadable.wait_for(lck,
238 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
239 [&] { return !m_ReadableList.empty(); });
240 }
241 if (m_KeepRunning.load())
242 {
243 if (!m_ReadableList.empty())
244 {
245 returnedPacket = std::move(m_ReadableList.front());
246 m_ReadableList.pop();
247 readPacket = true;
248 }
249 }
250 else
251 {
252 ClearReadableList();
253 }
254 }
255 if (m_KeepRunning.load() && readPacket)
256 {
257 DispatchPacketToHandlers(returnedPacket);
258 }
259 } while (m_KeepRunning.load());
260 // make sure the readable list is cleared
261 ClearReadableList();
262 m_IsRunning.store(false);
263 }
264
ClearReadableList()265 void FileOnlyProfilingConnection::ClearReadableList()
266 {
267 // make sure the incoming packet queue gets emptied
268 size_t initialSize = m_ReadableList.size();
269 for (size_t i = 0; i < initialSize; ++i)
270 {
271 m_ReadableList.pop();
272 }
273 }
274
DispatchPacketToHandlers(const arm::pipe::Packet & packet)275 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
276 {
277 for (auto& delegate : m_UniversalHandlers)
278 {
279 delegate->HandlePacket(packet);
280 }
281 auto iter = m_IndexedHandlers.find(packet.GetHeader());
282 if (iter != m_IndexedHandlers.end())
283 {
284 for (auto& delegate : iter->second)
285 {
286 try
287 {
288 delegate->HandlePacket(packet);
289 }
290 catch (const arm::pipe::ProfilingException& ex)
291 {
292 Fail(ex.what());
293 }
294 catch (const std::exception& ex)
295 {
296 Fail(ex.what());
297 }
298 catch (...)
299 {
300 Fail("handler failed");
301 }
302 }
303 }
304 }
305
306 } // namespace profiling
307
308 } // namespace armnn
309