1// Copyright 2019 Google LLC. 2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. 3package main 4 5import ( 6 "bufio" 7 "bytes" 8 "flag" 9 "fmt" 10 "log" 11 "math/rand" 12 "os" 13 "os/exec" 14 "path/filepath" 15 "runtime" 16 "strings" 17 "sync" 18 "sync/atomic" 19 "time" 20) 21 22// Too many GPU processes and we'll start to overwhelm your GPU, 23// even hanging your machine in the worst case. Here's a reasonable default. 24func defaultGpuLimit() int { 25 limit := 8 26 if n := runtime.NumCPU(); n < limit { 27 return n 28 } 29 return limit 30} 31 32var script = flag.String("script", "", "A file with jobs to run, one per line. - for stdin.") 33var random = flag.Bool("random", true, "Assign sources into job batches randomly?") 34var quiet = flag.Bool("quiet", false, "Print only failures?") 35var exact = flag.Bool("exact", false, "Match GM names only exactly.") 36var cpuLimit = flag.Int("cpuLimit", runtime.NumCPU(), 37 "Maximum number of concurrent processes for CPU-bound work.") 38var gpuLimit = flag.Int("gpuLimit", defaultGpuLimit(), 39 "Maximum number of concurrent processes for GPU-bound work.") 40 41func init() { 42 flag.StringVar(script, "s", *script, "Alias for --script.") 43 flag.BoolVar(random, "r", *random, "Alias for --random.") 44 flag.BoolVar(quiet, "q", *quiet, "Alias for --quiet.") 45 flag.BoolVar(exact, "e", *exact, "Alias for --exact.") 46 flag.IntVar(cpuLimit, "c", *cpuLimit, "Alias for --cpuLimit.") 47 flag.IntVar(gpuLimit, "g", *gpuLimit, "Alias for --gpuLimit.") 48} 49 50func listAllGMs(fm string) (gms []string, err error) { 51 // Query fm binary for list of all available GMs by running with no arguments. 52 cmd := exec.Command(fm) 53 stdout, err := cmd.Output() 54 if err != nil { 55 return 56 } 57 // GM names are listed line-by-line. 58 scanner := bufio.NewScanner(bytes.NewReader(stdout)) 59 for scanner.Scan() { 60 gms = append(gms, scanner.Text()) 61 } 62 err = scanner.Err() 63 return 64} 65 66type work struct { 67 Sources []string 68 Flags []string 69} 70 71func parseWork(args []string, gms []string) (*work, error) { 72 w := &work{} 73 for _, arg := range args { 74 // I wish we could parse flags here too, but it's too late. 75 if strings.HasPrefix(arg, "-") { 76 msg := "Is '%s' an fm flag? If so please pass it using flag=value syntax." 77 if flag.Lookup(arg[1:]) != nil { 78 msg = "Please pass fm_bot flags like '%s' on the command line before the FM binary." 79 } 80 return nil, fmt.Errorf(msg, arg) 81 } 82 83 // Everything after a # is a comment. 84 if strings.HasPrefix(arg, "#") { 85 break 86 } 87 88 // Treat "gm" or "gms" as a shortcut for all known GMs. 89 if arg == "gm" || arg == "gms" { 90 w.Sources = append(w.Sources, gms...) 91 continue 92 } 93 94 // Is this an option to pass through to fm? 95 if parts := strings.Split(arg, "="); len(parts) == 2 { 96 f := "-" 97 if len(parts[0]) > 1 { 98 f += "-" 99 } 100 f += parts[0] 101 102 w.Flags = append(w.Flags, f, parts[1]) 103 continue 104 } 105 106 // Is this argument naming a GM? 107 matchedAnyGM := false 108 for _, gm := range gms { 109 if (*exact && gm == arg) || (!*exact && strings.Contains(gm, arg)) { 110 w.Sources = append(w.Sources, gm) 111 matchedAnyGM = true 112 } 113 } 114 if matchedAnyGM { 115 continue 116 } 117 118 // Anything left ought to be on the file system: a file, a directory, or a glob. 119 // Not all shells expand globs, so we'll do it here just in case. 120 matches, err := filepath.Glob(arg) 121 if err != nil { 122 return nil, err 123 } 124 if len(matches) == 0 { 125 return nil, fmt.Errorf("Don't understand '%s'.", arg) 126 } 127 128 for _, match := range matches { 129 err := filepath.Walk(match, func(path string, info os.FileInfo, err error) error { 130 if !info.IsDir() { 131 w.Sources = append(w.Sources, path) 132 } 133 return err 134 }) 135 if err != nil { 136 return nil, err 137 } 138 } 139 } 140 return w, nil 141} 142 143func main() { 144 flag.Parse() 145 146 if flag.NArg() < 1 { 147 log.Fatal("Please pass an fm binary as the first argument.") 148 } 149 fm := flag.Args()[0] 150 151 gms, err := listAllGMs(fm) 152 if err != nil { 153 log.Fatalln("Could not query", fm, "for GMs:", err) 154 } 155 156 // One job can comes right on the command line, 157 // and any number can come one per line from -script. 158 jobs := [][]string{flag.Args()[1:]} 159 if *script != "" { 160 file := os.Stdin 161 if *script != "-" { 162 file, err = os.Open(*script) 163 if err != nil { 164 log.Fatal(err) 165 } 166 defer file.Close() 167 } 168 169 scanner := bufio.NewScanner(file) 170 for scanner.Scan() { 171 jobs = append(jobs, strings.Fields(scanner.Text())) 172 } 173 if err = scanner.Err(); err != nil { 174 log.Fatal(err) 175 } 176 } 177 178 wg := &sync.WaitGroup{} 179 var failures int32 = 0 180 181 worker := func(queue chan work) { 182 for w := range queue { 183 start := time.Now() 184 185 args := w.Flags[:] 186 args = append(args, "-s") 187 args = append(args, w.Sources...) 188 189 cmd := exec.Command(fm, args...) 190 output, err := cmd.CombinedOutput() 191 192 status := "#done" 193 if err != nil { 194 status = fmt.Sprintf("#failed (%v)", err) 195 196 if len(w.Sources) == 1 { 197 // If a source ran alone and failed, that's just a failure. 198 atomic.AddInt32(&failures, 1) 199 } else { 200 // If a batch of sources ran and failed, split them up and try again. 201 for _, source := range w.Sources { 202 wg.Add(1) 203 queue <- work{[]string{source}, w.Flags} 204 } 205 } 206 } 207 208 if !*quiet || (err != nil && len(w.Sources) == 1) { 209 log.Printf("\n%v %v in %v:\n%s", 210 strings.Join(cmd.Args, " "), status, time.Since(start), output) 211 } 212 213 wg.Done() 214 } 215 } 216 217 cpu := make(chan work, 1<<20) 218 for i := 0; i < *cpuLimit; i++ { 219 go worker(cpu) 220 } 221 222 gpu := make(chan work, 1<<20) 223 for i := 0; i < *gpuLimit; i++ { 224 go worker(gpu) 225 } 226 227 for _, job := range jobs { 228 // Skip blank lines, empty command lines. 229 if len(job) == 0 { 230 continue 231 } 232 233 w, err := parseWork(job, gms) 234 if err != nil { 235 log.Fatal(err) 236 } 237 238 // Determine if this is CPU-bound or GPU-bound work, conservatively assuming GPU. 239 queue, limit := gpu, *gpuLimit 240 backend := "" 241 for i, flag := range w.Flags { 242 if flag == "-b" || flag == "--backend" { 243 backend = w.Flags[i+1] 244 } 245 } 246 whitelisted := map[string]bool{ 247 "cpu": true, 248 "skp": true, 249 "pdf": true, 250 } 251 if whitelisted[backend] { 252 queue, limit = cpu, *cpuLimit 253 } 254 255 if *random { 256 rand.Shuffle(len(w.Sources), func(i, j int) { 257 w.Sources[i], w.Sources[j] = w.Sources[j], w.Sources[i] 258 }) 259 } 260 261 // Round up so there's at least one source per batch. 262 sourcesPerBatch := (len(w.Sources) + limit - 1) / limit 263 264 for i := 0; i < len(w.Sources); i += sourcesPerBatch { 265 end := i + sourcesPerBatch 266 if end > len(w.Sources) { 267 end = len(w.Sources) 268 } 269 batch := w.Sources[i:end] 270 271 wg.Add(1) 272 queue <- work{batch, w.Flags} 273 } 274 } 275 276 wg.Wait() 277 278 if failures > 0 { 279 log.Fatalln(failures, "failures after retries") 280 } 281} 282