• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2015 Google Inc. All rights reserved
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package kati
16
17import (
18	"container/heap"
19	"errors"
20	"fmt"
21	"os"
22	"os/exec"
23	"syscall"
24	"time"
25
26	"github.com/golang/glog"
27)
28
29var (
30	errNothingDone = errors.New("nothing done")
31)
32
33type job struct {
34	n        *DepNode
35	ex       *Executor
36	parents  []*job
37	outputTs int64
38	numDeps  int
39	depsTs   int64
40	id       int
41
42	runners []runner
43}
44
45type jobResult struct {
46	j   *job
47	w   *worker
48	err error
49}
50
51type newDep struct {
52	j        *job
53	neededBy *job
54}
55
56type worker struct {
57	wm       *workerManager
58	jobChan  chan *job
59	waitChan chan bool
60	doneChan chan bool
61}
62
63type jobQueue []*job
64
65func (jq jobQueue) Len() int      { return len(jq) }
66func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] }
67
68func (jq jobQueue) Less(i, j int) bool {
69	// First come, first serve, for GNU make compatibility.
70	return jq[i].id < jq[j].id
71}
72
73func (jq *jobQueue) Push(x interface{}) {
74	item := x.(*job)
75	*jq = append(*jq, item)
76}
77
78func (jq *jobQueue) Pop() interface{} {
79	old := *jq
80	n := len(old)
81	item := old[n-1]
82	*jq = old[0 : n-1]
83	return item
84}
85
86func newWorker(wm *workerManager) *worker {
87	w := &worker{
88		wm:       wm,
89		jobChan:  make(chan *job),
90		waitChan: make(chan bool),
91		doneChan: make(chan bool),
92	}
93	return w
94}
95
96func (w *worker) Run() {
97	done := false
98	for !done {
99		select {
100		case j := <-w.jobChan:
101			err := j.build()
102			w.wm.ReportResult(w, j, err)
103		case done = <-w.waitChan:
104		}
105	}
106	w.doneChan <- true
107}
108
109func (w *worker) PostJob(j *job) {
110	w.jobChan <- j
111}
112
113func (w *worker) Wait() {
114	w.waitChan <- true
115	<-w.doneChan
116}
117
118func (j *job) createRunners() ([]runner, error) {
119	runners, _, err := createRunners(j.ex.ctx, j.n)
120	return runners, err
121}
122
123// TODO(ukai): use time.Time?
124func getTimestamp(filename string) int64 {
125	st, err := os.Stat(filename)
126	if err != nil {
127		return -2
128	}
129	return st.ModTime().Unix()
130}
131
132func (j *job) build() error {
133	if j.n.IsPhony {
134		j.outputTs = -2 // trigger cmd even if all inputs don't exist.
135	} else {
136		j.outputTs = getTimestamp(j.n.Output)
137	}
138
139	if !j.n.HasRule {
140		if j.outputTs >= 0 || j.n.IsPhony {
141			return errNothingDone
142		}
143		if len(j.parents) == 0 {
144			return fmt.Errorf("*** No rule to make target %q.", j.n.Output)
145		}
146		return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output)
147	}
148
149	if j.outputTs >= j.depsTs {
150		// TODO: stats.
151		return errNothingDone
152	}
153
154	rr, err := j.createRunners()
155	if err != nil {
156		return err
157	}
158	if len(rr) == 0 {
159		return errNothingDone
160	}
161	for _, r := range rr {
162		err := r.run(j.n.Output)
163		glog.Warningf("cmd result for %q: %v", j.n.Output, err)
164		if err != nil {
165			exit := exitStatus(err)
166			return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit)
167		}
168	}
169
170	if j.n.IsPhony {
171		j.outputTs = time.Now().Unix()
172	} else {
173		j.outputTs = getTimestamp(j.n.Output)
174		if j.outputTs < 0 {
175			j.outputTs = time.Now().Unix()
176		}
177	}
178	return nil
179}
180
181func (wm *workerManager) handleJobs() error {
182	for {
183		if len(wm.freeWorkers) == 0 {
184			return nil
185		}
186		if wm.readyQueue.Len() == 0 {
187			return nil
188		}
189		j := heap.Pop(&wm.readyQueue).(*job)
190		glog.V(1).Infof("run: %s", j.n.Output)
191
192		j.numDeps = -1 // Do not let other workers pick this.
193		w := wm.freeWorkers[0]
194		wm.freeWorkers = wm.freeWorkers[1:]
195		wm.busyWorkers[w] = true
196		w.jobChan <- j
197	}
198}
199
200func (wm *workerManager) updateParents(j *job) {
201	for _, p := range j.parents {
202		p.numDeps--
203		glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps)
204		if p.depsTs < j.outputTs {
205			p.depsTs = j.outputTs
206		}
207		wm.maybePushToReadyQueue(p)
208	}
209}
210
211type workerManager struct {
212	maxJobs     int
213	jobs        []*job
214	readyQueue  jobQueue
215	jobChan     chan *job
216	resultChan  chan jobResult
217	newDepChan  chan newDep
218	stopChan    chan bool
219	waitChan    chan bool
220	doneChan    chan error
221	freeWorkers []*worker
222	busyWorkers map[*worker]bool
223	ex          *Executor
224	runnings    map[string]*job
225
226	finishCnt int
227	skipCnt   int
228}
229
230func newWorkerManager(numJobs int) (*workerManager, error) {
231	wm := &workerManager{
232		maxJobs:     numJobs,
233		jobChan:     make(chan *job),
234		resultChan:  make(chan jobResult),
235		newDepChan:  make(chan newDep),
236		stopChan:    make(chan bool),
237		waitChan:    make(chan bool),
238		doneChan:    make(chan error),
239		busyWorkers: make(map[*worker]bool),
240	}
241
242	wm.busyWorkers = make(map[*worker]bool)
243	for i := 0; i < numJobs; i++ {
244		w := newWorker(wm)
245		wm.freeWorkers = append(wm.freeWorkers, w)
246		go w.Run()
247	}
248	heap.Init(&wm.readyQueue)
249	go wm.Run()
250	return wm, nil
251}
252
253func exitStatus(err error) int {
254	if err == nil {
255		return 0
256	}
257	exit := 1
258	if err, ok := err.(*exec.ExitError); ok {
259		if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok {
260			return w.ExitStatus()
261		}
262	}
263	return exit
264}
265
266func (wm *workerManager) hasTodo() bool {
267	return wm.finishCnt != len(wm.jobs)
268}
269
270func (wm *workerManager) maybePushToReadyQueue(j *job) {
271	if j.numDeps != 0 {
272		return
273	}
274	heap.Push(&wm.readyQueue, j)
275	glog.V(1).Infof("ready: %s", j.n.Output)
276}
277
278func (wm *workerManager) handleNewDep(j *job, neededBy *job) {
279	if j.numDeps < 0 {
280		neededBy.numDeps--
281		if neededBy.id > 0 {
282			panic("FIXME: already in WM... can this happen?")
283		}
284	} else {
285		j.parents = append(j.parents, neededBy)
286	}
287}
288
289func (wm *workerManager) Run() {
290	done := false
291	var err error
292Loop:
293	for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
294		select {
295		case j := <-wm.jobChan:
296			glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps)
297			j.id = len(wm.jobs) + 1
298			wm.jobs = append(wm.jobs, j)
299			wm.maybePushToReadyQueue(j)
300		case jr := <-wm.resultChan:
301			glog.V(1).Infof("done: %s", jr.j.n.Output)
302			delete(wm.busyWorkers, jr.w)
303			wm.freeWorkers = append(wm.freeWorkers, jr.w)
304			wm.updateParents(jr.j)
305			wm.finishCnt++
306			if jr.err == errNothingDone {
307				wm.skipCnt++
308				jr.err = nil
309			}
310			if jr.err != nil {
311				err = jr.err
312				close(wm.stopChan)
313				break Loop
314			}
315		case af := <-wm.newDepChan:
316			wm.handleNewDep(af.j, af.neededBy)
317			glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output)
318		case done = <-wm.waitChan:
319		}
320		err = wm.handleJobs()
321		if err != nil {
322			break Loop
323		}
324
325		glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
326	}
327	if !done {
328		<-wm.waitChan
329	}
330
331	for _, w := range wm.freeWorkers {
332		w.Wait()
333	}
334	for w := range wm.busyWorkers {
335		w.Wait()
336	}
337	wm.doneChan <- err
338}
339
340func (wm *workerManager) PostJob(j *job) error {
341	select {
342	case wm.jobChan <- j:
343		return nil
344	case <-wm.stopChan:
345		return errors.New("worker manager stopped")
346	}
347}
348
349func (wm *workerManager) ReportResult(w *worker, j *job, err error) {
350	select {
351	case wm.resultChan <- jobResult{w: w, j: j, err: err}:
352	case <-wm.stopChan:
353	}
354}
355
356func (wm *workerManager) ReportNewDep(j *job, neededBy *job) {
357	select {
358	case wm.newDepChan <- newDep{j: j, neededBy: neededBy}:
359	case <-wm.stopChan:
360	}
361}
362
363func (wm *workerManager) Wait() (int, error) {
364	wm.waitChan <- true
365	err := <-wm.doneChan
366	glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt)
367	return wm.finishCnt - wm.skipCnt, err
368}
369