• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2023 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package cache
6
7import (
8	"bufio"
9	"cmd/go/internal/base"
10	"cmd/internal/quoted"
11	"context"
12	"crypto/sha256"
13	"encoding/base64"
14	"encoding/json"
15	"errors"
16	"fmt"
17	"io"
18	"log"
19	"os"
20	"os/exec"
21	"sync"
22	"sync/atomic"
23	"time"
24)
25
26// ProgCache implements Cache via JSON messages over stdin/stdout to a child
27// helper process which can then implement whatever caching policy/mechanism it
28// wants.
29//
30// See https://github.com/golang/go/issues/59719
31type ProgCache struct {
32	cmd    *exec.Cmd
33	stdout io.ReadCloser  // from the child process
34	stdin  io.WriteCloser // to the child process
35	bw     *bufio.Writer  // to stdin
36	jenc   *json.Encoder  // to bw
37
38	// can are the commands that the child process declared that it supports.
39	// This is effectively the versioning mechanism.
40	can map[ProgCmd]bool
41
42	// fuzzDirCache is another Cache implementation to use for the FuzzDir
43	// method. In practice this is the default GOCACHE disk-based
44	// implementation.
45	//
46	// TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache
47	// interface and the fuzzing callers to be less disk-y to do more here.
48	fuzzDirCache Cache
49
50	closing      atomic.Bool
51	ctx          context.Context    // valid until Close via ctxClose
52	ctxCancel    context.CancelFunc // called on Close
53	readLoopDone chan struct{}      // closed when readLoop returns
54
55	mu         sync.Mutex // guards following fields
56	nextID     int64
57	inFlight   map[int64]chan<- *ProgResponse
58	outputFile map[OutputID]string // object => abs path on disk
59
60	// writeMu serializes writing to the child process.
61	// It must never be held at the same time as mu.
62	writeMu sync.Mutex
63}
64
// ProgCmd names a command that cmd/go can issue to the child process.
//
// Should the interface need to grow, new commands (or new versioned variants
// of existing ones, such as "get2") can be added.
type ProgCmd string

