• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "FileOnlyProfilingConnection.hpp"
7 
8 #include <common/include/Constants.hpp>
9 #include <common/include/ProfilingException.hpp>
10 #include <common/include/PacketVersionResolver.hpp>
11 
12 #include <algorithm>
13 #include <iostream>
14 
15 #if defined(ARMNN_DISABLE_THREADS)
16 #include <common/include/IgnoreUnused.hpp>
17 #endif
18 
19 namespace arm
20 {
21 
22 namespace pipe
23 {
24 
GetHeadersAccepted()25 std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
26 {
27     std::vector<uint32_t> headers;
28     headers.push_back(m_MetaDataPacketHeader);
29     return headers;
30 }
31 
HandlePacket(const arm::pipe::Packet & packet)32 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
33 {
34     if (packet.GetHeader() != m_MetaDataPacketHeader)
35     {
36         throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
37     }
38     // determine the endianness of the protocol
39     TargetEndianness endianness;
40     if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
41     {
42         endianness = TargetEndianness::BeWire;
43     }
44     else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
45     {
46         endianness = TargetEndianness::LeWire;
47     }
48     else
49     {
50         throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
51     }
52     m_FileOnlyProfilingConnection->SetEndianess(endianness);
53     // send back the acknowledgement
54     std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
55     arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
56     m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
57 }
58 
ToUint32(const unsigned char * data,TargetEndianness endianness)59 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
60 {
61     // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
62     // specified endianness.
63     if (endianness == TargetEndianness::BeWire)
64     {
65         return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
66                static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
67     }
68     else
69     {
70         return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
71                static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
72     }
73 }
74 
~FileOnlyProfilingConnection()75 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
76 {
77     try
78     {
79         Close();
80     }
81     catch (...)
82     {
83         // do nothing
84     }
85 }
86 
IsOpen() const87 bool FileOnlyProfilingConnection::IsOpen() const
88 {
89     // This type of connection is always open.
90     return true;
91 }
92 
Close()93 void FileOnlyProfilingConnection::Close()
94 {
95     // Dump any unread packets out of the queue.
96     size_t initialSize = m_PacketQueue.size();
97     for (size_t i = 0; i < initialSize; ++i)
98     {
99         m_PacketQueue.pop();
100     }
101     // dispose of the processing thread
102     m_KeepRunning.store(false);
103 #if !defined(ARMNN_DISABLE_THREADS)
104     if (m_LocalHandlersThread.joinable())
105     {
106         // make sure the thread wakes up and sees it has to stop
107         m_ConditionPacketReadable.notify_one();
108         m_LocalHandlersThread.join();
109     }
110 #endif
111 }
112 
WritePacket(const unsigned char * buffer,uint32_t length)113 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
114 {
115     ARM_PIPE_ASSERT(buffer);
116     arm::pipe::Packet packet = ReceivePacket(buffer, length);
117     ForwardPacketToHandlers(packet);
118     return true;
119 }
120 
ReturnPacket(arm::pipe::Packet & packet)121 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
122 {
123     {
124 #if !defined(ARMNN_DISABLE_THREADS)
125         std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
126 #endif
127         m_PacketQueue.push(std::move(packet));
128     }
129 #if !defined(ARMNN_DISABLE_THREADS)
130     m_ConditionPacketAvailable.notify_one();
131 #endif
132 }
133 
ReadPacket(uint32_t timeout)134 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
135 {
136 #if !defined(ARMNN_DISABLE_THREADS)
137     std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
138 
139     // Here we are using m_PacketQueue.empty() as a predicate variable
140     // The conditional variable will wait until packetQueue is not empty or until a timeout
141     if (!m_ConditionPacketAvailable.wait_for(lck,
142                                              std::chrono::milliseconds(timeout),
143                                              [&]{return !m_PacketQueue.empty();}))
144     {
145         arm::pipe::Packet empty;
146         return empty;
147     }
148 #else
149     IgnoreUnused(timeout);
150 #endif
151 
152     arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
153     m_PacketQueue.pop();
154     return returnedPacket;
155 }
156 
Fail(const std::string & errorMessage)157 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
158 {
159     Close();
160     throw arm::pipe::ProfilingException(errorMessage);
161 }
162 
163 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
164 /// a processing thread that will ensure that processing of packets will happen on a separate
165 /// thread from the profiling services send thread and will therefore protect against the
166 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)167 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
168 {
169     m_PacketHandlers.push_back(std::move(localPacketHandler));
170     ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
171     localCopy->SetConnection(this);
172     if (localCopy->GetHeadersAccepted().empty())
173     {
174         //this is a universal handler
175         m_UniversalHandlers.push_back(localCopy);
176     }
177     else
178     {
179         for (uint32_t header : localCopy->GetHeadersAccepted())
180         {
181             auto iter = m_IndexedHandlers.find(header);
182             if (iter == m_IndexedHandlers.end())
183             {
184                 std::vector<ILocalPacketHandlerSharedPtr> handlers;
185                 handlers.push_back(localCopy);
186                 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
187             }
188             else
189             {
190                 iter->second.push_back(localCopy);
191             }
192         }
193     }
194 }
195 
StartProcessingThread()196 void FileOnlyProfilingConnection::StartProcessingThread()
197 {
198     // check if the thread has already started
199     if (m_IsRunning.load())
200     {
201         return;
202     }
203     // make sure if there was one running before it is joined
204 #if !defined(ARMNN_DISABLE_THREADS)
205     if (m_LocalHandlersThread.joinable())
206     {
207         m_LocalHandlersThread.join();
208     }
209 #endif
210     m_IsRunning.store(true);
211     m_KeepRunning.store(true);
212 #if !defined(ARMNN_DISABLE_THREADS)
213     m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
214 #endif
215 }
216 
ForwardPacketToHandlers(arm::pipe::Packet & packet)217 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
218 {
219     if (m_PacketHandlers.empty())
220     {
221         return;
222     }
223     if (!m_KeepRunning.load())
224     {
225         return;
226     }
227     {
228 #if !defined(ARMNN_DISABLE_THREADS)
229         std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
230 #endif
231         if (!m_KeepRunning.load())
232         {
233             return;
234         }
235         m_ReadableList.push(std::move(packet));
236     }
237 #if !defined(ARMNN_DISABLE_THREADS)
238     m_ConditionPacketReadable.notify_one();
239 #endif
240 }
241 
ServiceLocalHandlers()242 void FileOnlyProfilingConnection::ServiceLocalHandlers()
243 {
244     do
245     {
246         arm::pipe::Packet returnedPacket;
247         bool readPacket = false;
248         {   // only lock while we are taking the packet off the incoming list
249 #if !defined(ARMNN_DISABLE_THREADS)
250             std::unique_lock<std::mutex> lck(m_ReadableMutex);
251 #endif
252             if (m_Timeout < 0)
253             {
254 #if !defined(ARMNN_DISABLE_THREADS)
255                 m_ConditionPacketReadable.wait(lck,
256                                                [&] { return !m_ReadableList.empty(); });
257 #endif
258             }
259             else
260             {
261 #if !defined(ARMNN_DISABLE_THREADS)
262                 m_ConditionPacketReadable.wait_for(lck,
263                                                    std::chrono::milliseconds(std::max(m_Timeout, 1000)),
264                                                    [&] { return !m_ReadableList.empty(); });
265 #endif
266             }
267             if (m_KeepRunning.load())
268             {
269                 if (!m_ReadableList.empty())
270                 {
271                     returnedPacket = std::move(m_ReadableList.front());
272                     m_ReadableList.pop();
273                     readPacket = true;
274                 }
275             }
276             else
277             {
278                 ClearReadableList();
279             }
280         }
281         if (m_KeepRunning.load() && readPacket)
282         {
283             DispatchPacketToHandlers(returnedPacket);
284         }
285     } while (m_KeepRunning.load());
286     // make sure the readable list is cleared
287     ClearReadableList();
288     m_IsRunning.store(false);
289 }
290 
ClearReadableList()291 void FileOnlyProfilingConnection::ClearReadableList()
292 {
293     // make sure the incoming packet queue gets emptied
294     size_t initialSize = m_ReadableList.size();
295     for (size_t i = 0; i < initialSize; ++i)
296     {
297         m_ReadableList.pop();
298     }
299 }
300 
DispatchPacketToHandlers(const arm::pipe::Packet & packet)301 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
302 {
303     for (auto& delegate : m_UniversalHandlers)
304     {
305         delegate->HandlePacket(packet);
306     }
307     auto iter = m_IndexedHandlers.find(packet.GetHeader());
308     if (iter != m_IndexedHandlers.end())
309     {
310         for (auto& delegate : iter->second)
311         {
312             try
313             {
314                 delegate->HandlePacket(packet);
315             }
316             catch (const arm::pipe::ProfilingException& ex)
317             {
318                 Fail(ex.what());
319             }
320             catch (const std::exception& ex)
321             {
322                 Fail(ex.what());
323             }
324             catch (...)
325             {
326                 Fail("handler failed");
327             }
328         }
329     }
330 }
331 
332 }    // namespace pipe
333 
334 }    // namespace arm
335