• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package org.testng.remote.strprotocol;
2 
3 import java.io.BufferedReader;
4 import java.io.BufferedWriter;
5 import java.io.Closeable;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.io.InputStreamReader;
9 import java.io.OutputStream;
10 import java.io.OutputStreamWriter;
11 import java.io.PrintWriter;
12 import java.io.UnsupportedEncodingException;
13 import java.net.ConnectException;
14 import java.net.ServerSocket;
15 import java.net.Socket;
16 import java.net.SocketTimeoutException;
17 
18 import org.testng.TestNGException;
19 
20 import static org.testng.remote.RemoteTestNG.isVerbose;
21 
22 abstract public class BaseMessageSender implements IMessageSender {
23   private boolean m_debug = false;
24   protected Socket m_clientSocket;
25   private String m_host;
26   private int m_port;
27   protected final Object m_ackLock = new Object();
28 
29   private boolean m_requestStopReceiver;
30   /** Outgoing message stream. */
31   protected OutputStream m_outStream;
32   /** Used to send ACK and STOP */
33   private PrintWriter m_outWriter;
34 
35   /** Incoming message stream. */
36   protected volatile InputStream m_inStream;
37   /** Used to receive ACK and STOP */
38   protected volatile BufferedReader m_inReader;
39 
40   private ReaderThread m_readerThread;
41   private boolean m_ack;
42 //  protected InputStream m_receiverInputStream;
43 
BaseMessageSender(String host, int port, boolean ack)44   public BaseMessageSender(String host, int port, boolean ack) {
45     m_host = host;
46     m_port = port;
47     m_ack = ack;
48   }
49 
50   /**
51    * Starts the connection.
52    *
53    * @throws TestNGException if an exception occurred while establishing the connection
54    */
55   @Override
connect()56   public void connect() throws IOException {
57     p("Waiting for Eclipse client on " + m_host + ":" + m_port);
58     while (true) {
59       try {
60         m_clientSocket = new Socket(m_host, m_port);
61         p("Received a connection from Eclipse on " + m_host + ":" + m_port);
62 
63         // Output streams
64         m_outStream = m_clientSocket.getOutputStream();
65         m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
66 
67         // Input streams
68         m_inStream = m_clientSocket.getInputStream();
69         try {
70           m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
71               "UTF-8")); //$NON-NLS-1$
72         }
73         catch(UnsupportedEncodingException ueex) {
74           // Should never happen
75           m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
76         }
77 
78         p("Connection established, starting reader thread");
79         m_readerThread = new ReaderThread();
80         m_readerThread.start();
81         return;
82       }
83       catch(ConnectException ex) {
84         // ignore and retry
85         try {
86           Thread.sleep(4000);
87         }
88         catch(InterruptedException handled) {
89           Thread.currentThread().interrupt();
90         }
91       }
92     }
93   }
94 
sendAdminMessage(String message)95   private void sendAdminMessage(String message) {
96     m_outWriter.println(message);
97     m_outWriter.flush();
98   }
99 
100   private int m_serial = 0;
101 
102   @Override
sendAck()103   public void sendAck() {
104     p("Sending ACK " + m_serial);
105     // Note: adding the serial at the end of this message causes a lock up if interacting
106     // with TestNG 5.14 and older (reported by JetBrains). The following git commit:
107     // 5730bdfb33ec7a8bf4104852cd4a5f2875ba8267
108     // changed equals() to startsWith().
109     // It's ok to add this serial back for debugging, but don't commit it until JetBrains
110     // confirms they no longer need backward compatibility with 5.14.
111     sendAdminMessage(MessageHelper.ACK_MSG); // + m_serial++);
112   }
113 
114   @Override
sendStop()115   public void sendStop() {
116     sendAdminMessage(MessageHelper.STOP_MSG);
117   }
118 
119   @Override
initReceiver()120   public void initReceiver() throws SocketTimeoutException {
121     if (m_inStream != null) {
122       p("Receiver already initialized");
123     }
124     ServerSocket serverSocket = null;
125     try {
126       p("initReceiver on port " + m_port);
127       serverSocket = new ServerSocket(m_port);
128       serverSocket.setSoTimeout(5000);
129 
130       Socket socket = null;
131       while (!m_requestStopReceiver) {
132         try {
133           if (m_debug) {
134             p("polling the client connection");
135           }
136           socket = serverSocket.accept();
137           // break the loop once the first client connected
138           break;
139         }
140         catch (IOException ioe) {
141           try {
142             Thread.sleep(100L);
143           }
144           catch (InterruptedException ie) {
145             // Do nothing.
146           }
147         }
148       }
149       if (socket != null) {
150         m_inStream = socket.getInputStream();
151         m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
152         m_outStream = socket.getOutputStream();
153         m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));
154       }
155     }
156     catch(SocketTimeoutException ste) {
157       throw ste;
158     }
159     catch (IOException ioe) {
160       closeQuietly(serverSocket);
161     }
162   }
163 
stopReceiver()164   public void stopReceiver() {
165     m_requestStopReceiver = true;
166   }
167 
168   @Override
shutDown()169   public void shutDown() {
170     closeQuietly(m_outStream);
171     m_outStream = null;
172 
173     if (null != m_readerThread) {
174       m_readerThread.interrupt();
175     }
176 
177     closeQuietly(m_inReader);
178     m_inReader = null;
179 
180     closeQuietly(m_clientSocket);
181     m_clientSocket = null;
182   }
183 
closeQuietly(Closeable c)184   private void closeQuietly(Closeable c) {
185     if (c != null) {
186       try {
187         c.close();
188       } catch (IOException e) {
189         if (m_debug) {
190           e.printStackTrace();
191         }
192       }
193     }
194   }
195 
196   private String m_latestAck;
197 
waitForAck()198   protected void waitForAck() {
199     if (m_ack) {
200       try {
201         p("Message sent, waiting for ACK...");
202         synchronized(m_ackLock) {
203           m_ackLock.wait();
204         }
205         p("... ACK received:" + m_latestAck);
206       }
207       catch(InterruptedException handled) {
208         Thread.currentThread().interrupt();
209       }
210     }
211   }
212 
p(String msg)213   private static void p(String msg) {
214     if (isVerbose()) {
215       System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
216     }
217   }
218 
219   /**
220    * Reader thread that processes messages from the client.
221    */
222   private class ReaderThread extends Thread {
223 
ReaderThread()224     public ReaderThread() {
225       super("ReaderThread"); //$NON-NLS-1$
226     }
227 
228     @Override
run()229     public void run() {
230       try {
231         p("ReaderThread waiting for an admin message");
232         String message = m_inReader.readLine();
233         p("ReaderThread received admin message:" + message);
234         while (message != null) {
235           if (m_debug) {
236             p("Admin message:" + message); //$NON-NLS-1$
237           }
238           boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
239           boolean stop = MessageHelper.STOP_MSG.equals(message);
240           if(acknowledge || stop) {
241             if (acknowledge) {
242               p("Received ACK:" + message);
243               m_latestAck = message;
244             }
245             synchronized(m_ackLock) {
246               m_ackLock.notifyAll();
247             }
248             if (stop) {
249               break;
250             }
251           } else {
252             p("Received unknown message: '" + message + "'");
253           }
254           message = m_inReader != null ? m_inReader.readLine() : null;
255         }
256 //        while((m_reader != null) && (message = m_reader.readLine()) != null) {
257 //          if (m_debug) {
258 //            p("Admin message:" + message); //$NON-NLS-1$
259 //          }
260 //          boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
261 //          boolean stop = MessageHelper.STOP_MSG.equals(message);
262 //          if(acknowledge || stop) {
263 //            synchronized(m_lock) {
264 //              m_lock.notifyAll();
265 //            }
266 //            if (stop) {
267 //              break;
268 //            }
269 //          }
270 //        }
271       }
272       catch(IOException ioe) {
273         if (isVerbose()) {
274           ioe.printStackTrace();
275         }
276       }
277     }
278   }
279 }
280