1// Copyright 2015 Google Inc. All rights reserved 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package kati 16 17import ( 18 "container/heap" 19 "errors" 20 "fmt" 21 "os" 22 "os/exec" 23 "syscall" 24 "time" 25 26 "github.com/golang/glog" 27) 28 29var ( 30 errNothingDone = errors.New("nothing done") 31) 32 33type job struct { 34 n *DepNode 35 ex *Executor 36 parents []*job 37 outputTs int64 38 numDeps int 39 depsTs int64 40 id int 41 42 runners []runner 43} 44 45type jobResult struct { 46 j *job 47 w *worker 48 err error 49} 50 51type newDep struct { 52 j *job 53 neededBy *job 54} 55 56type worker struct { 57 wm *workerManager 58 jobChan chan *job 59 waitChan chan bool 60 doneChan chan bool 61} 62 63type jobQueue []*job 64 65func (jq jobQueue) Len() int { return len(jq) } 66func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] } 67 68func (jq jobQueue) Less(i, j int) bool { 69 // First come, first serve, for GNU make compatibility. 70 return jq[i].id < jq[j].id 71} 72 73func (jq *jobQueue) Push(x interface{}) { 74 item := x.(*job) 75 *jq = append(*jq, item) 76} 77 78func (jq *jobQueue) Pop() interface{} { 79 old := *jq 80 n := len(old) 81 item := old[n-1] 82 *jq = old[0 : n-1] 83 return item 84} 85 86func newWorker(wm *workerManager) *worker { 87 w := &worker{ 88 wm: wm, 89 jobChan: make(chan *job), 90 waitChan: make(chan bool), 91 doneChan: make(chan bool), 92 } 93 return w 94} 95 96func (w *worker) Run() { 97 done := false 98 for !done { 99 select { 100 case j := <-w.jobChan: 101 err := j.build() 102 w.wm.ReportResult(w, j, err) 103 case done = <-w.waitChan: 104 } 105 } 106 w.doneChan <- true 107} 108 109func (w *worker) PostJob(j *job) { 110 w.jobChan <- j 111} 112 113func (w *worker) Wait() { 114 w.waitChan <- true 115 <-w.doneChan 116} 117 118func (j *job) createRunners() ([]runner, error) { 119 runners, _, err := createRunners(j.ex.ctx, j.n) 120 return runners, err 121} 122 123// TODO(ukai): use time.Time? 124func getTimestamp(filename string) int64 { 125 st, err := os.Stat(filename) 126 if err != nil { 127 return -2 128 } 129 return st.ModTime().Unix() 130} 131 132func (j *job) build() error { 133 if j.n.IsPhony { 134 j.outputTs = -2 // trigger cmd even if all inputs don't exist. 135 } else { 136 j.outputTs = getTimestamp(j.n.Output) 137 } 138 139 if !j.n.HasRule { 140 if j.outputTs >= 0 || j.n.IsPhony { 141 return errNothingDone 142 } 143 if len(j.parents) == 0 { 144 return fmt.Errorf("*** No rule to make target %q.", j.n.Output) 145 } 146 return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output) 147 } 148 149 if j.outputTs >= j.depsTs { 150 // TODO: stats. 151 return errNothingDone 152 } 153 154 rr, err := j.createRunners() 155 if err != nil { 156 return err 157 } 158 if len(rr) == 0 { 159 return errNothingDone 160 } 161 for _, r := range rr { 162 err := r.run(j.n.Output) 163 glog.Warningf("cmd result for %q: %v", j.n.Output, err) 164 if err != nil { 165 exit := exitStatus(err) 166 return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit) 167 } 168 } 169 170 if j.n.IsPhony { 171 j.outputTs = time.Now().Unix() 172 } else { 173 j.outputTs = getTimestamp(j.n.Output) 174 if j.outputTs < 0 { 175 j.outputTs = time.Now().Unix() 176 } 177 } 178 return nil 179} 180 181func (wm *workerManager) handleJobs() error { 182 for { 183 if len(wm.freeWorkers) == 0 { 184 return nil 185 } 186 if wm.readyQueue.Len() == 0 { 187 return nil 188 } 189 j := heap.Pop(&wm.readyQueue).(*job) 190 glog.V(1).Infof("run: %s", j.n.Output) 191 192 j.numDeps = -1 // Do not let other workers pick this. 193 w := wm.freeWorkers[0] 194 wm.freeWorkers = wm.freeWorkers[1:] 195 wm.busyWorkers[w] = true 196 w.jobChan <- j 197 } 198} 199 200func (wm *workerManager) updateParents(j *job) { 201 for _, p := range j.parents { 202 p.numDeps-- 203 glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps) 204 if p.depsTs < j.outputTs { 205 p.depsTs = j.outputTs 206 } 207 wm.maybePushToReadyQueue(p) 208 } 209} 210 211type workerManager struct { 212 maxJobs int 213 jobs []*job 214 readyQueue jobQueue 215 jobChan chan *job 216 resultChan chan jobResult 217 newDepChan chan newDep 218 stopChan chan bool 219 waitChan chan bool 220 doneChan chan error 221 freeWorkers []*worker 222 busyWorkers map[*worker]bool 223 ex *Executor 224 runnings map[string]*job 225 226 finishCnt int 227 skipCnt int 228} 229 230func newWorkerManager(numJobs int) (*workerManager, error) { 231 wm := &workerManager{ 232 maxJobs: numJobs, 233 jobChan: make(chan *job), 234 resultChan: make(chan jobResult), 235 newDepChan: make(chan newDep), 236 stopChan: make(chan bool), 237 waitChan: make(chan bool), 238 doneChan: make(chan error), 239 busyWorkers: make(map[*worker]bool), 240 } 241 242 wm.busyWorkers = make(map[*worker]bool) 243 for i := 0; i < numJobs; i++ { 244 w := newWorker(wm) 245 wm.freeWorkers = append(wm.freeWorkers, w) 246 go w.Run() 247 } 248 heap.Init(&wm.readyQueue) 249 go wm.Run() 250 return wm, nil 251} 252 253func exitStatus(err error) int { 254 if err == nil { 255 return 0 256 } 257 exit := 1 258 if err, ok := err.(*exec.ExitError); ok { 259 if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok { 260 return w.ExitStatus() 261 } 262 } 263 return exit 264} 265 266func (wm *workerManager) hasTodo() bool { 267 return wm.finishCnt != len(wm.jobs) 268} 269 270func (wm *workerManager) maybePushToReadyQueue(j *job) { 271 if j.numDeps != 0 { 272 return 273 } 274 heap.Push(&wm.readyQueue, j) 275 glog.V(1).Infof("ready: %s", j.n.Output) 276} 277 278func (wm *workerManager) handleNewDep(j *job, neededBy *job) { 279 if j.numDeps < 0 { 280 neededBy.numDeps-- 281 if neededBy.id > 0 { 282 panic("FIXME: already in WM... can this happen?") 283 } 284 } else { 285 j.parents = append(j.parents, neededBy) 286 } 287} 288 289func (wm *workerManager) Run() { 290 done := false 291 var err error 292Loop: 293 for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done { 294 select { 295 case j := <-wm.jobChan: 296 glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps) 297 j.id = len(wm.jobs) + 1 298 wm.jobs = append(wm.jobs, j) 299 wm.maybePushToReadyQueue(j) 300 case jr := <-wm.resultChan: 301 glog.V(1).Infof("done: %s", jr.j.n.Output) 302 delete(wm.busyWorkers, jr.w) 303 wm.freeWorkers = append(wm.freeWorkers, jr.w) 304 wm.updateParents(jr.j) 305 wm.finishCnt++ 306 if jr.err == errNothingDone { 307 wm.skipCnt++ 308 jr.err = nil 309 } 310 if jr.err != nil { 311 err = jr.err 312 close(wm.stopChan) 313 break Loop 314 } 315 case af := <-wm.newDepChan: 316 wm.handleNewDep(af.j, af.neededBy) 317 glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output) 318 case done = <-wm.waitChan: 319 } 320 err = wm.handleJobs() 321 if err != nil { 322 break Loop 323 } 324 325 glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers)) 326 } 327 if !done { 328 <-wm.waitChan 329 } 330 331 for _, w := range wm.freeWorkers { 332 w.Wait() 333 } 334 for w := range wm.busyWorkers { 335 w.Wait() 336 } 337 wm.doneChan <- err 338} 339 340func (wm *workerManager) PostJob(j *job) error { 341 select { 342 case wm.jobChan <- j: 343 return nil 344 case <-wm.stopChan: 345 return errors.New("worker manager stopped") 346 } 347} 348 349func (wm *workerManager) ReportResult(w *worker, j *job, err error) { 350 select { 351 case wm.resultChan <- jobResult{w: w, j: j, err: err}: 352 case <-wm.stopChan: 353 } 354} 355 356func (wm *workerManager) ReportNewDep(j *job, neededBy *job) { 357 select { 358 case wm.newDepChan <- newDep{j: j, neededBy: neededBy}: 359 case <-wm.stopChan: 360 } 361} 362 363func (wm *workerManager) Wait() (int, error) { 364 wm.waitChan <- true 365 err := <-wm.doneChan 366 glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt) 367 return wm.finishCnt - wm.skipCnt, err 368} 369