1// Copyright 2019 Google LLC. 2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. 3package main 4 5import ( 6 "bufio" 7 "bytes" 8 "flag" 9 "fmt" 10 "log" 11 "math/rand" 12 "os" 13 "os/exec" 14 "path/filepath" 15 "runtime" 16 "strings" 17 "sync" 18 "sync/atomic" 19 "time" 20) 21 22// Too many GPU processes and we'll start to overwhelm your GPU, 23// even hanging your machine in the worst case. Here's a reasonable default. 24func defaultGpuLimit() int { 25 limit := 8 26 if n := runtime.NumCPU(); n < limit { 27 return n 28 } 29 return limit 30} 31 32var script = flag.String("script", "", "A file with jobs to run, one per line. - for stdin.") 33var random = flag.Bool("random", true, "Assign sources into job batches randomly?") 34var quiet = flag.Bool("quiet", false, "Print only failures?") 35var exact = flag.Bool("exact", false, "Match GM names only exactly.") 36var cpuLimit = flag.Int("cpuLimit", runtime.NumCPU(), 37 "Maximum number of concurrent processes for CPU-bound work.") 38var gpuLimit = flag.Int("gpuLimit", defaultGpuLimit(), 39 "Maximum number of concurrent processes for GPU-bound work.") 40 41func init() { 42 flag.StringVar(script, "s", *script, "Alias for --script.") 43 flag.BoolVar(random, "r", *random, "Alias for --random.") 44 flag.BoolVar(quiet, "q", *quiet, "Alias for --quiet.") 45 flag.BoolVar(exact, "e", *exact, "Alias for --exact.") 46 flag.IntVar(cpuLimit, "c", *cpuLimit, "Alias for --cpuLimit.") 47 flag.IntVar(gpuLimit, "g", *gpuLimit, "Alias for --gpuLimit.") 48} 49 50// Query fm binary for list of all available GMs/tests by running with --listGMs/--listTests. 51func listAll(flag string, fm string) (list []string, err error) { 52 cmd := exec.Command(fm, flag) 53 stdout, err := cmd.Output() 54 if err != nil { 55 return 56 } 57 // Names are listed line-by-line. 58 scanner := bufio.NewScanner(bytes.NewReader(stdout)) 59 for scanner.Scan() { 60 list = append(list, scanner.Text()) 61 } 62 err = scanner.Err() 63 return 64} 65 66type work struct { 67 Sources []string 68 Flags []string 69} 70 71func parseWork(args []string, gms []string, tests []string) (*work, error) { 72 w := &work{} 73 for _, arg := range args { 74 // Everything after a # is a comment. 75 if strings.HasPrefix(arg, "#") { 76 break 77 } 78 79 // Treat "gm" or "gms" as a shortcut for all known GMs. 80 if arg == "gm" || arg == "gms" { 81 w.Sources = append(w.Sources, gms...) 82 continue 83 } 84 // Same for tests. 85 if arg == "test" || arg == "tests" { 86 w.Sources = append(w.Sources, tests...) 87 continue 88 } 89 90 // -foo to remove foo if already added (e.g. by gms, tests). 91 if strings.HasPrefix(arg, "-") { 92 for i, s := range w.Sources { 93 if s == arg[1:] { 94 w.Sources = append(w.Sources[:i], w.Sources[i+1:]...) 95 break 96 } 97 } 98 continue 99 } 100 101 // Is this an option to pass through to fm? 102 if parts := strings.Split(arg, "="); len(parts) == 2 { 103 f := "-" 104 if len(parts[0]) > 1 { 105 f += "-" 106 } 107 f += parts[0] 108 109 w.Flags = append(w.Flags, f, parts[1]) 110 continue 111 } 112 113 // Is this argument naming a GM or test? 114 matched := false 115 for _, gm := range gms { 116 if (*exact && gm == arg) || (!*exact && strings.Contains(gm, arg)) { 117 w.Sources = append(w.Sources, gm) 118 matched = true 119 } 120 } 121 for _, test := range tests { 122 if (*exact && test == arg) || (!*exact && strings.Contains(test, arg)) { 123 w.Sources = append(w.Sources, test) 124 matched = true 125 } 126 } 127 if matched { 128 continue 129 } 130 131 // Anything left ought to be on the file system: a file, a directory, or a glob. 132 // Not all shells expand globs, so we'll do it here just in case. 133 matches, err := filepath.Glob(arg) 134 if err != nil { 135 return nil, err 136 } 137 if len(matches) == 0 { 138 return nil, fmt.Errorf("Don't understand '%s'.", arg) 139 } 140 141 for _, match := range matches { 142 err := filepath.Walk(match, func(path string, info os.FileInfo, err error) error { 143 if !info.IsDir() { 144 w.Sources = append(w.Sources, path) 145 } 146 return err 147 }) 148 if err != nil { 149 return nil, err 150 } 151 } 152 } 153 return w, nil 154} 155 156func main() { 157 flag.Parse() 158 159 if flag.NArg() < 1 { 160 log.Fatal("Please pass an fm binary as the first argument.") 161 } 162 fm := flag.Args()[0] 163 164 gms, err := listAll("--listGMs", fm) 165 if err != nil { 166 log.Fatalln("Could not query", fm, "for GMs:", err) 167 } 168 169 tests, err := listAll("--listTests", fm) 170 if err != nil { 171 log.Fatalln("Could not query", fm, "for tests:", err) 172 } 173 174 // One job can comes right on the command line, 175 // and any number can come one per line from -script. 176 jobs := [][]string{flag.Args()[1:]} 177 if *script != "" { 178 file := os.Stdin 179 if *script != "-" { 180 file, err = os.Open(*script) 181 if err != nil { 182 log.Fatal(err) 183 } 184 defer file.Close() 185 } 186 187 scanner := bufio.NewScanner(file) 188 for scanner.Scan() { 189 jobs = append(jobs, strings.Fields(scanner.Text())) 190 } 191 if err = scanner.Err(); err != nil { 192 log.Fatal(err) 193 } 194 } 195 196 wg := &sync.WaitGroup{} 197 var failures int32 = 0 198 199 worker := func(queue chan work) { 200 for w := range queue { 201 start := time.Now() 202 203 args := w.Flags[:] 204 args = append(args, "-s") 205 args = append(args, w.Sources...) 206 207 cmd := exec.Command(fm, args...) 208 output, err := cmd.CombinedOutput() 209 210 status := "#done" 211 if err != nil { 212 status = fmt.Sprintf("#failed (%v)", err) 213 214 if len(w.Sources) == 1 { 215 // If a source ran alone and failed, that's just a failure. 216 atomic.AddInt32(&failures, 1) 217 } else { 218 // If a batch of sources ran and failed, split them up and try again. 219 for _, source := range w.Sources { 220 wg.Add(1) 221 queue <- work{[]string{source}, w.Flags} 222 } 223 } 224 } 225 226 if !*quiet || (err != nil && len(w.Sources) == 1) { 227 log.Printf("\n%v %v in %v:\n%s", 228 strings.Join(cmd.Args, " "), status, time.Since(start), output) 229 } 230 231 wg.Done() 232 } 233 } 234 235 cpu := make(chan work, 1<<20) 236 for i := 0; i < *cpuLimit; i++ { 237 go worker(cpu) 238 } 239 240 gpu := make(chan work, 1<<20) 241 for i := 0; i < *gpuLimit; i++ { 242 go worker(gpu) 243 } 244 245 for _, job := range jobs { 246 // Skip blank lines, empty command lines. 247 if len(job) == 0 { 248 continue 249 } 250 251 w, err := parseWork(job, gms, tests) 252 if err != nil { 253 log.Fatal(err) 254 } 255 256 // Determine if this is CPU-bound or GPU-bound work, conservatively assuming GPU. 257 queue, limit := gpu, *gpuLimit 258 backend := "" 259 for i, flag := range w.Flags { 260 if flag == "-b" || flag == "--backend" { 261 backend = w.Flags[i+1] 262 } 263 } 264 cpuBackends := map[string]bool{ 265 "cpu": true, 266 "skp": true, 267 "pdf": true, 268 } 269 if cpuBackends[backend] { 270 queue, limit = cpu, *cpuLimit 271 } 272 273 if *random { 274 rand.Shuffle(len(w.Sources), func(i, j int) { 275 w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i] 276 }) 277 } 278 279 // Round up so there's at least one source per batch. 280 sourcesPerBatch := (len(w.Sources) + limit - 1) / limit 281 282 for i := 0; i < len(w.Sources); i += sourcesPerBatch { 283 end := i + sourcesPerBatch 284 if end > len(w.Sources) { 285 end = len(w.Sources) 286 } 287 batch := w.Sources[i:end] 288 289 wg.Add(1) 290 queue <- work{batch, w.Flags} 291 } 292 } 293 294 wg.Wait() 295 296 if failures > 0 { 297 log.Fatalln(failures, "failures after retries") 298 } 299} 300