// Copyright 2020 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. package main import ( "bufio" "bytes" "context" "flag" "fmt" "math/rand" "net/http" "os" "path/filepath" "runtime" "sort" "strings" "sync" "sync/atomic" "go.skia.org/infra/go/exec" "go.skia.org/infra/go/util" "go.skia.org/infra/task_driver/go/td" ) func main() { var ( projectId = flag.String("project_id", "", "ID of the Google Cloud project.") taskId = flag.String("task_id", "", "ID of this task.") bot = flag.String("bot", "", "Name of the task.") output = flag.String("o", "", "Dump JSON step data to the given file, or stdout if -.") local = flag.Bool("local", true, "Running locally (else on the bots)?") resources = flag.String("resources", "resources", "Passed to fm -i.") imgs = flag.String("imgs", "", "Shorthand `directory` contents as 'imgs'.") skps = flag.String("skps", "", "Shorthand `directory` contents as 'skps'.") svgs = flag.String("svgs", "", "Shorthand `directory` contents as 'svgs'.") script = flag.String("script", "", "File (or - for stdin) with one job per line.") gold = flag.Bool("gold", false, "Fetch known hashes, upload to Gold, etc.?") goldHashesURL = flag.String("gold_hashes_url", "", "URL from which to download pre-existing hashes") ) flag.Parse() ctx := context.Background() startStep := func(ctx context.Context, _ *td.StepProperties) context.Context { return ctx } endStep := func(_ context.Context) {} failStep := func(_ context.Context, err error) error { fmt.Fprintln(os.Stderr, err) return err } fatal := func(ctx context.Context, err error) { failStep(ctx, err) os.Exit(1) } httpClient := func(_ context.Context) *http.Client { return http.DefaultClient } if !*local { ctx = td.StartRun(projectId, taskId, bot, output, local) defer td.EndRun(ctx) startStep = td.StartStep endStep = td.EndStep failStep = td.FailStep fatal = td.Fatal httpClient = func(ctx context.Context) *http.Client { return td.HttpClient(ctx, nil) } } if flag.NArg() < 1 { fatal(ctx, fmt.Errorf("Please pass an fm binary.")) } fm := flag.Arg(0) // Run `fm ` to find the names of all linked GMs or tests. query := func(flag string) []string { stdout := &bytes.Buffer{} cmd := &exec.Command{Name: fm, Stdout: stdout} cmd.Args = append(cmd.Args, "-i", *resources) cmd.Args = append(cmd.Args, flag) if err := exec.Run(ctx, cmd); err != nil { fatal(ctx, err) } lines := []string{} scanner := bufio.NewScanner(stdout) for scanner.Scan() { lines = append(lines, scanner.Text()) } if err := scanner.Err(); err != nil { fatal(ctx, err) } return lines } // Lowercase with leading '.' stripped. normalizedExt := func(s string) string { return strings.ToLower(filepath.Ext(s)[1:]) } // Walk directory for files with given set of extensions. walk := func(dir string, exts map[string]bool) (files []string) { if dir != "" { err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() && exts[normalizedExt(info.Name())] { files = append(files, path) } return nil }) if err != nil { fatal(ctx, err) } } return } rawExts := map[string]bool{ "arw": true, "cr2": true, "dng": true, "nef": true, "nrw": true, "orf": true, "pef": true, "raf": true, "rw2": true, "srw": true, } imgExts := map[string]bool{ "astc": true, "bmp": true, "gif": true, "ico": true, "jpeg": true, "jpg": true, "ktx": true, "png": true, "wbmp": true, "webp": true, } for k, v := range rawExts { imgExts[k] = v } // We can use "gm" or "gms" as shorthand to refer to all GMs, and similar for the rest. shorthands := map[string][]string{ "gm": query("--listGMs"), "test": query("--listTests"), "img": walk(*imgs, imgExts), "skp": walk(*skps, map[string]bool{"skp": true}), "svg": walk(*svgs, map[string]bool{"svg": true}), } for k, v := range shorthands { shorthands[k+"s"] = v } // Query Gold for all known hashes when running as a bot. known := map[string]bool{ "0832f708a97acc6da385446384647a8f": true, // MD5 of passing unit test. } if *gold { func() { resp, err := httpClient(ctx).Get(*goldHashesURL) if err != nil { fatal(ctx, err) } defer resp.Body.Close() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { known[scanner.Text()] = true } if err := scanner.Err(); err != nil { fatal(ctx, err) } fmt.Fprintf(os.Stdout, "Gold knew %v unique hashes.\n", len(known)) }() } // We'll pass `flag: value` as `--flag value` to FM. // Such a short type name makes it easy to write out literals. type F = map[string]string flatten := func(flags F) (flat []string) { // It's not strictly important that we sort, but it makes reading bot logs easier. keys := []string{} for k := range flags { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { v := flags[k] if v == "true" { flat = append(flat, "--"+k) } else if v == "false" { flat = append(flat, "--no"+k) } else if len(k) == 1 { flat = append(flat, "-"+k, v) } else { flat = append(flat, "--"+k, v) } } return } var worker func(context.Context, []string, F) int worker = func(ctx context.Context, sources []string, flags F) (failures int) { stdout := &bytes.Buffer{} stderr := &bytes.Buffer{} cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr} cmd.Args = append(cmd.Args, "-i", *resources) cmd.Args = append(cmd.Args, flatten(flags)...) cmd.Args = append(cmd.Args, "-s") cmd.Args = append(cmd.Args, sources...) // Run our FM command. err := exec.Run(ctx, cmd) // We'll rerun any source individually that didn't produce a known hash, i.e. // sources that crash, produce unknown hashes, or that an crash prevented from running. unknownHash := "" { // Start assuming we'll need to rerun everything. reruns := map[string]bool{} for _, name := range sources { reruns[name] = true } // Scan stdout for lines like " skipped" or " ??ms" // and exempt those sources from reruns if they were skipped or their hash is known. scanner := bufio.NewScanner(stdout) for scanner.Scan() { if parts := strings.Fields(scanner.Text()); len(parts) >= 2 { name, outcome := parts[0], parts[1] if *gold && outcome != "skipped" && !known[outcome] { unknownHash = outcome } else { delete(reruns, name) } } } if err := scanner.Err(); err != nil { fatal(ctx, err) } // Only rerun sources from a batch (or we'd rerun failures over and over and over). if len(sources) > 1 { for name := range reruns { failures += worker(ctx, []string{name}, flags) } return } } // If an individual run failed, nothing more to do but fail. if err != nil { failures += 1 lines := []string{} scanner := bufio.NewScanner(stderr) for scanner.Scan() { lines = append(lines, scanner.Text()) } if err := scanner.Err(); err != nil { fatal(ctx, err) } failStep(ctx, fmt.Errorf("%v #failed:\n\t%v\n", exec.DebugString(cmd), strings.Join(lines, "\n\t"))) return } // If an individual run succeeded but produced an unknown hash, TODO upload .png to Gold. // For now just print out the command and the hash it produced. if unknownHash != "" { fmt.Fprintf(os.Stdout, "%v #%v\n", exec.DebugString(cmd), unknownHash) } return } type Work struct { Ctx context.Context WG *sync.WaitGroup Failures *int32 Sources []string // Passed to FM -s: names of gms/tests, paths to images, .skps, etc. Flags F // Other flags to pass to FM: --ct 565, --msaa 16, etc. } queue := make(chan Work, 1<<20) // Arbitrarily huge buffer to avoid ever blocking. for i := 0; i < runtime.NumCPU(); i++ { go func() { for w := range queue { func() { defer w.WG.Done() // For organizational purposes, create a step representing this batch, // with the batch call to FM and any individual reruns all nested inside. ctx := startStep(w.Ctx, td.Props(strings.Join(w.Sources, " "))) defer endStep(ctx) if failures := worker(ctx, w.Sources, w.Flags); failures > 0 { atomic.AddInt32(w.Failures, int32(failures)) if !*local { // Uninteresting to see on local runs. failStep(ctx, fmt.Errorf("%v reruns failed\n", failures)) } } }() } }() } // Get some work going, first breaking it into batches to increase our parallelism. pendingKickoffs := &sync.WaitGroup{} var totalFailures int32 = 0 kickoff := func(sources []string, flags F) { if len(sources) == 0 { return // A blank or commented job line from -script or the command line. } pendingKickoffs.Add(1) // Shuffle the sources randomly as a cheap way to approximate evenly expensive batches. // (Intentionally not rand.Seed()'d to stay deterministically reproducible.) sources = append([]string{}, sources...) // We'll be needing our own copy... rand.Shuffle(len(sources), func(i, j int) { sources[i], sources[j] = sources[j], sources[i] }) // For organizational purposes, create a step representing this call to kickoff(), // with each batch of sources nested inside. ctx := startStep(ctx, td.Props(fmt.Sprintf("%s, %s…", strings.Join(flatten(flags), " "), sources[0]))) pendingBatches := &sync.WaitGroup{} failures := new(int32) // Arbitrary, nice to scale ~= cores. approxNumBatches := runtime.NumCPU() // Round up batch size to avoid empty batches, making approxNumBatches approximate. batchSize := (len(sources) + approxNumBatches - 1) / approxNumBatches util.ChunkIter(len(sources), batchSize, func(start, end int) error { pendingBatches.Add(1) queue <- Work{ctx, pendingBatches, failures, sources[start:end], flags} return nil }) // When the batches for this kickoff() are all done, this kickoff() is done. go func() { pendingBatches.Wait() if *failures > 0 { atomic.AddInt32(&totalFailures, *failures) if !*local { // Uninteresting to see on local runs. failStep(ctx, fmt.Errorf("%v total reruns failed\n", *failures)) } } endStep(ctx) pendingKickoffs.Done() }() } // Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff(). parse := func(job []string) (sources []string, flags F) { flags = make(F) for _, token := range job { // Everything after # is a comment. if strings.HasPrefix(token, "#") { break } // Expand "gm" or "gms" to all known GMs, or same for tests, images, skps, svgs. if vals, ok := shorthands[token]; ok { sources = append(sources, vals...) continue } // Is this a flag to pass through to FM? if parts := strings.Split(token, "="); len(parts) == 2 { flags[parts[0]] = parts[1] continue } // Anything else must be the name of a source for FM to run. sources = append(sources, token) } return } // Parse one job from the command line, handy for ad hoc local runs. kickoff(parse(flag.Args()[1:])) // Any number of jobs can come from -script. if *script != "" { file := os.Stdin if *script != "-" { file, err := os.Open(*script) if err != nil { fatal(ctx, err) } defer file.Close() } scanner := bufio.NewScanner(file) for scanner.Scan() { kickoff(parse(strings.Fields(scanner.Text()))) } if err := scanner.Err(); err != nil { fatal(ctx, err) } } // If we're a bot (or acting as if we are one), kick off its work. if *bot != "" { parts := strings.Split(*bot, "-") OS, model, CPU_or_GPU := parts[1], parts[3], parts[4] // Bots use portable fonts except where we explicitly opt-in to native fonts. defaultFlags := F{"nativeFonts": "false"} run := func(sources []string, extraFlags F) { // Default then extra to allow overriding the defaults. flags := F{} for k, v := range defaultFlags { flags[k] = v } for k, v := range extraFlags { flags[k] = v } kickoff(sources, flags) } gms := shorthands["gms"] imgs := shorthands["imgs"] svgs := shorthands["svgs"] skps := shorthands["skps"] tests := shorthands["tests"] filter := func(in []string, keep func(string) bool) (out []string) { for _, s := range in { if keep(s) { out = append(out, s) } } return } if strings.Contains(OS, "Win") { // We can't decode these formats on Windows. imgs = filter(imgs, func(s string) bool { return !rawExts[normalizedExt(s)] }) } if strings.Contains(*bot, "TSAN") { // Run each test a few times in parallel to uncover races. defaultFlags["race"] = "4" } if CPU_or_GPU == "CPU" { defaultFlags["b"] = "cpu" // FM's default ct/gamut/tf flags are equivalent to --config srgb in DM. run(gms, F{}) run(gms, F{"nativeFonts": "true"}) run(imgs, F{}) run(svgs, F{}) run(skps, F{"clipW": "1000", "clipH": "1000"}) run(tests, F{"race": "0"}) // Several unit tests are not reentrant. if model == "GCE" { run(gms, F{"ct": "g8", "legacy": "true"}) // --config g8 run(gms, F{"ct": "565", "legacy": "true"}) // --config 565 run(gms, F{"ct": "8888", "legacy": "true"}) // --config 8888 run(gms, F{"ct": "f16"}) // --config esrgb run(gms, F{"ct": "f16", "tf": "linear"}) // --config f16 run(gms, F{"ct": "8888", "gamut": "p3"}) // --config p3 run(gms, F{"ct": "8888", "gamut": "narrow", "tf": "2.2"}) // --config narrow run(gms, F{"ct": "f16", "gamut": "rec2020", "tf": "rec2020"}) // --config erec2020 run(gms, F{"skvm": "true"}) run(gms, F{"skvm": "true", "ct": "f16"}) run(imgs, F{ "decodeToDst": "true", "ct": "f16", "gamut": "rec2020", "tf": "rec2020"}) } // TODO: pic-8888 equivalent? // TODO: serialize-8888 equivalent? } } pendingKickoffs.Wait() if totalFailures > 0 { fatal(ctx, fmt.Errorf("%v runs of %v failed after retries.\n", totalFailures, fm)) } }