// The commands this file knows how to send.
const (
	cmdGet   ProgCmd = "get"
	cmdPut   ProgCmd = "put"
	cmdClose ProgCmd = "close"
)
76
77// ProgRequest is the JSON-encoded message that's sent from cmd/go to
78// the GOCACHEPROG child process over stdin. Each JSON object is on its
79// own line. A ProgRequest of Type "put" with BodySize > 0 will be followed
80// by a line containing a base64-encoded JSON string literal of the body.
81type ProgRequest struct {
82	// ID is a unique number per process across all requests.
83	// It must be echoed in the ProgResponse from the child.
84	ID int64
85
86	// Command is the type of request.
87	// The cmd/go tool will only send commands that were declared
88	// as supported by the child.
89	Command ProgCmd
90
91	// ActionID is non-nil for get and puts.
92	ActionID []byte `json:",omitempty"` // or nil if not used
93
94	// ObjectID is set for Type "put" and "output-file".
95	ObjectID []byte `json:",omitempty"` // or nil if not used
96
97	// Body is the body for "put" requests. It's sent after the JSON object
98	// as a base64-encoded JSON string when BodySize is non-zero.
99	// It's sent as a separate JSON value instead of being a struct field
100	// send in this JSON object so large values can be streamed in both directions.
101	// The base64 string body of a ProgRequest will always be written
102	// immediately after the JSON object and a newline.
103	Body io.Reader `json:"-"`
104
105	// BodySize is the number of bytes of Body. If zero, the body isn't written.
106	BodySize int64 `json:",omitempty"`
107}
108
109// ProgResponse is the JSON response from the child process to cmd/go.
110//
111// With the exception of the first protocol message that the child writes to its
112// stdout with ID==0 and KnownCommands populated, these are only sent in
113// response to a ProgRequest from cmd/go.
114//
115// ProgResponses can be sent in any order. The ID must match the request they're
116// replying to.
117type ProgResponse struct {
118	ID  int64  // that corresponds to ProgRequest; they can be answered out of order
119	Err string `json:",omitempty"` // if non-empty, the error
120
121	// KnownCommands is included in the first message that cache helper program
122	// writes to stdout on startup (with ID==0). It includes the
123	// ProgRequest.Command types that are supported by the program.
124	//
125	// This lets us extend the protocol gracefully over time (adding "get2",
126	// etc), or fail gracefully when needed. It also lets us verify the program
127	// wants to be a cache helper.
128	KnownCommands []ProgCmd `json:",omitempty"`
129
130	// For Get requests.
131
132	Miss     bool       `json:",omitempty"` // cache miss
133	OutputID []byte     `json:",omitempty"`
134	Size     int64      `json:",omitempty"` // in bytes
135	Time     *time.Time `json:",omitempty"` // an Entry.Time; when the object was added to the docs
136
137	// DiskPath is the absolute path on disk of the ObjectID corresponding
138	// a "get" request's ActionID (on cache hit) or a "put" request's
139	// provided ObjectID.
140	DiskPath string `json:",omitempty"`
141}
142
143// startCacheProg starts the prog binary (with optional space-separated flags)
144// and returns a Cache implementation that talks to it.
145//
146// It blocks a few seconds to wait for the child process to successfully start
147// and advertise its capabilities.
148func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
149	if fuzzDirCache == nil {
150		panic("missing fuzzDirCache")
151	}
152	args, err := quoted.Split(progAndArgs)
153	if err != nil {
154		base.Fatalf("GOCACHEPROG args: %v", err)
155	}
156	var prog string
157	if len(args) > 0 {
158		prog = args[0]
159		args = args[1:]
160	}
161
162	ctx, ctxCancel := context.WithCancel(context.Background())
163
164	cmd := exec.CommandContext(ctx, prog, args...)
165	out, err := cmd.StdoutPipe()
166	if err != nil {
167		base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
168	}
169	in, err := cmd.StdinPipe()
170	if err != nil {
171		base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
172	}
173	cmd.Stderr = os.Stderr
174	cmd.Cancel = in.Close
175
176	if err := cmd.Start(); err != nil {
177		base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
178	}
179
180	pc := &ProgCache{
181		ctx:          ctx,
182		ctxCancel:    ctxCancel,
183		fuzzDirCache: fuzzDirCache,
184		cmd:          cmd,
185		stdout:       out,
186		stdin:        in,
187		bw:           bufio.NewWriter(in),
188		inFlight:     make(map[int64]chan<- *ProgResponse),
189		outputFile:   make(map[OutputID]string),
190		readLoopDone: make(chan struct{}),
191	}
192
193	// Register our interest in the initial protocol message from the child to
194	// us, saying what it can do.
195	capResc := make(chan *ProgResponse, 1)
196	pc.inFlight[0] = capResc
197
198	pc.jenc = json.NewEncoder(pc.bw)
199	go pc.readLoop(pc.readLoopDone)
200
201	// Give the child process a few seconds to report its capabilities. This
202	// should be instant and not require any slow work by the program.
203	timer := time.NewTicker(5 * time.Second)
204	defer timer.Stop()
205	for {
206		select {
207		case <-timer.C:
208			log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
209		case capRes := <-capResc:
210			can := map[ProgCmd]bool{}
211			for _, cmd := range capRes.KnownCommands {
212				can[cmd] = true
213			}
214			if len(can) == 0 {
215				base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
216			}
217			pc.can = can
218			return pc
219		}
220	}
221}
222
223func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
224	defer close(readLoopDone)
225	jd := json.NewDecoder(c.stdout)
226	for {
227		res := new(ProgResponse)
228		if err := jd.Decode(res); err != nil {
229			if c.closing.Load() {
230				return // quietly
231			}
232			if err == io.EOF {
233				c.mu.Lock()
234				inFlight := len(c.inFlight)
235				c.mu.Unlock()
236				base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
237			}
238			base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
239		}
240		c.mu.Lock()
241		ch, ok := c.inFlight[res.ID]
242		delete(c.inFlight, res.ID)
243		c.mu.Unlock()
244		if ok {
245			ch <- res
246		} else {
247			base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
248		}
249	}
250}
251
252func (c *ProgCache) send(ctx context.Context, req *ProgRequest) (*ProgResponse, error) {
253	resc := make(chan *ProgResponse, 1)
254	if err := c.writeToChild(req, resc); err != nil {
255		return nil, err
256	}
257	select {
258	case res := <-resc:
259		if res.Err != "" {
260			return nil, errors.New(res.Err)
261		}
262		return res, nil
263	case <-ctx.Done():
264		return nil, ctx.Err()
265	}
266}
267
268func (c *ProgCache) writeToChild(req *ProgRequest, resc chan<- *ProgResponse) (err error) {
269	c.mu.Lock()
270	c.nextID++
271	req.ID = c.nextID
272	c.inFlight[req.ID] = resc
273	c.mu.Unlock()
274
275	defer func() {
276		if err != nil {
277			c.mu.Lock()
278			delete(c.inFlight, req.ID)
279			c.mu.Unlock()
280		}
281	}()
282
283	c.writeMu.Lock()
284	defer c.writeMu.Unlock()
285
286	if err := c.jenc.Encode(req); err != nil {
287		return err
288	}
289	if err := c.bw.WriteByte('\n'); err != nil {
290		return err
291	}
292	if req.Body != nil && req.BodySize > 0 {
293		if err := c.bw.WriteByte('"'); err != nil {
294			return err
295		}
296		e := base64.NewEncoder(base64.StdEncoding, c.bw)
297		wrote, err := io.Copy(e, req.Body)
298		if err != nil {
299			return err
300		}
301		if err := e.Close(); err != nil {
302			return nil
303		}
304		if wrote != req.BodySize {
305			return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, object %x: wrote %v; expected %v",
306				req.ActionID, req.ObjectID, wrote, req.BodySize)
307		}
308		if _, err := c.bw.WriteString("\"\n"); err != nil {
309			return err
310		}
311	}
312	if err := c.bw.Flush(); err != nil {
313		return err
314	}
315	return nil
316}
317
318func (c *ProgCache) Get(a ActionID) (Entry, error) {
319	if !c.can[cmdGet] {
320		// They can't do a "get". Maybe they're a write-only cache.
321		//
322		// TODO(bradfitz,bcmills): figure out the proper error type here. Maybe
323		// errors.ErrUnsupported? Is entryNotFoundError even appropriate? There
324		// might be places where we rely on the fact that a recent Put can be
325		// read through a corresponding Get. Audit callers and check, and document
326		// error types on the Cache interface.
327		return Entry{}, &entryNotFoundError{}
328	}
329	res, err := c.send(c.ctx, &ProgRequest{
330		Command:  cmdGet,
331		ActionID: a[:],
332	})
333	if err != nil {
334		return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers.
335	}
336	if res.Miss {
337		return Entry{}, &entryNotFoundError{}
338	}
339	e := Entry{
340		Size: res.Size,
341	}
342	if res.Time != nil {
343		e.Time = *res.Time
344	} else {
345		e.Time = time.Now()
346	}
347	if res.DiskPath == "" {
348		return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
349	}
350	if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
351		return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
352	}
353	c.noteOutputFile(e.OutputID, res.DiskPath)
354	return e, nil
355}
356
357func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
358	c.mu.Lock()
359	defer c.mu.Unlock()
360	c.outputFile[o] = diskPath
361}
362
363func (c *ProgCache) OutputFile(o OutputID) string {
364	c.mu.Lock()
365	defer c.mu.Unlock()
366	return c.outputFile[o]
367}
368
369func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
370	// Compute output ID.
371	h := sha256.New()
372	if _, err := file.Seek(0, 0); err != nil {
373		return OutputID{}, 0, err
374	}
375	size, err := io.Copy(h, file)
376	if err != nil {
377		return OutputID{}, 0, err
378	}
379	var out OutputID
380	h.Sum(out[:0])
381
382	if _, err := file.Seek(0, 0); err != nil {
383		return OutputID{}, 0, err
384	}
385
386	if !c.can[cmdPut] {
387		// Child is a read-only cache. Do nothing.
388		return out, size, nil
389	}
390
391	res, err := c.send(c.ctx, &ProgRequest{
392		Command:  cmdPut,
393		ActionID: a[:],
394		ObjectID: out[:],
395		Body:     file,
396		BodySize: size,
397	})
398	if err != nil {
399		return OutputID{}, 0, err
400	}
401	if res.DiskPath == "" {
402		return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
403	}
404	c.noteOutputFile(out, res.DiskPath)
405	return out, size, err
406}
407
408func (c *ProgCache) Close() error {
409	c.closing.Store(true)
410	var err error
411
412	// First write a "close" message to the child so it can exit nicely
413	// and clean up if it wants. Only after that exchange do we cancel
414	// the context that kills the process.
415	if c.can[cmdClose] {
416		_, err = c.send(c.ctx, &ProgRequest{Command: cmdClose})
417	}
418	c.ctxCancel()
419	<-c.readLoopDone
420	return err
421}
422
423func (c *ProgCache) FuzzDir() string {
424	// TODO(bradfitz): figure out what to do here. For now just use the
425	// disk-based default.
426	return c.fuzzDirCache.FuzzDir()
427}
428