1 /*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
23
24 #include "xeTcpIpLink.hpp"
25 #include "xsProtocol.hpp"
26 #include "deClock.h"
27 #include "deInt32.h"
28
29 namespace xe
30 {
31
32 enum
33 {
34 SEND_BUFFER_BLOCK_SIZE = 1024,
35 SEND_BUFFER_NUM_BLOCKS = 64
36 };
37
38 // Utilities for writing messages out.
39
writeMessageHeader(de::BlockBuffer<deUint8> & dst,xs::MessageType type,int messageSize)40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41 {
42 deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45 }
46
writeKeepalive(de::BlockBuffer<deUint8> & dst)47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48 {
49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50 dst.flush();
51 }
52
writeExecuteBinary(de::BlockBuffer<deUint8> & dst,const char * name,const char * params,const char * workDir,const char * caseList)53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54 {
55 int nameSize = (int)strlen(name) + 1;
56 int paramsSize = (int)strlen(params) + 1;
57 int workDirSize = (int)strlen(workDir) + 1;
58 int caseListSize = (int)strlen(caseList) + 1;
59 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60
61 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62 dst.write(nameSize, (const deUint8*)name);
63 dst.write(paramsSize, (const deUint8*)params);
64 dst.write(workDirSize, (const deUint8*)workDir);
65 dst.write(caseListSize, (const deUint8*)caseList);
66 dst.flush();
67 }
68
writeStopExecution(de::BlockBuffer<deUint8> & dst)69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70 {
71 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72 dst.flush();
73 }
74
75 // TcpIpLinkState
76
TcpIpLinkState(CommLinkState initialState,const char * initialErr)77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78 : m_state (initialState)
79 , m_error (initialErr)
80 , m_lastKeepaliveReceived (0)
81 , m_stateChangedCallback (DE_NULL)
82 , m_testLogDataCallback (DE_NULL)
83 , m_infoLogDataCallback (DE_NULL)
84 , m_userPtr (DE_NULL)
85 {
86 }
87
~TcpIpLinkState(void)88 TcpIpLinkState::~TcpIpLinkState (void)
89 {
90 }
91
getState(void) const92 CommLinkState TcpIpLinkState::getState (void) const
93 {
94 de::ScopedLock lock(m_lock);
95
96 return m_state;
97 }
98
getState(std::string & error) const99 CommLinkState TcpIpLinkState::getState (std::string& error) const
100 {
101 de::ScopedLock lock(m_lock);
102
103 error = m_error;
104 return m_state;
105 }
106
setCallbacks(CommLink::StateChangedFunc stateChangedCallback,CommLink::LogDataFunc testLogDataCallback,CommLink::LogDataFunc infoLogDataCallback,void * userPtr)107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108 {
109 de::ScopedLock lock(m_lock);
110
111 m_stateChangedCallback = stateChangedCallback;
112 m_testLogDataCallback = testLogDataCallback;
113 m_infoLogDataCallback = infoLogDataCallback;
114 m_userPtr = userPtr;
115 }
116
setState(CommLinkState state,const char * error)117 void TcpIpLinkState::setState (CommLinkState state, const char* error)
118 {
119 CommLink::StateChangedFunc callback = DE_NULL;
120 void* userPtr = DE_NULL;
121
122 {
123 de::ScopedLock lock(m_lock);
124
125 m_state = state;
126 m_error = error;
127
128 callback = m_stateChangedCallback;
129 userPtr = m_userPtr;
130 }
131
132 if (callback)
133 callback(userPtr, state, error);
134 }
135
onTestLogData(const deUint8 * bytes,size_t numBytes) const136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
137 {
138 CommLink::LogDataFunc callback = DE_NULL;
139 void* userPtr = DE_NULL;
140
141 m_lock.lock();
142 callback = m_testLogDataCallback;
143 userPtr = m_userPtr;
144 m_lock.unlock();
145
146 if (callback)
147 callback(userPtr, bytes, numBytes);
148 }
149
onInfoLogData(const deUint8 * bytes,size_t numBytes) const150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
151 {
152 CommLink::LogDataFunc callback = DE_NULL;
153 void* userPtr = DE_NULL;
154
155 m_lock.lock();
156 callback = m_infoLogDataCallback;
157 userPtr = m_userPtr;
158 m_lock.unlock();
159
160 if (callback)
161 callback(userPtr, bytes, numBytes);
162 }
163
onKeepaliveReceived(void)164 void TcpIpLinkState::onKeepaliveReceived (void)
165 {
166 de::ScopedLock lock(m_lock);
167 m_lastKeepaliveReceived = deGetMicroseconds();
168 }
169
getLastKeepaliveRecevied(void) const170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171 {
172 de::ScopedLock lock(m_lock);
173 return m_lastKeepaliveReceived;
174 }
175
176 // TcpIpSendThread
177
TcpIpSendThread(de::Socket & socket,TcpIpLinkState & state)178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179 : m_socket (socket)
180 , m_state (state)
181 , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182 , m_isRunning (false)
183 {
184 }
185
~TcpIpSendThread(void)186 TcpIpSendThread::~TcpIpSendThread (void)
187 {
188 }
189
start(void)190 void TcpIpSendThread::start (void)
191 {
192 DE_ASSERT(!m_isRunning);
193
194 // Reset state.
195 m_buffer.clear();
196 m_isRunning = true;
197
198 de::Thread::start();
199 }
200
run(void)201 void TcpIpSendThread::run (void)
202 {
203 try
204 {
205 deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206
207 while (!m_buffer.isCanceled())
208 {
209 size_t numToSend = 0;
210 size_t numSent = 0;
211 deSocketResult result = DE_SOCKETRESULT_LAST;
212
213 try
214 {
215 // Wait for single byte and then try to read more.
216 m_buffer.read(1, &buf[0]);
217 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218 }
219 catch (const de::BlockBuffer<deUint8>::CanceledException&)
220 {
221 // Handled in loop condition.
222 }
223
224 while (numSent < numToSend)
225 {
226 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227
228 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229 XE_FAIL("Connection closed");
230 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231 XE_FAIL("Connection terminated");
232 else if (result == DE_SOCKETRESULT_ERROR)
233 XE_FAIL("Socket error");
234 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235 {
236 // \note Socket should not be in non-blocking mode.
237 DE_ASSERT(numSent == 0);
238 deYield();
239 }
240 else
241 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242 }
243 }
244 }
245 catch (const std::exception& e)
246 {
247 m_state.setState(COMMLINKSTATE_ERROR, e.what());
248 }
249 }
250
stop(void)251 void TcpIpSendThread::stop (void)
252 {
253 if (m_isRunning)
254 {
255 m_buffer.cancel();
256 join();
257 m_isRunning = false;
258 }
259 }
260
261 // TcpIpRecvThread
262
TcpIpRecvThread(de::Socket & socket,TcpIpLinkState & state)263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264 : m_socket (socket)
265 , m_state (state)
266 , m_curMsgPos (0)
267 , m_isRunning (false)
268 {
269 }
270
~TcpIpRecvThread(void)271 TcpIpRecvThread::~TcpIpRecvThread (void)
272 {
273 }
274
start(void)275 void TcpIpRecvThread::start (void)
276 {
277 DE_ASSERT(!m_isRunning);
278
279 // Reset state.
280 m_curMsgPos = 0;
281 m_isRunning = true;
282
283 de::Thread::start();
284 }
285
run(void)286 void TcpIpRecvThread::run (void)
287 {
288 try
289 {
290 for (;;)
291 {
292 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293 bool hasPayload = false;
294 size_t messageSize = 0;
295 xs::MessageType messageType = (xs::MessageType)0;
296
297 if (hasHeader)
298 {
299 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300 hasPayload = m_curMsgPos >= messageSize;
301 }
302
303 if (hasPayload)
304 {
305 // Process message.
306 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307 m_curMsgPos = 0;
308 }
309 else
310 {
311 // Try to receive missing bytes.
312 size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313 size_t bytesToRecv = curSize-m_curMsgPos;
314 size_t numRecv = 0;
315 deSocketResult result = DE_SOCKETRESULT_LAST;
316
317 if (m_curMsgBuf.size() < curSize)
318 m_curMsgBuf.resize(curSize);
319
320 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321
322 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323 XE_FAIL("Connection closed");
324 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325 XE_FAIL("Connection terminated");
326 else if (result == DE_SOCKETRESULT_ERROR)
327 XE_FAIL("Socket error");
328 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329 {
330 // \note Socket should not be in non-blocking mode.
331 DE_ASSERT(numRecv == 0);
332 deYield();
333 }
334 else
335 {
336 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337 DE_ASSERT(numRecv <= bytesToRecv);
338 m_curMsgPos += numRecv;
339 // Continue receiving bytes / handle message in next iter.
340 }
341 }
342 }
343 }
344 catch (const std::exception& e)
345 {
346 m_state.setState(COMMLINKSTATE_ERROR, e.what());
347 }
348 }
349
stop(void)350 void TcpIpRecvThread::stop (void)
351 {
352 if (m_isRunning)
353 {
354 // \note Socket must be closed before terminating receive thread.
355 XE_CHECK(!m_socket.isReceiveOpen());
356
357 join();
358 m_isRunning = false;
359 }
360 }
361
handleMessage(xs::MessageType messageType,const deUint8 * data,size_t dataSize)362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
363 {
364 switch (messageType)
365 {
366 case xs::MESSAGETYPE_KEEPALIVE:
367 m_state.onKeepaliveReceived();
368 break;
369
370 case xs::MESSAGETYPE_PROCESS_STARTED:
371 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373 break;
374
375 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376 {
377 xs::ProcessLaunchFailedMessage msg(data, dataSize);
378 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380 break;
381 }
382
383 case xs::MESSAGETYPE_PROCESS_FINISHED:
384 {
385 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386 xs::ProcessFinishedMessage msg(data, dataSize);
387 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389 break;
390 }
391
392 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393 case xs::MESSAGETYPE_INFO:
394 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395 if (data[dataSize-1] == 0)
396 dataSize -= 1;
397
398 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399 {
400 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401 m_state.onTestLogData(&data[0], dataSize);
402 }
403 else
404 m_state.onInfoLogData(&data[0], dataSize);
405 break;
406
407 default:
408 XE_FAIL("Unknown message");
409 }
410 }
411
412 // TcpIpLink
413
TcpIpLink(void)414 TcpIpLink::TcpIpLink (void)
415 : m_state (COMMLINKSTATE_ERROR, "Not connected")
416 , m_sendThread (m_socket, m_state)
417 , m_recvThread (m_socket, m_state)
418 , m_keepaliveTimer (DE_NULL)
419 {
420 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421 XE_CHECK(m_keepaliveTimer);
422 }
423
~TcpIpLink(void)424 TcpIpLink::~TcpIpLink (void)
425 {
426 try
427 {
428 closeConnection();
429 }
430 catch (...)
431 {
432 // Can't do much except to ignore error.
433 }
434 deTimer_destroy(m_keepaliveTimer);
435 }
436
closeConnection(void)437 void TcpIpLink::closeConnection (void)
438 {
439 {
440 deSocketState state = m_socket.getState();
441 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442 m_socket.shutdown();
443 }
444
445 if (deTimer_isActive(m_keepaliveTimer))
446 deTimer_disable(m_keepaliveTimer);
447
448 if (m_sendThread.isRunning())
449 m_sendThread.stop();
450
451 if (m_recvThread.isRunning())
452 m_recvThread.stop();
453
454 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455 m_socket.close();
456 }
457
connect(const de::SocketAddress & address)458 void TcpIpLink::connect (const de::SocketAddress& address)
459 {
460 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462 XE_CHECK(!m_sendThread.isRunning());
463 XE_CHECK(!m_recvThread.isRunning());
464
465 m_socket.connect(address);
466
467 try
468 {
469 // Clear error and set state to ready.
470 m_state.setState(COMMLINKSTATE_READY, "");
471 m_state.onKeepaliveReceived();
472
473 // Launch threads.
474 m_sendThread.start();
475 m_recvThread.start();
476
477 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478 }
479 catch (const std::exception& e)
480 {
481 closeConnection();
482 m_state.setState(COMMLINKSTATE_ERROR, e.what());
483 throw;
484 }
485 }
486
disconnect(void)487 void TcpIpLink::disconnect (void)
488 {
489 try
490 {
491 closeConnection();
492 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
493 }
494 catch (const std::exception& e)
495 {
496 m_state.setState(COMMLINKSTATE_ERROR, e.what());
497 }
498 }
499
reset(void)500 void TcpIpLink::reset (void)
501 {
502 // \note Just clears error state if we are connected.
503 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
504 {
505 m_state.setState(COMMLINKSTATE_READY, "");
506
507 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
508 }
509 else
510 disconnect(); // Abnormal state/usage. Disconnect socket.
511 }
512
keepaliveTimerCallback(void * ptr)513 void TcpIpLink::keepaliveTimerCallback (void* ptr)
514 {
515 TcpIpLink* link = static_cast<TcpIpLink*>(ptr);
516 deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied();
517 deUint64 curTime = deGetMicroseconds();
518
519 // Check for timeout.
520 if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
522
523 // Enqueue new keepalive.
524 try
525 {
526 writeKeepalive(link->m_sendThread.getBuffer());
527 }
528 catch (const de::BlockBuffer<deUint8>::CanceledException&)
529 {
530 // Ignore. Can happen in connection teardown.
531 }
532 }
533
getState(void) const534 CommLinkState TcpIpLink::getState (void) const
535 {
536 return m_state.getState();
537 }
538
getState(std::string & message) const539 CommLinkState TcpIpLink::getState (std::string& message) const
540 {
541 return m_state.getState(message);
542 }
543
setCallbacks(StateChangedFunc stateChangedCallback,LogDataFunc testLogDataCallback,LogDataFunc infoLogDataCallback,void * userPtr)544 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
545 {
546 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
547 }
548
startTestProcess(const char * name,const char * params,const char * workingDir,const char * caseList)549 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
550 {
551 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
552
553 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
555 }
556
stopTestProcess(void)557 void TcpIpLink::stopTestProcess (void)
558 {
559 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560 writeStopExecution(m_sendThread.getBuffer());
561 }
562
563 } // xe
564