1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "FileOnlyProfilingConnection.hpp"
7
8 #include <common/include/Constants.hpp>
9 #include <common/include/ProfilingException.hpp>
10 #include <common/include/PacketVersionResolver.hpp>
11
12 #include <algorithm>
13 #include <iostream>
14
15 #if defined(ARMNN_DISABLE_THREADS)
16 #include <common/include/IgnoreUnused.hpp>
17 #endif
18
19 namespace arm
20 {
21
22 namespace pipe
23 {
24
GetHeadersAccepted()25 std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
26 {
27 std::vector<uint32_t> headers;
28 headers.push_back(m_MetaDataPacketHeader);
29 return headers;
30 }
31
HandlePacket(const arm::pipe::Packet & packet)32 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
33 {
34 if (packet.GetHeader() != m_MetaDataPacketHeader)
35 {
36 throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
37 }
38 // determine the endianness of the protocol
39 TargetEndianness endianness;
40 if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
41 {
42 endianness = TargetEndianness::BeWire;
43 }
44 else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
45 {
46 endianness = TargetEndianness::LeWire;
47 }
48 else
49 {
50 throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
51 }
52 m_FileOnlyProfilingConnection->SetEndianess(endianness);
53 // send back the acknowledgement
54 std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
55 arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
56 m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
57 }
58
ToUint32(const unsigned char * data,TargetEndianness endianness)59 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
60 {
61 // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
62 // specified endianness.
63 if (endianness == TargetEndianness::BeWire)
64 {
65 return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
66 static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
67 }
68 else
69 {
70 return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
71 static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
72 }
73 }
74
~FileOnlyProfilingConnection()75 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
76 {
77 try
78 {
79 Close();
80 }
81 catch (...)
82 {
83 // do nothing
84 }
85 }
86
IsOpen() const87 bool FileOnlyProfilingConnection::IsOpen() const
88 {
89 // This type of connection is always open.
90 return true;
91 }
92
Close()93 void FileOnlyProfilingConnection::Close()
94 {
95 // Dump any unread packets out of the queue.
96 size_t initialSize = m_PacketQueue.size();
97 for (size_t i = 0; i < initialSize; ++i)
98 {
99 m_PacketQueue.pop();
100 }
101 // dispose of the processing thread
102 m_KeepRunning.store(false);
103 #if !defined(ARMNN_DISABLE_THREADS)
104 if (m_LocalHandlersThread.joinable())
105 {
106 // make sure the thread wakes up and sees it has to stop
107 m_ConditionPacketReadable.notify_one();
108 m_LocalHandlersThread.join();
109 }
110 #endif
111 }
112
WritePacket(const unsigned char * buffer,uint32_t length)113 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
114 {
115 ARM_PIPE_ASSERT(buffer);
116 arm::pipe::Packet packet = ReceivePacket(buffer, length);
117 ForwardPacketToHandlers(packet);
118 return true;
119 }
120
ReturnPacket(arm::pipe::Packet & packet)121 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
122 {
123 {
124 #if !defined(ARMNN_DISABLE_THREADS)
125 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
126 #endif
127 m_PacketQueue.push(std::move(packet));
128 }
129 #if !defined(ARMNN_DISABLE_THREADS)
130 m_ConditionPacketAvailable.notify_one();
131 #endif
132 }
133
ReadPacket(uint32_t timeout)134 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
135 {
136 #if !defined(ARMNN_DISABLE_THREADS)
137 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
138
139 // Here we are using m_PacketQueue.empty() as a predicate variable
140 // The conditional variable will wait until packetQueue is not empty or until a timeout
141 if (!m_ConditionPacketAvailable.wait_for(lck,
142 std::chrono::milliseconds(timeout),
143 [&]{return !m_PacketQueue.empty();}))
144 {
145 arm::pipe::Packet empty;
146 return empty;
147 }
148 #else
149 IgnoreUnused(timeout);
150 #endif
151
152 arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
153 m_PacketQueue.pop();
154 return returnedPacket;
155 }
156
Fail(const std::string & errorMessage)157 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
158 {
159 Close();
160 throw arm::pipe::ProfilingException(errorMessage);
161 }
162
163 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
164 /// a processing thread that will ensure that processing of packets will happen on a separate
165 /// thread from the profiling services send thread and will therefore protect against the
166 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)167 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
168 {
169 m_PacketHandlers.push_back(std::move(localPacketHandler));
170 ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
171 localCopy->SetConnection(this);
172 if (localCopy->GetHeadersAccepted().empty())
173 {
174 //this is a universal handler
175 m_UniversalHandlers.push_back(localCopy);
176 }
177 else
178 {
179 for (uint32_t header : localCopy->GetHeadersAccepted())
180 {
181 auto iter = m_IndexedHandlers.find(header);
182 if (iter == m_IndexedHandlers.end())
183 {
184 std::vector<ILocalPacketHandlerSharedPtr> handlers;
185 handlers.push_back(localCopy);
186 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
187 }
188 else
189 {
190 iter->second.push_back(localCopy);
191 }
192 }
193 }
194 }
195
StartProcessingThread()196 void FileOnlyProfilingConnection::StartProcessingThread()
197 {
198 // check if the thread has already started
199 if (m_IsRunning.load())
200 {
201 return;
202 }
203 // make sure if there was one running before it is joined
204 #if !defined(ARMNN_DISABLE_THREADS)
205 if (m_LocalHandlersThread.joinable())
206 {
207 m_LocalHandlersThread.join();
208 }
209 #endif
210 m_IsRunning.store(true);
211 m_KeepRunning.store(true);
212 #if !defined(ARMNN_DISABLE_THREADS)
213 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
214 #endif
215 }
216
ForwardPacketToHandlers(arm::pipe::Packet & packet)217 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
218 {
219 if (m_PacketHandlers.empty())
220 {
221 return;
222 }
223 if (!m_KeepRunning.load())
224 {
225 return;
226 }
227 {
228 #if !defined(ARMNN_DISABLE_THREADS)
229 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
230 #endif
231 if (!m_KeepRunning.load())
232 {
233 return;
234 }
235 m_ReadableList.push(std::move(packet));
236 }
237 #if !defined(ARMNN_DISABLE_THREADS)
238 m_ConditionPacketReadable.notify_one();
239 #endif
240 }
241
ServiceLocalHandlers()242 void FileOnlyProfilingConnection::ServiceLocalHandlers()
243 {
244 do
245 {
246 arm::pipe::Packet returnedPacket;
247 bool readPacket = false;
248 { // only lock while we are taking the packet off the incoming list
249 #if !defined(ARMNN_DISABLE_THREADS)
250 std::unique_lock<std::mutex> lck(m_ReadableMutex);
251 #endif
252 if (m_Timeout < 0)
253 {
254 #if !defined(ARMNN_DISABLE_THREADS)
255 m_ConditionPacketReadable.wait(lck,
256 [&] { return !m_ReadableList.empty(); });
257 #endif
258 }
259 else
260 {
261 #if !defined(ARMNN_DISABLE_THREADS)
262 m_ConditionPacketReadable.wait_for(lck,
263 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
264 [&] { return !m_ReadableList.empty(); });
265 #endif
266 }
267 if (m_KeepRunning.load())
268 {
269 if (!m_ReadableList.empty())
270 {
271 returnedPacket = std::move(m_ReadableList.front());
272 m_ReadableList.pop();
273 readPacket = true;
274 }
275 }
276 else
277 {
278 ClearReadableList();
279 }
280 }
281 if (m_KeepRunning.load() && readPacket)
282 {
283 DispatchPacketToHandlers(returnedPacket);
284 }
285 } while (m_KeepRunning.load());
286 // make sure the readable list is cleared
287 ClearReadableList();
288 m_IsRunning.store(false);
289 }
290
ClearReadableList()291 void FileOnlyProfilingConnection::ClearReadableList()
292 {
293 // make sure the incoming packet queue gets emptied
294 size_t initialSize = m_ReadableList.size();
295 for (size_t i = 0; i < initialSize; ++i)
296 {
297 m_ReadableList.pop();
298 }
299 }
300
DispatchPacketToHandlers(const arm::pipe::Packet & packet)301 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
302 {
303 for (auto& delegate : m_UniversalHandlers)
304 {
305 delegate->HandlePacket(packet);
306 }
307 auto iter = m_IndexedHandlers.find(packet.GetHeader());
308 if (iter != m_IndexedHandlers.end())
309 {
310 for (auto& delegate : iter->second)
311 {
312 try
313 {
314 delegate->HandlePacket(packet);
315 }
316 catch (const arm::pipe::ProfilingException& ex)
317 {
318 Fail(ex.what());
319 }
320 catch (const std::exception& ex)
321 {
322 Fail(ex.what());
323 }
324 catch (...)
325 {
326 Fail("handler failed");
327 }
328 }
329 }
330 }
331
332 } // namespace pipe
333
334 } // namespace arm
335