• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2020 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5package main
6
7import (
8	"bufio"
9	"bytes"
10	"context"
11	"flag"
12	"fmt"
13	"math/rand"
14	"net/http"
15	"os"
16	"path/filepath"
17	"runtime"
18	"sort"
19	"strings"
20	"sync"
21	"sync/atomic"
22
23	"go.skia.org/infra/go/exec"
24	"go.skia.org/infra/go/util"
25	"go.skia.org/infra/task_driver/go/td"
26)
27
28func main() {
29	var (
30		projectId = flag.String("project_id", "", "ID of the Google Cloud project.")
31		taskId    = flag.String("task_id", "", "ID of this task.")
32		bot       = flag.String("bot", "", "Name of the task.")
33		output    = flag.String("o", "", "Dump JSON step data to the given file, or stdout if -.")
34		local     = flag.Bool("local", true, "Running locally (else on the bots)?")
35
36		resources     = flag.String("resources", "resources", "Passed to fm -i.")
37		imgs          = flag.String("imgs", "", "Shorthand `directory` contents as 'imgs'.")
38		skps          = flag.String("skps", "", "Shorthand `directory` contents as 'skps'.")
39		svgs          = flag.String("svgs", "", "Shorthand `directory` contents as 'svgs'.")
40		script        = flag.String("script", "", "File (or - for stdin) with one job per line.")
41		gold          = flag.Bool("gold", false, "Fetch known hashes, upload to Gold, etc.?")
42		goldHashesURL = flag.String("gold_hashes_url", "", "URL from which to download pre-existing hashes")
43	)
44	flag.Parse()
45
46	ctx := context.Background()
47	startStep := func(ctx context.Context, _ *td.StepProperties) context.Context { return ctx }
48	endStep := func(_ context.Context) {}
49	failStep := func(_ context.Context, err error) error {
50		fmt.Fprintln(os.Stderr, err)
51		return err
52	}
53	fatal := func(ctx context.Context, err error) {
54		failStep(ctx, err)
55		os.Exit(1)
56	}
57	httpClient := func(_ context.Context) *http.Client { return http.DefaultClient }
58
59	if !*local {
60		ctx = td.StartRun(projectId, taskId, bot, output, local)
61		defer td.EndRun(ctx)
62		startStep = td.StartStep
63		endStep = td.EndStep
64		failStep = td.FailStep
65		fatal = td.Fatal
66		httpClient = func(ctx context.Context) *http.Client { return td.HttpClient(ctx, nil) }
67	}
68
69	if flag.NArg() < 1 {
70		fatal(ctx, fmt.Errorf("Please pass an fm binary."))
71	}
72	fm := flag.Arg(0)
73
74	// Run `fm <flag>` to find the names of all linked GMs or tests.
75	query := func(flag string) []string {
76		stdout := &bytes.Buffer{}
77		cmd := &exec.Command{Name: fm, Stdout: stdout}
78		cmd.Args = append(cmd.Args, "-i", *resources)
79		cmd.Args = append(cmd.Args, flag)
80		if err := exec.Run(ctx, cmd); err != nil {
81			fatal(ctx, err)
82		}
83
84		lines := []string{}
85		scanner := bufio.NewScanner(stdout)
86		for scanner.Scan() {
87			lines = append(lines, scanner.Text())
88		}
89		if err := scanner.Err(); err != nil {
90			fatal(ctx, err)
91		}
92		return lines
93	}
94
95	// Lowercase with leading '.' stripped.
96	normalizedExt := func(s string) string {
97		return strings.ToLower(filepath.Ext(s)[1:])
98	}
99
100	// Walk directory for files with given set of extensions.
101	walk := func(dir string, exts map[string]bool) (files []string) {
102		if dir != "" {
103			err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
104				if err != nil {
105					return err
106				}
107				if !info.IsDir() && exts[normalizedExt(info.Name())] {
108					files = append(files, path)
109				}
110				return nil
111			})
112
113			if err != nil {
114				fatal(ctx, err)
115			}
116		}
117		return
118	}
119
120	rawExts := map[string]bool{
121		"arw": true,
122		"cr2": true,
123		"dng": true,
124		"nef": true,
125		"nrw": true,
126		"orf": true,
127		"pef": true,
128		"raf": true,
129		"rw2": true,
130		"srw": true,
131	}
132	imgExts := map[string]bool{
133		"astc": true,
134		"bmp":  true,
135		"gif":  true,
136		"ico":  true,
137		"jpeg": true,
138		"jpg":  true,
139		"ktx":  true,
140		"png":  true,
141		"wbmp": true,
142		"webp": true,
143	}
144	for k, v := range rawExts {
145		imgExts[k] = v
146	}
147
148	// We can use "gm" or "gms" as shorthand to refer to all GMs, and similar for the rest.
149	shorthands := map[string][]string{
150		"gm":   query("--listGMs"),
151		"test": query("--listTests"),
152		"img":  walk(*imgs, imgExts),
153		"skp":  walk(*skps, map[string]bool{"skp": true}),
154		"svg":  walk(*svgs, map[string]bool{"svg": true}),
155	}
156	for k, v := range shorthands {
157		shorthands[k+"s"] = v
158	}
159
160	// Query Gold for all known hashes when running as a bot.
161	known := map[string]bool{
162		"0832f708a97acc6da385446384647a8f": true, // MD5 of passing unit test.
163	}
164	if *gold {
165		func() {
166			resp, err := httpClient(ctx).Get(*goldHashesURL)
167			if err != nil {
168				fatal(ctx, err)
169			}
170			defer resp.Body.Close()
171
172			scanner := bufio.NewScanner(resp.Body)
173			for scanner.Scan() {
174				known[scanner.Text()] = true
175			}
176			if err := scanner.Err(); err != nil {
177				fatal(ctx, err)
178			}
179
180			fmt.Fprintf(os.Stdout, "Gold knew %v unique hashes.\n", len(known))
181		}()
182	}
183
184	// We'll pass `flag: value` as `--flag value` to FM.
185	// Such a short type name makes it easy to write out literals.
186	type F = map[string]string
187
188	flatten := func(flags F) (flat []string) {
189		// It's not strictly important that we sort, but it makes reading bot logs easier.
190		keys := []string{}
191		for k := range flags {
192			keys = append(keys, k)
193		}
194		sort.Strings(keys)
195
196		for _, k := range keys {
197			v := flags[k]
198
199			if v == "true" {
200				flat = append(flat, "--"+k)
201			} else if v == "false" {
202				flat = append(flat, "--no"+k)
203			} else if len(k) == 1 {
204				flat = append(flat, "-"+k, v)
205			} else {
206				flat = append(flat, "--"+k, v)
207			}
208		}
209		return
210	}
211
212	var worker func(context.Context, []string, F) int
213	worker = func(ctx context.Context, sources []string, flags F) (failures int) {
214		stdout := &bytes.Buffer{}
215		stderr := &bytes.Buffer{}
216		cmd := &exec.Command{Name: fm, Stdout: stdout, Stderr: stderr}
217		cmd.Args = append(cmd.Args, "-i", *resources)
218		cmd.Args = append(cmd.Args, flatten(flags)...)
219		cmd.Args = append(cmd.Args, "-s")
220		cmd.Args = append(cmd.Args, sources...)
221
222		// Run our FM command.
223		err := exec.Run(ctx, cmd)
224
225		// We'll rerun any source individually that didn't produce a known hash, i.e.
226		// sources that crash, produce unknown hashes, or that an crash prevented from running.
227		unknownHash := ""
228		{
229			// Start assuming we'll need to rerun everything.
230			reruns := map[string]bool{}
231			for _, name := range sources {
232				reruns[name] = true
233			}
234
235			// Scan stdout for lines like "<name> skipped" or "<name> <hash> ??ms"
236			// and exempt those sources from reruns if they were skipped or their hash is known.
237			scanner := bufio.NewScanner(stdout)
238			for scanner.Scan() {
239				if parts := strings.Fields(scanner.Text()); len(parts) >= 2 {
240					name, outcome := parts[0], parts[1]
241					if *gold && outcome != "skipped" && !known[outcome] {
242						unknownHash = outcome
243					} else {
244						delete(reruns, name)
245					}
246				}
247			}
248			if err := scanner.Err(); err != nil {
249				fatal(ctx, err)
250			}
251
252			// Only rerun sources from a batch (or we'd rerun failures over and over and over).
253			if len(sources) > 1 {
254				for name := range reruns {
255					failures += worker(ctx, []string{name}, flags)
256				}
257				return
258			}
259		}
260
261		// If an individual run failed, nothing more to do but fail.
262		if err != nil {
263			failures += 1
264
265			lines := []string{}
266			scanner := bufio.NewScanner(stderr)
267			for scanner.Scan() {
268				lines = append(lines, scanner.Text())
269			}
270			if err := scanner.Err(); err != nil {
271				fatal(ctx, err)
272			}
273
274			failStep(ctx, fmt.Errorf("%v #failed:\n\t%v\n",
275				exec.DebugString(cmd),
276				strings.Join(lines, "\n\t")))
277
278			return
279		}
280
281		// If an individual run succeeded but produced an unknown hash, TODO upload .png to Gold.
282		// For now just print out the command and the hash it produced.
283		if unknownHash != "" {
284			fmt.Fprintf(os.Stdout, "%v #%v\n",
285				exec.DebugString(cmd),
286				unknownHash)
287		}
288		return
289	}
290
291	type Work struct {
292		Ctx      context.Context
293		WG       *sync.WaitGroup
294		Failures *int32
295		Sources  []string // Passed to FM -s: names of gms/tests, paths to images, .skps, etc.
296		Flags    F        // Other flags to pass to FM: --ct 565, --msaa 16, etc.
297	}
298	queue := make(chan Work, 1<<20) // Arbitrarily huge buffer to avoid ever blocking.
299
300	for i := 0; i < runtime.NumCPU(); i++ {
301		go func() {
302			for w := range queue {
303				func() {
304					defer w.WG.Done()
305					// For organizational purposes, create a step representing this batch,
306					// with the batch call to FM and any individual reruns all nested inside.
307					ctx := startStep(w.Ctx, td.Props(strings.Join(w.Sources, " ")))
308					defer endStep(ctx)
309					if failures := worker(ctx, w.Sources, w.Flags); failures > 0 {
310						atomic.AddInt32(w.Failures, int32(failures))
311						if !*local { // Uninteresting to see on local runs.
312							failStep(ctx, fmt.Errorf("%v reruns failed\n", failures))
313						}
314					}
315				}()
316			}
317		}()
318	}
319
320	// Get some work going, first breaking it into batches to increase our parallelism.
321	pendingKickoffs := &sync.WaitGroup{}
322	var totalFailures int32 = 0
323
324	kickoff := func(sources []string, flags F) {
325		if len(sources) == 0 {
326			return // A blank or commented job line from -script or the command line.
327		}
328		pendingKickoffs.Add(1)
329
330		// Shuffle the sources randomly as a cheap way to approximate evenly expensive batches.
331		// (Intentionally not rand.Seed()'d to stay deterministically reproducible.)
332		sources = append([]string{}, sources...) // We'll be needing our own copy...
333		rand.Shuffle(len(sources), func(i, j int) {
334			sources[i], sources[j] = sources[j], sources[i]
335		})
336
337		// For organizational purposes, create a step representing this call to kickoff(),
338		// with each batch of sources nested inside.
339		ctx := startStep(ctx,
340			td.Props(fmt.Sprintf("%s, %s…", strings.Join(flatten(flags), " "), sources[0])))
341		pendingBatches := &sync.WaitGroup{}
342		failures := new(int32)
343
344		// Arbitrary, nice to scale ~= cores.
345		approxNumBatches := runtime.NumCPU()
346
347		// Round up batch size to avoid empty batches, making approxNumBatches approximate.
348		batchSize := (len(sources) + approxNumBatches - 1) / approxNumBatches
349
350		util.ChunkIter(len(sources), batchSize, func(start, end int) error {
351			pendingBatches.Add(1)
352			queue <- Work{ctx, pendingBatches, failures, sources[start:end], flags}
353			return nil
354		})
355
356		// When the batches for this kickoff() are all done, this kickoff() is done.
357		go func() {
358			pendingBatches.Wait()
359			if *failures > 0 {
360				atomic.AddInt32(&totalFailures, *failures)
361				if !*local { // Uninteresting to see on local runs.
362					failStep(ctx, fmt.Errorf("%v total reruns failed\n", *failures))
363				}
364			}
365			endStep(ctx)
366			pendingKickoffs.Done()
367		}()
368	}
369
370	// Parse a job like "gms b=cpu ct=8888" into sources and flags for kickoff().
371	parse := func(job []string) (sources []string, flags F) {
372		flags = make(F)
373		for _, token := range job {
374			// Everything after # is a comment.
375			if strings.HasPrefix(token, "#") {
376				break
377			}
378
379			// Expand "gm" or "gms"  to all known GMs, or same for tests, images, skps, svgs.
380			if vals, ok := shorthands[token]; ok {
381				sources = append(sources, vals...)
382				continue
383			}
384
385			// Is this a flag to pass through to FM?
386			if parts := strings.Split(token, "="); len(parts) == 2 {
387				flags[parts[0]] = parts[1]
388				continue
389			}
390
391			// Anything else must be the name of a source for FM to run.
392			sources = append(sources, token)
393		}
394		return
395	}
396
397	// Parse one job from the command line, handy for ad hoc local runs.
398	kickoff(parse(flag.Args()[1:]))
399
400	// Any number of jobs can come from -script.
401	if *script != "" {
402		file := os.Stdin
403		if *script != "-" {
404			file, err := os.Open(*script)
405			if err != nil {
406				fatal(ctx, err)
407			}
408			defer file.Close()
409		}
410		scanner := bufio.NewScanner(file)
411		for scanner.Scan() {
412			kickoff(parse(strings.Fields(scanner.Text())))
413		}
414		if err := scanner.Err(); err != nil {
415			fatal(ctx, err)
416		}
417	}
418
419	// If we're a bot (or acting as if we are one), kick off its work.
420	if *bot != "" {
421		parts := strings.Split(*bot, "-")
422		OS, model, CPU_or_GPU := parts[1], parts[3], parts[4]
423
424		// Bots use portable fonts except where we explicitly opt-in to native fonts.
425		defaultFlags := F{"nativeFonts": "false"}
426
427		run := func(sources []string, extraFlags F) {
428			// Default then extra to allow overriding the defaults.
429			flags := F{}
430			for k, v := range defaultFlags {
431				flags[k] = v
432			}
433			for k, v := range extraFlags {
434				flags[k] = v
435			}
436			kickoff(sources, flags)
437		}
438
439		gms := shorthands["gms"]
440		imgs := shorthands["imgs"]
441		svgs := shorthands["svgs"]
442		skps := shorthands["skps"]
443		tests := shorthands["tests"]
444
445		filter := func(in []string, keep func(string) bool) (out []string) {
446			for _, s := range in {
447				if keep(s) {
448					out = append(out, s)
449				}
450			}
451			return
452		}
453
454		if strings.Contains(OS, "Win") {
455			// We can't decode these formats on Windows.
456			imgs = filter(imgs, func(s string) bool { return !rawExts[normalizedExt(s)] })
457		}
458
459		if strings.Contains(*bot, "TSAN") {
460			// Run each test a few times in parallel to uncover races.
461			defaultFlags["race"] = "4"
462		}
463
464		if CPU_or_GPU == "CPU" {
465			defaultFlags["b"] = "cpu"
466
467			// FM's default ct/gamut/tf flags are equivalent to --config srgb in DM.
468			run(gms, F{})
469			run(gms, F{"nativeFonts": "true"})
470			run(imgs, F{})
471			run(svgs, F{})
472			run(skps, F{"clipW": "1000", "clipH": "1000"})
473			run(tests, F{"race": "0"}) // Several unit tests are not reentrant.
474
475			if model == "GCE" {
476				run(gms, F{"ct": "g8", "legacy": "true"})                     // --config g8
477				run(gms, F{"ct": "565", "legacy": "true"})                    // --config 565
478				run(gms, F{"ct": "8888", "legacy": "true"})                   // --config 8888
479				run(gms, F{"ct": "f16"})                                      // --config esrgb
480				run(gms, F{"ct": "f16", "tf": "linear"})                      // --config f16
481				run(gms, F{"ct": "8888", "gamut": "p3"})                      // --config p3
482				run(gms, F{"ct": "8888", "gamut": "narrow", "tf": "2.2"})     // --config narrow
483				run(gms, F{"ct": "f16", "gamut": "rec2020", "tf": "rec2020"}) // --config erec2020
484
485				run(gms, F{"skvm": "true"})
486				run(gms, F{"skvm": "true", "ct": "f16"})
487
488				run(imgs, F{
489					"decodeToDst": "true",
490					"ct":          "f16",
491					"gamut":       "rec2020",
492					"tf":          "rec2020"})
493			}
494
495			// TODO: pic-8888 equivalent?
496			// TODO: serialize-8888 equivalent?
497		}
498	}
499
500	pendingKickoffs.Wait()
501	if totalFailures > 0 {
502		fatal(ctx, fmt.Errorf("%v runs of %v failed after retries.\n", totalFailures, fm))
503	}
504}
505