• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5 
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
8 
9 #include <armnn/Exceptions.hpp>
10 #include <armnn/Conversion.hpp>
11 #include <armnn/utility/NumericCast.hpp>
12 
13 #include <Processes.hpp>
14 
15 #include <cstring>
16 
17 namespace armnn
18 {
19 
20 namespace profiling
21 {
22 
SendThread(armnn::profiling::ProfilingStateMachine & profilingStateMachine,armnn::profiling::IBufferManager & buffer,armnn::profiling::ISendCounterPacket & sendCounterPacket,int timeout)23 SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
24                        armnn::profiling::IBufferManager& buffer,
25                        armnn::profiling::ISendCounterPacket& sendCounterPacket,
26                        int timeout)
27     : m_StateMachine(profilingStateMachine)
28     , m_BufferManager(buffer)
29     , m_SendCounterPacket(sendCounterPacket)
30     , m_Timeout(timeout)
31     , m_IsRunning(false)
32     , m_KeepRunning(false)
33     , m_SendThreadException(nullptr)
34 {
35     m_BufferManager.SetConsumer(this);
36 }
37 
SetReadyToRead()38 void SendThread::SetReadyToRead()
39 {
40     // We need to wait for the send thread to release its mutex
41     {
42         std::lock_guard<std::mutex> lck(m_WaitMutex);
43         m_ReadyToRead = true;
44     }
45     // Signal the send thread that there's something to read in the buffer
46     m_WaitCondition.notify_one();
47 }
48 
Start(IProfilingConnection & profilingConnection)49 void SendThread::Start(IProfilingConnection& profilingConnection)
50 {
51     // Check if the send thread is already running
52     if (m_IsRunning.load())
53     {
54         // The send thread is already running
55         return;
56     }
57 
58     if (m_SendThread.joinable())
59     {
60         m_SendThread.join();
61     }
62 
63     // Mark the send thread as running
64     m_IsRunning.store(true);
65 
66     // Keep the send procedure going until the send thread is signalled to stop
67     m_KeepRunning.store(true);
68 
69     // Make sure the send thread will not flush the buffer until signaled to do so
70     // no need for a mutex as the send thread can not be running at this point
71     m_ReadyToRead = false;
72 
73     m_PacketSent = false;
74 
75     // Start the send thread
76     m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
77 }
78 
Stop(bool rethrowSendThreadExceptions)79 void SendThread::Stop(bool rethrowSendThreadExceptions)
80 {
81     // Signal the send thread to stop
82     m_KeepRunning.store(false);
83 
84     // Check that the send thread is running
85     if (m_SendThread.joinable())
86     {
87         // Kick the send thread out of the wait condition
88         SetReadyToRead();
89         // Wait for the send thread to complete operations
90         m_SendThread.join();
91     }
92 
93     // Check if the send thread exception has to be rethrown
94     if (!rethrowSendThreadExceptions)
95     {
96         // No need to rethrow the send thread exception, return immediately
97         return;
98     }
99 
100     // Check if there's an exception to rethrow
101     if (m_SendThreadException)
102     {
103         // Rethrow the send thread exception
104         std::rethrow_exception(m_SendThreadException);
105 
106         // Nullify the exception as it has been rethrown
107         m_SendThreadException = nullptr;
108     }
109 }
110 
Send(IProfilingConnection & profilingConnection)111 void SendThread::Send(IProfilingConnection& profilingConnection)
112 {
113     // Run once and keep the sending procedure looping until the thread is signalled to stop
114     do
115     {
116         // Check the current state of the profiling service
117         ProfilingState currentState = m_StateMachine.GetCurrentState();
118         switch (currentState)
119         {
120         case ProfilingState::Uninitialised:
121         case ProfilingState::NotConnected:
122 
123             // The send thread cannot be running when the profiling service is uninitialized or not connected,
124             // stop the thread immediately
125             m_KeepRunning.store(false);
126             m_IsRunning.store(false);
127 
128             // An exception should be thrown here, save it to be rethrown later from the main thread so that
129             // it can be caught by the consumer
130             m_SendThreadException =
131                     std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
132                                                              "profiling service not yet initialized or connected"));
133 
134             return;
135         case ProfilingState::WaitingForAck:
136 
137             // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
138             // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
139             // updated by the command handler
140 
141             // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
142             m_SendCounterPacket.SendStreamMetaDataPacket();
143 
144              // Flush the buffer manually to send the packet
145             FlushBuffer(profilingConnection);
146 
147             // Wait for a connection ack from the remote server. We should expect a response within timeout value.
148             // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
149             // StreamMetadata again.
150 
151             // Wait condition lock scope - Begin
152             {
153                 std::unique_lock<std::mutex> lock(m_WaitMutex);
154 
155                 bool timeout = m_WaitCondition.wait_for(lock,
156                                                         std::chrono::milliseconds(std::max(m_Timeout, 1000)),
157                                                         [&]{ return m_ReadyToRead; });
158                 // If we get notified we need to flush the buffer again
159                 if(timeout)
160                 {
161                     // Otherwise if we just timed out don't flush the buffer
162                     continue;
163                 }
164                 //reset condition variable predicate for next use
165                 m_ReadyToRead = false;
166             }
167             // Wait condition lock scope - End
168             break;
169         case ProfilingState::Active:
170         default:
171             // Wait condition lock scope - Begin
172             {
173                 std::unique_lock<std::mutex> lock(m_WaitMutex);
174 
175                 // Normal working state for the send thread
176                 // Check if the send thread is required to enforce a timeout wait policy
177                 if (m_Timeout < 0)
178                 {
179                     // Wait indefinitely until notified that something to read has become available in the buffer
180                     m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
181                 }
182                 else
183                 {
184                     // Wait until the thread is notified of something to read from the buffer,
185                     // or check anyway after the specified number of milliseconds
186                     m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
187                 }
188 
189                 //reset condition variable predicate for next use
190                 m_ReadyToRead = false;
191             }
192             // Wait condition lock scope - End
193             break;
194         }
195 
196         // Send all the available packets in the buffer
197         FlushBuffer(profilingConnection);
198     } while (m_KeepRunning.load());
199 
200     // Ensure that all readable data got written to the profiling connection before the thread is stopped
201     // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
202     FlushBuffer(profilingConnection, false);
203 
204     // Mark the send thread as not running
205     m_IsRunning.store(false);
206 }
207 
FlushBuffer(IProfilingConnection & profilingConnection,bool notifyWatchers)208 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
209 {
210     // Get the first available readable buffer
211     IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
212 
213     // Initialize the flag that indicates whether at least a packet has been sent
214     bool packetsSent = false;
215 
216     while (packetBuffer != nullptr)
217     {
218         // Get the data to send from the buffer
219         const unsigned char* readBuffer = packetBuffer->GetReadableData();
220         unsigned int readBufferSize = packetBuffer->GetSize();
221 
222         if (readBuffer == nullptr || readBufferSize == 0)
223         {
224             // Nothing to send, get the next available readable buffer and continue
225             m_BufferManager.MarkRead(packetBuffer);
226             packetBuffer = m_BufferManager.GetReadableBuffer();
227 
228             continue;
229         }
230 
231         // Check that the profiling connection is open, silently drop the data and continue if it's closed
232         if (profilingConnection.IsOpen())
233         {
234             // Write a packet to the profiling connection. Silently ignore any write error and continue
235             profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
236 
237             // Set the flag that indicates whether at least a packet has been sent
238             packetsSent = true;
239         }
240 
241         // Mark the packet buffer as read
242         m_BufferManager.MarkRead(packetBuffer);
243 
244         // Get the next available readable buffer
245         packetBuffer = m_BufferManager.GetReadableBuffer();
246     }
247     // Check whether at least a packet has been sent
248     if (packetsSent && notifyWatchers)
249     {
250         // Wait for the parent thread to release its mutex if necessary
251         {
252             std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
253             m_PacketSent = true;
254         }
255         // Notify to any watcher that something has been sent
256         m_PacketSentWaitCondition.notify_one();
257     }
258 }
259 
WaitForPacketSent(uint32_t timeout=1000)260 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
261 {
262     std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
263     // Blocks until notified that at least a packet has been sent or until timeout expires.
264     bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
265                                                        std::chrono::milliseconds(timeout),
266                                                        [&] { return m_PacketSent; });
267 
268     m_PacketSent = false;
269 
270     return timedOut;
271 }
272 
273 } // namespace profiling
274 
275 } // namespace armnn
276