• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package org.testng.internal.thread.graph;
2 
3 import org.testng.TestNGException;
4 import org.testng.collections.Lists;
5 import org.testng.internal.DynamicGraph;
6 import org.testng.internal.DynamicGraph.Status;
7 
8 import java.io.BufferedWriter;
9 import java.io.File;
10 import java.io.FileWriter;
11 import java.io.IOException;
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.ThreadPoolExecutor;
16 import java.util.concurrent.TimeUnit;
17 
18 /**
19  * An Executor that launches tasks per batches. It takes a {@code DynamicGraph}
20  * of tasks to be run and a {@code IThreadWorkerFactory} to initialize/create
21  * {@code Runnable} wrappers around those tasks
22  */
23 public class GraphThreadPoolExecutor<T> extends ThreadPoolExecutor {
24   private static final boolean DEBUG = false;
25   /** Set to true if you want to generate GraphViz graphs */
26   private static final boolean DOT_FILES = false;
27 
28   private DynamicGraph<T> m_graph;
29   private List<Runnable> m_activeRunnables = Lists.newArrayList();
30   private IThreadWorkerFactory<T> m_factory;
31   private List<String> m_dotFiles = Lists.newArrayList();
32   private int m_threadCount;
33 
GraphThreadPoolExecutor(DynamicGraph<T> graph, IThreadWorkerFactory<T> factory, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)34   public GraphThreadPoolExecutor(DynamicGraph<T> graph, IThreadWorkerFactory<T> factory, int corePoolSize,
35       int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
36     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
37         /* , new TestNGThreadPoolFactory() */);
38     ppp("Initializing executor with " + corePoolSize + " threads and following graph " + graph);
39     m_threadCount = maximumPoolSize;
40     m_graph = graph;
41     m_factory = factory;
42 
43     if (m_graph.getFreeNodes().isEmpty()) {
44       throw new TestNGException("The graph of methods contains a cycle:" + graph.getEdges());
45     }
46   }
47 
run()48   public void run() {
49     synchronized(m_graph) {
50       if (DOT_FILES) {
51         m_dotFiles.add(m_graph.toDot());
52       }
53       List<T> freeNodes = m_graph.getFreeNodes();
54       runNodes(freeNodes);
55     }
56   }
57 
58   /**
59    * Create one worker per node and execute them.
60    */
runNodes(List<T> freeNodes)61   private void runNodes(List<T> freeNodes) {
62     List<IWorker<T>> runnables = m_factory.createWorkers(freeNodes);
63     for (IWorker<T> r : runnables) {
64       m_activeRunnables.add(r);
65       ppp("Added to active runnable");
66       setStatus(r, Status.RUNNING);
67       ppp("Executing: " + r);
68       try {
69         execute(r);
70 //        if (m_threadCount > 1) execute(r);
71 //        else r.run();
72       }
73       catch(Exception ex) {
74         ex.printStackTrace();
75       }
76     }
77   }
78 
setStatus(IWorker<T> worker, Status status)79   private void setStatus(IWorker<T> worker, Status status) {
80     ppp("Set status:" + worker + " status:" + status);
81     if (status == Status.FINISHED) {
82       m_activeRunnables.remove(worker);
83     }
84     synchronized(m_graph) {
85       for (T m : worker.getTasks()) {
86         m_graph.setStatus(m, status);
87       }
88     }
89   }
90 
91   @Override
afterExecute(Runnable r, Throwable t)92   public void afterExecute(Runnable r, Throwable t) {
93     ppp("Finished runnable:" + r);
94     setStatus((IWorker<T>) r, Status.FINISHED);
95     synchronized(m_graph) {
96       ppp("Node count:" + m_graph.getNodeCount() + " and "
97           + m_graph.getNodeCountWithStatus(Status.FINISHED) + " finished");
98       if (m_graph.getNodeCount() == m_graph.getNodeCountWithStatus(Status.FINISHED)) {
99         ppp("Shutting down executor " + this);
100         if (DOT_FILES) {
101           generateFiles(m_dotFiles);
102         }
103         shutdown();
104       } else {
105         if (DOT_FILES) {
106           m_dotFiles.add(m_graph.toDot());
107         }
108         List<T> freeNodes = m_graph.getFreeNodes();
109         runNodes(freeNodes);
110       }
111     }
112 //    if (m_activeRunnables.isEmpty() && m_index < m_runnables.getSize()) {
113 //      runNodes(m_index++);
114 //    }
115   }
116 
generateFiles(List<String> files)117   private void generateFiles(List<String> files) {
118     try {
119       File dir = File.createTempFile("TestNG-", "");
120       dir.delete();
121       dir.mkdir();
122       for (int i = 0; i < files.size(); i++) {
123         File f = new File(dir, "" + (i < 10 ? "0" : "") + i + ".dot");
124         BufferedWriter bw = new BufferedWriter(new FileWriter(f));
125         bw.append(files.get(i));
126         bw.close();
127       }
128       if (DOT_FILES) {
129         System.out.println("Created graph files in " + dir);
130       }
131     } catch(IOException ex) {
132       ex.printStackTrace();
133     }
134   }
135 
ppp(String string)136   private void ppp(String string) {
137     if (DEBUG) {
138       System.out.println("============ [GraphThreadPoolExecutor] " + Thread.currentThread().getId() + " "
139           + string);
140     }
141   }
142 
143 }
144 
145 class TestNGThreadPoolFactory implements ThreadFactory {
146   private int m_count = 0;
147 
148   @Override
newThread(Runnable r)149   public Thread newThread(Runnable r) {
150     Thread result = new Thread(r);
151     result.setName("TestNG-" + m_count++);
152     return result;
153   }
154 }
155