1// Copyright 2020 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5package main 6 7import ( 8 "bufio" 9 "bytes" 10 "context" 11 "flag" 12 "fmt" 13 "math/rand" 14 "net/http" 15 "os" 16 "path/filepath" 17 "runtime" 18 "sort" 19 "strings" 20 "sync" 21 "sync/atomic" 22 23 "go.skia.org/infra/go/exec" 24 "go.skia.org/infra/go/util" 25 "go.skia.org/infra/task_driver/go/td" 26) 27 28func main() { 29 var ( 30 projectId = flag.String("project_id", "", "ID of the Google Cloud project.") 31 taskId = flag.String("task_id", "", "ID of this task.") 32 bot = flag.String("bot", "", "Name of the task.") 33 output = flag.String("o", "", "Dump JSON step data to the given file, or stdout if -.") 34 local = flag.Bool("local", true, "Running locally (else on the bots)?") 35 36 resources = flag.String("resources", "resources", "Passed to fm -i.") 37 imgs = flag.String("imgs", "", "Shorthand `directory` contents as 'imgs'.") 38 skps = flag.String("skps", "", "Shorthand `directory` contents as 'skps'.") 39 svgs = flag.String("svgs", "", "Shorthand `directory` contents as 'svgs'.") 40 script = flag.String("script", "", "File (or - for stdin) with one job per line.") 41 gold = flag.Bool("gold", false, "Fetch known hashes, upload to Gold, etc.?") 42 goldHashesURL = flag.String("gold_hashes_url", "", "URL from which to download pre-existing hashes") 43 ) 44 flag.Parse() 45 46 ctx := context.Background() 47 startStep := func(ctx context.Context, _ *td.StepProperties) context.Context { return ctx } 48 endStep := func(_ context.Context) {} 49 failStep := func(_ context.Context, err error) error { 50 fmt.Fprintln(os.Stderr, err) 51 return err 52 } 53 fatal := func(ctx context.Context, err error) { 54 failStep(ctx, err) 55 os.Exit(1) 56 } 57 httpClient := func(_ context.Context) *http.Client { return http.DefaultClient } 58 59 if !*local { 60 ctx = td.StartRun(projectId, taskId, bot, output, local) 61 defer td.EndRun(ctx) 62 startStep = td.StartStep 63 endStep = td.EndStep 64 failStep = td.FailStep 65 fatal = td.Fatal 66 httpClient = func(ctx context.Context) *http.Client { return td.HttpClient(ctx, nil) } 67 } 68 69 if flag.NArg() < 1 { 70 fatal(ctx, fmt.Errorf("Please pass an fm binary.")) 71 } 72 fm := flag.Arg(0) 73 74 // Run `fm <flag>` to find the names of all linked GMs or tests. 75 query := func(flag string) []string { 76 stdout := &bytes.Buffer{} 77 cmd := &exec.Command{Name: fm, Stdout: stdout} 78 cmd.Args = append(cmd.Args, "-i", *resources) 79 cmd.Args = append(cmd.Args, flag) 80 if err := exec.Run(ctx, cmd); err != nil { 81 fatal(ctx, err) 82 } 83 84 lines := []string{} 85 scanner := bufio.NewScanner(stdout) 86 for scanner.Scan() { 87 lines = append(lines, scanner.Text()) 88 } 89 if err := scanner.Err(); err != nil { 90 fatal(ctx, err) 91 } 92 return lines 93 } 94 95 // Lowercase with leading '.' stripped. 96 normalizedExt := func(s string) string { 97 return strings.ToLower(filepath.Ext(s)[1:]) 98 } 99 100 // Walk directory for files with given set of extensions. 101 walk := func(dir string, exts map[string]bool) (files []string) { 102 if dir != "" { 103 err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { 104 if err != nil { 105 return err 106 } 107 if !info.IsDir() && exts[normalizedExt(info.Name())] { 108 files = append(files, path) 109 } 110 return nil 111 }) 112 113 if err != nil { 114 fatal(ctx, err) 115 } 116 } 117 return 118 } 119 120 rawExts := map[string]bool{ 121 "arw": true, 122 "cr2": true, 123 "dng": true, 124 "nef": true, 125 "nrw": true, 126 "orf": true, 127 "pef": true, 128 "raf": true, 129 "rw2": true, 130 "srw": true, 131 } 132 imgExts := map[string]bool{ 133 "astc": true, 134 "bmp": true, 135 "gif": true, 136 "ico": true, 137 "jpeg": true, 138 "jpg": true, 139 "ktx": true, 140 "png": true, 141 "wbmp": true, 142 "webp": true, 143 } 144 for k, v := range rawExts { 145 imgExts[k] = v 146 } 147 148 // We can use "gm" or "gms" as shorthand to refer to all GMs, and similar for the rest. 149 shorthands := map[string][]string{ 150 "gm": query("--listGMs"), 151 "test": query("--listTests"), 152 "img": walk(*imgs, imgExts), 153 "skp": walk(*skps, map[string]bool{"skp": true}), 154 "svg": walk(*svgs, map[string]bool{"svg": true}), 155 } 156 for k, v := range shorthands { 157 shorthands[k+"s"] = v 158 } 159 160 // Query Gold for all known hashes when running as a bot. 161 known := map[string]bool{ 162 "0832f708a97acc6da385446384647a8f": true, // MD5 of passing unit test. 163 } 164 if *gold { 165 func() { 166 resp, err := httpClient(ctx).Get(*goldHashesURL) 167 if err != nil { 168 fatal(ctx, err) 169 } 170 defer resp.Body.Close() 171 172 scanner := bufio.NewScanner(resp.Body) 173 for scanner.Scan() { 174 known[scanner.Text()] = true 175 } 176 if err := scanner.Err(); err != nil { 177 fatal(ctx, err) 178 } 179 180 fmt.Fprintf(os.Stdout, "Gold knew %v unique hashes.\n", len(known)) 181 }() 182 } 183 184 // We'll pass `flag: value` as `--flag value` to FM. 185 // Such a short type name makes it easy to write out literals. 186 type F = map[string]string 187 188 flatten := func(flags F) (flat []string) { 189 // It's not strictly important that we sort, but it makes reading bot logs easier. 190 keys := []string{} 191 for k := range flags { 192 keys = append(keys, k) 193 } 194 sort.Strings(keys) 195 196 for _, k := range keys { 197 v := flags[k] 198 199 if v == "true" { 200 flat = append(flat, "--"+k) 201 } else if v == "false" { 202 flat = append(flat, "--no"+k) 203 } else if len(k) == 1 { 204 flat = append(flat, "-"+k, v) 205 } else { 206 flat = append(flat, "--"+k, v) 207 } 208 } 209 return 210 } 211 212 var worker func(context.Context, []string, F) int 213 worker = func(ctx context.Context, sources []string, flags F) (failures int) { 214 stdout := &bytes.Buffer{} 215 stderr := &bytes.Buffer{} 216 cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr} 217 cmd.Args = append(cmd.Args, "-i", *resources) 218 cmd.Args = append(cmd.Args, flatten(flags)...) 219 cmd.Args = append(cmd.Args, "-s") 220 cmd.Args = append(cmd.Args, sources...) 221 222 // Run our FM command. 223 err := exec.Run(ctx, cmd) 224 225 // We'll rerun any source individually that didn't produce a known hash, i.e. 226 // sources that crash, produce unknown hashes, or that an crash prevented from running. 227 unknownHash := "" 228 { 229 // Start assuming we'll need to rerun everything. 230 reruns := map[string]bool{} 231 for _, name := range sources { 232 reruns[name] = true 233 } 234 235 // Scan stdout for lines like "<name> skipped" or "<name> <hash> ??ms" 236 // and exempt those sources from reruns if they were skipped or their hash is known. 237 scanner := bufio.NewScanner(stdout) 238 for scanner.Scan() { 239 if parts := strings.Fields(scanner.Text()); len(parts) >= 2 { 240 name, outcome := parts[0], parts[1] 241 if *gold && outcome != "skipped" && !known[outcome] { 242 unknownHash = outcome 243 } else { 244 delete(reruns, name) 245 } 246 } 247 } 248 if err := scanner.Err(); err != nil { 249 fatal(ctx, err) 250 } 251 252 // Only rerun sources from a batch (or we'd rerun failures over and over and over). 253 if len(sources) > 1 { 254 for name := range reruns { 255 failures += worker(ctx, []string{name}, flags) 256 } 257 return 258 } 259 } 260 261 // If an individual run failed, nothing more to do but fail. 262 if err != nil { 263 failures += 1 264 265 lines := []string{} 266 scanner := bufio.NewScanner(stderr) 267 for scanner.Scan() { 268 lines = append(lines, scanner.Text()) 269 } 270 if err := scanner.Err(); err != nil { 271 fatal(ctx, err) 272 } 273 274 failStep(ctx, fmt.Errorf("%v #failed:\n\t%v\n", 275 exec.DebugString(cmd), 276 strings.Join(lines, "\n\t"))) 277 278 return 279 } 280 281 // If an individual run succeeded but produced an unknown hash, TODO upload .png to Gold. 282 // For now just print out the command and the hash it produced. 283 if unknownHash != "" { 284 fmt.Fprintf(os.Stdout, "%v #%v\n", 285 exec.DebugString(cmd), 286 unknownHash) 287 } 288 return 289 } 290 291 type Work struct { 292 Ctx context.Context 293 WG *sync.WaitGroup 294 Failures *int32 295 Sources []string // Passed to FM -s: names of gms/tests, paths to images, .skps, etc. 296 Flags F // Other flags to pass to FM: --ct 565, --msaa 16, etc. 297 } 298 queue := make(chan Work, 1<<20) // Arbitrarily huge buffer to avoid ever blocking. 299 300 for i := 0; i < runtime.NumCPU(); i++ { 301 go func() { 302 for w := range queue { 303 func() { 304 defer w.WG.Done() 305 // For organizational purposes, create a step representing this batch, 306 // with the batch call to FM and any individual reruns all nested inside. 307 ctx := startStep(w.Ctx, td.Props(strings.Join(w.Sources, " "))) 308 defer endStep(ctx) 309 if failures := worker(ctx, w.Sources, w.Flags); failures > 0 { 310 atomic.AddInt32(w.Failures, int32(failures)) 311 if !*local { // Uninteresting to see on local runs. 312 failStep(ctx, fmt.Errorf("%v reruns failed\n", failures)) 313 } 314 } 315 }() 316 } 317 }() 318 } 319 320 // Get some work going, first breaking it into batches to increase our parallelism. 321 pendingKickoffs := &sync.WaitGroup{} 322 var totalFailures int32 = 0 323 324 kickoff := func(sources []string, flags F) { 325 if len(sources) == 0 { 326 return // A blank or commented job line from -script or the command line. 327 } 328 pendingKickoffs.Add(1) 329 330 // Shuffle the sources randomly as a cheap way to approximate evenly expensive batches. 331 // (Intentionally not rand.Seed()'d to stay deterministically reproducible.) 332 sources = append([]string{}, sources...) // We'll be needing our own copy... 333 rand.Shuffle(len(sources), func(i, j int) { 334 sources[i], sources[j] = sources[j], sources[i] 335 }) 336 337 // For organizational purposes, create a step representing this call to kickoff(), 338 // with each batch of sources nested inside. 339 ctx := startStep(ctx, 340 td.Props(fmt.Sprintf("%s, %s…", strings.Join(flatten(flags), " "), sources[0]))) 341 pendingBatches := &sync.WaitGroup{} 342 failures := new(int32) 343 344 // Arbitrary, nice to scale ~= cores. 345 approxNumBatches := runtime.NumCPU() 346 347 // Round up batch size to avoid empty batches, making approxNumBatches approximate. 348 batchSize := (len(sources) + approxNumBatches - 1) / approxNumBatches 349 350 util.ChunkIter(len(sources), batchSize, func(start, end int) error { 351 pendingBatches.Add(1) 352 queue <- Work{ctx, pendingBatches, failures, sources[start:end], flags} 353 return nil 354 }) 355 356 // When the batches for this kickoff() are all done, this kickoff() is done. 357 go func() { 358 pendingBatches.Wait() 359 if *failures > 0 { 360 atomic.AddInt32(&totalFailures, *failures) 361 if !*local { // Uninteresting to see on local runs. 362 failStep(ctx, fmt.Errorf("%v total reruns failed\n", *failures)) 363 } 364 } 365 endStep(ctx) 366 pendingKickoffs.Done() 367 }() 368 } 369 370 // Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff(). 371 parse := func(job []string) (sources []string, flags F) { 372 flags = make(F) 373 for _, token := range job { 374 // Everything after # is a comment. 375 if strings.HasPrefix(token, "#") { 376 break 377 } 378 379 // Expand "gm" or "gms" to all known GMs, or same for tests, images, skps, svgs. 380 if vals, ok := shorthands[token]; ok { 381 sources = append(sources, vals...) 382 continue 383 } 384 385 // Is this a flag to pass through to FM? 386 if parts := strings.Split(token, "="); len(parts) == 2 { 387 flags[parts[0]] = parts[1] 388 continue 389 } 390 391 // Anything else must be the name of a source for FM to run. 392 sources = append(sources, token) 393 } 394 return 395 } 396 397 // Parse one job from the command line, handy for ad hoc local runs. 398 kickoff(parse(flag.Args()[1:])) 399 400 // Any number of jobs can come from -script. 401 if *script != "" { 402 file := os.Stdin 403 if *script != "-" { 404 file, err := os.Open(*script) 405 if err != nil { 406 fatal(ctx, err) 407 } 408 defer file.Close() 409 } 410 scanner := bufio.NewScanner(file) 411 for scanner.Scan() { 412 kickoff(parse(strings.Fields(scanner.Text()))) 413 } 414 if err := scanner.Err(); err != nil { 415 fatal(ctx, err) 416 } 417 } 418 419 // If we're a bot (or acting as if we are one), kick off its work. 420 if *bot != "" { 421 parts := strings.Split(*bot, "-") 422 OS, model, CPU_or_GPU := parts[1], parts[3], parts[4] 423 424 // Bots use portable fonts except where we explicitly opt-in to native fonts. 425 defaultFlags := F{"nativeFonts": "false"} 426 427 run := func(sources []string, extraFlags F) { 428 // Default then extra to allow overriding the defaults. 429 flags := F{} 430 for k, v := range defaultFlags { 431 flags[k] = v 432 } 433 for k, v := range extraFlags { 434 flags[k] = v 435 } 436 kickoff(sources, flags) 437 } 438 439 gms := shorthands["gms"] 440 imgs := shorthands["imgs"] 441 svgs := shorthands["svgs"] 442 skps := shorthands["skps"] 443 tests := shorthands["tests"] 444 445 filter := func(in []string, keep func(string) bool) (out []string) { 446 for _, s := range in { 447 if keep(s) { 448 out = append(out, s) 449 } 450 } 451 return 452 } 453 454 if strings.Contains(OS, "Win") { 455 // We can't decode these formats on Windows. 456 imgs = filter(imgs, func(s string) bool { return !rawExts[normalizedExt(s)] }) 457 } 458 459 if strings.Contains(*bot, "TSAN") { 460 // Run each test a few times in parallel to uncover races. 461 defaultFlags["race"] = "4" 462 } 463 464 if CPU_or_GPU == "CPU" { 465 defaultFlags["b"] = "cpu" 466 467 // FM's default ct/gamut/tf flags are equivalent to --config srgb in DM. 468 run(gms, F{}) 469 run(gms, F{"nativeFonts": "true"}) 470 run(imgs, F{}) 471 run(svgs, F{}) 472 run(skps, F{"clipW": "1000", "clipH": "1000"}) 473 run(tests, F{"race": "0"}) // Several unit tests are not reentrant. 474 475 if model == "GCE" { 476 run(gms, F{"ct": "g8", "legacy": "true"}) // --config g8 477 run(gms, F{"ct": "565", "legacy": "true"}) // --config 565 478 run(gms, F{"ct": "8888", "legacy": "true"}) // --config 8888 479 run(gms, F{"ct": "f16"}) // --config esrgb 480 run(gms, F{"ct": "f16", "tf": "linear"}) // --config f16 481 run(gms, F{"ct": "8888", "gamut": "p3"}) // --config p3 482 run(gms, F{"ct": "8888", "gamut": "narrow", "tf": "2.2"}) // --config narrow 483 run(gms, F{"ct": "f16", "gamut": "rec2020", "tf": "rec2020"}) // --config erec2020 484 485 run(gms, F{"skvm": "true"}) 486 run(gms, F{"skvm": "true", "ct": "f16"}) 487 488 run(imgs, F{ 489 "decodeToDst": "true", 490 "ct": "f16", 491 "gamut": "rec2020", 492 "tf": "rec2020"}) 493 } 494 495 // TODO: pic-8888 equivalent? 496 // TODO: serialize-8888 equivalent? 497 } 498 } 499 500 pendingKickoffs.Wait() 501 if totalFailures > 0 { 502 fatal(ctx, fmt.Errorf("%v runs of %v failed after retries.\n", totalFailures, fm)) 503 } 504} 505