1#!/usr/bin/env python3
2#
3#  Copyright (C) 2018 The Android Open Source Project
4#
5#  Licensed under the Apache License, Version 2.0 (the "License");
6#  you may not use this file except in compliance with the License.
7#  You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#  Unless required by applicable law or agreed to in writing, software
12#  distributed under the License is distributed on an "AS IS" BASIS,
13#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#  See the License for the specific language governing permissions and
15#  limitations under the License.
16#
17
18
19import datetime, filecmp, math, multiprocessing, os, shutil, subprocess, stat, sys, time
20from collections import OrderedDict
21
def usage():
  """Print the command-line help text for diff-filterer.py and exit with status 1."""
  # NOTE: the help text below is a runtime string and is printed verbatim
  print("""Usage: diff-filterer.py [--assume-input-states-are-correct] [--allow-goal-passing] [--work-path <workpath>] [--num-jobs <count>] [--timeout <seconds>] [--debug] <passingPath> <goalPath> <shellCommand>

diff-filterer.py attempts to transform (a copy of) the contents of <passingPath> into the contents of <goalPath> subject to the constraint that when <shellCommand> is run in that directory, it returns 0

OPTIONS
  --assume-input-states-are-correct
    Assume that <shellCommand> passes in <passingPath> and fails in <goalPath> rather than re-verifying this
  --allow-goal-passing
    If <goalPath> passes the test, report it as the best result rather than reporting an error.
    Usually it's a mistake to pass a passing state as the goal path, because then the passing state is the best result. This usually means the inputs might be reversed or mistaken. It also means that the binary search is very short.
    In some cases this can make sense if the caller hasn't already checked whether the goal state passes.
  --work-path <filepath>
    File path to use as the work directory for testing the shell command
    This file path will be overwritten and modified as needed for testing purposes, and will also be the working directory of the shell command when it is run
  --num-jobs <count>
    The maximum number of concurrent executions of <shellCommand> to spawn at once
    Specify 'auto' to have diff-filterer.py dynamically adjust the number of jobs based on system load
  --timeout <seconds>
    Approximate maximum amount of time to run. If diff-filterer.py expects that running a test would exceed this timeout, then it will skip running the test, terminate early, and report what it did find.
    diff-filterer.py doesn't terminate any child processes that have already started, so it is still possible that diff-filterer.py might exceed this timeout by the amount of time required to run one test.
  --debug
    Enable some debug checks in diff-filterer.py
""")
  sys.exit(1)
47
# Global flag enabling extra consistency checks (used e.g. in DiffRunner.onSuccess);
# presumably set to True by the --debug command-line option — confirm in the argument parsing
debug = False
49
# Miscellaneous file utilities
class FileIo(object):
  """Helpers for creating, copying, linking, removing and inspecting files and paths."""

  def __init__(self):
    return

  def ensureDirExists(self, filePath):
    # Replaces any non-directory (regular file or symlink) currently at <filePath>
    if not os.path.isdir(filePath):
      if os.path.isfile(filePath) or os.path.islink(filePath):
        os.remove(filePath)
      os.makedirs(filePath)

  def copyFile(self, fromPath, toPath):
    # Copies <fromPath> to <toPath>, preserving a symlink as a symlink (not its target)
    self.ensureDirExists(os.path.dirname(toPath))
    self.removePath(toPath)
    if os.path.islink(fromPath):
      linkText = os.readlink(fromPath)
      os.symlink(linkText, toPath)
    else:
      shutil.copy2(fromPath, toPath)

  def hardLink(self, oldPath, newPath):
    # Creates a hard link at <newPath> pointing at <oldPath>'s inode
    self.ensureDirExists(os.path.dirname(newPath))
    self.removePath(newPath)
    os.link(oldPath, newPath)

  def writeFile(self, path, text):
    # Use a context manager so the file is closed even if the write fails
    with open(path, "w+") as f:
      f.write(text)

  def writeScript(self, path, text):
    # Writes <text> to <path> and marks it executable (owner rwx)
    self.writeFile(path, text)
    os.chmod(path, stat.S_IRWXU)

  def removePath(self, filePath):
    # Safety check: refuse to remove the filesystem root.
    # (The previous check, len(os.path.split(filePath)) < 2, could never trigger
    # because os.path.split always returns a 2-tuple.)
    absolutePath = os.path.abspath(filePath)
    if os.path.dirname(absolutePath) == absolutePath:
      raise Exception("Will not remove path at " + filePath + "; is too close to the root of the filesystem")
    if os.path.islink(filePath):
      os.remove(filePath)
    elif os.path.isdir(filePath):
      shutil.rmtree(filePath)
    elif os.path.isfile(filePath):
      os.remove(filePath)

  def move(self, fromPath, toPath):
    os.rename(fromPath, toPath)

  def join(self, path1, path2):
    # Joins and normalizes (collapses "..", ".", and duplicate separators)
    return os.path.normpath(os.path.join(path1, path2))

  # tells whether <parent> either contains <child> or is <child>
  def contains(self, parent, child):
    if parent == child:
      return True
    return child.startswith(parent + "/")

  # returns the common prefix of two paths. For example, commonPrefixOf2("a/b/c", "a/b/cat") returns "a/b"
  def commonPrefixOf2(self, path1, path2):
    prefix = path2
    while True:
      if self.contains(prefix, path1):
        return prefix
      parent = os.path.dirname(prefix)
      if parent == prefix:
        # reached the top of the path without finding a shared ancestor
        return None
      prefix = parent

  # returns the common prefix of multiple paths, or None if there is none (or no paths were given)
  def commonPrefix(self, paths):
    if len(paths) < 1:
      return None
    result = None
    for path in paths:
      if result is None:
        # first iteration
        result = path
      else:
        result = self.commonPrefixOf2(result, path)
        if result is None:
          # the common prefix of two paths was nothing
          return result
    return result

  # returns the time at which <path> was last modified, without following symlinks
  def getModificationTime(self, path):
    if os.path.exists(path):
      if os.path.islink(path):
        # for a symlink, the last time the link itself was modified is the ctime (mtime for a broken link is undefined)
        return os.path.getctime(path)
      else:
        # for a file, the last time its content was modified is the mtime
        return os.path.getmtime(path)
    return None

fileIo = FileIo()
146
# Returns cpu usage
class CpuStats(object):
  """Reports CPU usage statistics via the optional third-party psutil package."""

  def cpu_times_percent(self):
    # We wait to attempt to import psutil in case we don't need it and it doesn't exist on this system
    import psutil
    return psutil.cpu_times_percent(interval=None)

cpuStats = CpuStats()
156
# Runs a shell command
class ShellScript(object):
  """Runs <commandText> with bash, using <cwd> as the working directory."""

  def __init__(self, commandText, cwd):
    self.commandText = commandText
    self.cwd = cwd

  # Runs the command and returns its exit status (0 on success)
  def process(self):
    cwd = self.cwd
    print("Running '" + self.commandText + "' in " + cwd)
    try:
      # Pass the working directory via subprocess's cwd argument instead of
      # embedding "cd <cwd> && ..." in the command string, which broke (and
      # allowed shell injection) when the path contained spaces or metacharacters
      subprocess.check_call(["bash", "-c", self.commandText], cwd=cwd)
      return 0
    except subprocess.CalledProcessError as e:
      return e.returncode
    except OSError:
      # <cwd> doesn't exist (or bash couldn't start); the previous "cd <cwd> && ..."
      # formulation made bash exit nonzero in this case, so mirror that here
      return 1
171
# Base class that can hold the state of a file
class FileContent(object):
  """Interface for a single file's state; subclasses know how to materialize
  themselves at a path and how to compare themselves to other states."""

  def apply(self, filePath):
    """Make the file at <filePath> match this state (no-op in the base class)."""

  def equals(self, other, checkWithFileSystem=False):
    """Report whether this state matches <other> (no-op in the base class)."""
179
# A FileContent that refers to the content of a specific file
class FileBacked_FileContent(FileContent):
  """A FileContent whose data lives in an existing file at <referencePath>."""

  def __init__(self, referencePath):
    super(FileBacked_FileContent, self).__init__()
    self.referencePath = referencePath
    # remember whether the reference is a symlink; equals() treats links specially
    self.isLink = os.path.islink(self.referencePath)

  def apply(self, filePath):
    # materialize this state by copying the reference file into place
    fileIo.copyFile(self.referencePath, filePath)

  def equals(self, other, checkWithFileSystem=False):
    if not isinstance(other, FileBacked_FileContent):
      return False
    if self.referencePath == other.referencePath:
      return True
    if not checkWithFileSystem:
      # without consulting the filesystem, distinct paths are assumed distinct
      return False
    if self.isLink != other.isLink:
      # a symlink is never equal to a non-symlink
      return False
    if self.isLink:
      # both are symlinks: equal iff they point at the same target
      return os.readlink(self.referencePath) == os.readlink(other.referencePath)
    return filecmp.cmp(self.referencePath, other.referencePath)

  def __str__(self):
    return self.referencePath
205
# A FileContent describing the nonexistence of a file
class MissingFile_FileContent(FileContent):
  """Represents the absence of a file."""

  def __init__(self):
    super(MissingFile_FileContent, self).__init__()

  def apply(self, filePath):
    # materialize "missing" by deleting whatever currently exists at <filePath>
    fileIo.removePath(filePath)

  def equals(self, other, checkWithFileSystem=False):
    # every missing file is interchangeable with every other missing file
    return isinstance(other, MissingFile_FileContent)

  def __str__(self):
    return "Empty"
219
# A FileContent describing a directory
class Directory_FileContent(FileContent):
  """Represents an (empty) directory."""

  def __init__(self):
    super(Directory_FileContent, self).__init__()

  def apply(self, filePath):
    # materialize by creating the directory if it doesn't already exist
    fileIo.ensureDirExists(filePath)

  def equals(self, other, checkWithFileSystem=False):
    # any two empty directories are interchangeable
    return isinstance(other, Directory_FileContent)

  def __str__(self):
    return "[empty dir]"
233
# A collection of many FileContent objects
class FilesState(object):
  """An ordered mapping from relative file paths to FileContent states.

  Most operations below are functional: they build and return a new FilesState
  rather than mutating self, so callers can treat instances as values.
  """

  def __init__(self):
    self.fileStates = OrderedDict()

  def apply(self, filePath):
    # Materializes every recorded state on disk, under the directory <filePath>
    for relPath, state in self.fileStates.items():
      state.apply(fileIo.join(filePath, relPath))

  def add(self, filePath, fileContent):
    self.fileStates[filePath] = fileContent

  def addAllFrom(self, other):
    # Copies every entry from <other> into <self>, overwriting duplicate keys
    for filePath in other.fileStates:
        self.add(filePath, other.fileStates[filePath])

  def getContent(self, filePath):
    # Returns the FileContent recorded at <filePath>, or None if not present
    if filePath in self.fileStates:
      return self.fileStates[filePath]
    return None

  def getKeys(self):
    return self.fileStates.keys()

  # returns a FilesState resembling <self> but without the keys for which other[key] == self[key]
  def withoutDuplicatesFrom(self, other, checkWithFileSystem=False):
    result = FilesState()
    for filePath, fileState in self.fileStates.items():
      otherContent = other.getContent(filePath)
      if not fileState.equals(otherContent, checkWithFileSystem):
        result.add(filePath, fileState)
    return result

  # returns self[fromIndex:toIndex]
  def slice(self, fromIndex, toIndex):
    # relies on the insertion order preserved by the OrderedDict
    result = FilesState()
    for filePath in list(self.fileStates.keys())[fromIndex:toIndex]:
      result.fileStates[filePath] = self.fileStates[filePath]
    return result

  # returns a FilesState containing only the entries of <self> whose keys also appear in <other>
  def restrictedToKeysIn(self, other):
    result = FilesState()
    for filePath, fileState in self.fileStates.items():
      if filePath in other.fileStates:
        result.add(filePath, fileState)
    return result

  # returns a FilesState having the same keys as this FilesState, but with values taken from <other> when it has them, and <self> otherwise
  def withConflictsFrom(self, other, listEmptyDirs = False):
    result = FilesState()
    for filePath, fileContent in self.fileStates.items():
      if filePath in other.fileStates:
        result.add(filePath, other.fileStates[filePath])
      else:
        result.add(filePath, fileContent)
    if listEmptyDirs:
      # add removal entries for directories that <self> implied but the merged result no longer does
      oldImpliedDirs = self.listImpliedDirs()
      newImpliedDirs = result.listImpliedDirs()
      for impliedDir in oldImpliedDirs:
        if impliedDir not in newImpliedDirs and impliedDir not in result.fileStates:
          result.add(impliedDir, MissingFile_FileContent())
    return result

  # checks that <self> and <other> have exactly the same keys, printing any differences found
  def checkSameKeys(self, other):
    a = self.checkContainsKeys(other)
    b = other.checkContainsKeys(self)
    if a and b:
      return True
    if not a:
      print("a does not contain all of the keys from b")
    if not b:
      print("b does not contain all of the keys from a")
    return False

  # checks that every key in <other> is also a key in <self>, printing any that aren't
  def checkContainsKeys(self, other):
    contains = True
    for f in other.fileStates.keys():
      if f not in self.fileStates:
        print("Found in " + other.summarize() + " but not in " + self.summarize() + ": " + f)
        contains = False
    return contains

  # returns a set of paths to all of the dirs in <self> that are implied by any files in <self>
  def listImpliedDirs(self):
    dirs = set()
    empty = MissingFile_FileContent()
    # only existing (non-missing) entries imply the existence of their parent dirs
    keys = [key for (key, value) in self.fileStates.items() if not empty.equals(value)]
    i = 0
    # walk each path upward; <keys> grows during iteration so newly-found parents get processed too
    while i < len(keys):
      path = keys[i]
      parent, child = os.path.split(path)
      if parent == "":
        parent = "."
      if not parent in dirs:
        dirs.add(parent)
        keys.append(parent)
      i += 1
    return dirs

  # returns a FilesState having all of the entries from <self>, plus empty entries for any keys in <other> not in <self>
  def expandedWithEmptyEntriesFor(self, other, listEmptyDirs=False):
    impliedDirs = self.listImpliedDirs()
    # now look for entries in <other> not present in <self>
    result = self.clone()
    for filePath in other.fileStates:
      if filePath not in result.fileStates and filePath not in impliedDirs:
        result.fileStates[filePath] = MissingFile_FileContent()
    if listEmptyDirs:
      # also add removal entries for dirs implied by <other> but not by the result
      newImpliedDirs = other.listImpliedDirs()
      oldImpliedDirs = result.listImpliedDirs()
      for impliedDir in newImpliedDirs:
        if impliedDir not in oldImpliedDirs and impliedDir not in result.fileStates:
          result.add(impliedDir, MissingFile_FileContent())
    return result

  # returns a shallow copy: FileContent values are shared, not copied
  def clone(self):
    result = FilesState()
    for path, content in self.fileStates.items():
      result.add(path, content)
    return result

  # returns a FilesState like <self> but with all MissingFile entries dropped
  def withoutEmptyEntries(self):
    result = FilesState()
    empty = MissingFile_FileContent()
    for path, state in self.fileStates.items():
      if not empty.equals(state):
        result.add(path, state)
    return result

  # returns the deepest directory containing every path in <self>, or None if there is none
  def getCommonDir(self):
    result = fileIo.commonPrefix(self.fileStates.keys())
    return result

  # Returns a list of FilesState objects each containing a different subdirectory of <self>
  # If groupDirectFilesTogether == True, then all files directly under self.getCommonDir() will be assigned to the same group
  def groupByDirs(self, groupDirectFilesTogether = False):
    if len(self.fileStates) <= 1:
      if len(self.fileStates) == 1:
        return [self]
      return []

    commonDir = self.getCommonDir()
    if commonDir is None:
      prefixLength = 0
    else:
      prefixLength = len(commonDir) + 1 # skip the following '/'
    groupsByDir = {}

    for filePath, fileContent in self.fileStates.items():
      subPath = filePath[prefixLength:]
      slashIndex = subPath.find("/")
      if slashIndex < 0:
        # a file directly under the common dir
        if groupDirectFilesTogether:
          firstDir = ""
        else:
          firstDir = subPath
      else:
        # group by the first path component below the common dir
        firstDir = subPath[:slashIndex]
      if not firstDir in groupsByDir:
        groupsByDir[firstDir] = FilesState()
      groupsByDir[firstDir].add(filePath, fileContent)
    return [group for group in groupsByDir.values()]

  # splits into multiple, smaller, FilesState objects
  def splitOnce(self, maxNumChildren = 2):
    if self.size() <= 1:
      return [self]
    children = self.groupByDirs(True)
    if len(children) == 1:
      # only one subdirectory exists; regroup without merging its direct files
      children = children[0].groupByDirs(False)
    if len(children) > maxNumChildren:
      # If there are lots of child directories, we still want to test a smaller number of larger groups before testing smaller groups
      # So we arbitrarily recombine child directories to make a smaller number of children
      minIndex = 0
      mergedChildren = []
      for i in range(maxNumChildren):
        maxIndex = len(children) * (i + 1) // maxNumChildren
        merge = FilesState()
        for child in children[minIndex:maxIndex]:
          merge.addAllFrom(child)
        mergedChildren.append(merge)
        minIndex = maxIndex
      children = mergedChildren
    return children

  # returns a short, human-readable description of <self>
  def summarize(self):
    numFiles = self.size()
    commonDir = self.getCommonDir()
    if numFiles <= 4:
      return str(self)
    if commonDir is not None:
      return str(numFiles) + " files under " + str(commonDir)
    return str(numFiles) + " files"

  def size(self):
    return len(self.fileStates)

  def __str__(self):
    if len(self.fileStates) == 0:
      return "[empty fileState]"
    entries = []
    for filePath, state in self.fileStates.items():
      entries.append(filePath + " -> " + str(state))
    if len(self.fileStates) > 1:
      prefix = str(len(entries)) + " entries:\n"
    else:
      prefix = "1 entry: "
    return prefix + "\n".join(entries)
442
# Creates a FilesState matching the state of a directory on disk
def filesStateFromTree(rootPath):
  """Walks <rootPath> and returns a FilesState describing every file, symlink,
  and empty directory found there (keys are paths relative to <rootPath>)."""
  rootPath = os.path.abspath(rootPath)

  contentsByRelPath = {}
  for dirPath, childDirs, childFiles in os.walk(rootPath, topdown=True):
    if not childDirs and not childFiles:
      # a directory with no children is only represented by an explicit entry
      contentsByRelPath[os.path.relpath(dirPath, rootPath)] = Directory_FileContent()
    # include every file and every symlink (even if the symlink points to a dir)
    leafNames = list(childFiles)
    for childDir in childDirs:
      if os.path.islink(os.path.join(dirPath, childDir)):
        leafNames.append(childDir)
    for leafName in leafNames:
      fullPath = fileIo.join(dirPath, leafName)
      contentsByRelPath[os.path.relpath(fullPath, rootPath)] = FileBacked_FileContent(fullPath)

  # build the result in sorted key order
  result = FilesState()
  for relPath in sorted(contentsByRelPath):
    result.add(relPath, contentsByRelPath[relPath])
  return result
472
# runs a Test in this process
def runJobInSameProcess(shellCommand, workPath, previousTestState, clean, fullTestState, description, twoWayPipe):
  """Builds a Test from the given arguments and runs it to completion in the current process."""
  Test(shellCommand, workPath, previousTestState, clean, fullTestState, description, twoWayPipe).runAndReport()
477
# starts a Test in a new process
def runJobInOtherProcess(shellCommand, workPath, previousTestState, clean, fullTestState, description, queue, identifier):
  """Spawns a child process that runs the Test; returns the parent's end of the control pipe."""
  parentEnd, childEnd = multiprocessing.Pipe()
  childPipe = TwoWayPipe(childEnd, queue, identifier)
  worker = multiprocessing.Process(
      target=runJobInSameProcess,
      args=(shellCommand, workPath, previousTestState, clean, fullTestState, description, childPipe,))
  worker.start()
  return parentEnd
485
class TwoWayPipe(object):
  """Bundles a worker's reader connection with the shared result queue and the worker's identifier."""

  def __init__(self, readerConnection, writerQueue, identifier):
    # connection the worker reads from
    self.readerConnection = readerConnection
    # queue the worker writes results to
    self.writerQueue = writerQueue
    # identifies this worker to the coordinator
    self.identifier = identifier
491
# Stores a subprocess for running tests and some information about which tests to run
class Test(object):
  """Applies a FilesState to a work directory, runs the shell command there,
  and reports the result back through a TwoWayPipe."""

  def __init__(self, shellCommand, workPath, previousTestState, clean, fullTestState, description, twoWayPipe):
    # the test to run
    self.shellCommand = shellCommand
    # directory to run the test in
    self.workPath = workPath
    # The previous state that we were asked to test. If the test command didn't modify any files, then our working directory would exactly match this state
    self.previousTestState = previousTestState
    # whether to reset the worker's state to match the target state exactly
    self.clean = clean
    # the state to test
    self.fullTestState = fullTestState
    # description of changes
    self.description = description
    self.pipe = twoWayPipe

  def runAndReport(self):
    """Runs the test and always reports (identifier, succeeded, clean) through the pipe, even if run() raises."""
    succeeded = False
    # (an unused local, postState, was removed here: it was assigned but never read)
    try:
      succeeded = self.run()
    finally:
      print("^" * 100)
      self.pipe.writerQueue.put((self.pipe.identifier, succeeded, self.clean))

  def run(self):
    """Applies the requested file state to the work directory, runs the shell command there, and returns True iff it exited with 0."""
    print("#" * 100)
    print("Checking " + self.description + " at " + str(datetime.datetime.now()))

    if self.clean:
      # update all files to match the target state
      currentState = filesStateFromTree(self.workPath)
      self.fullTestState.expandedWithEmptyEntriesFor(currentState, True).withoutDuplicatesFrom(currentState).apply(self.workPath)
    else:
      # just apply the difference from previousTestState to full_resetTo_state
      self.fullTestState.expandedWithEmptyEntriesFor(self.previousTestState).withoutDuplicatesFrom(self.previousTestState).apply(self.workPath)

    # run test
    # (an unused local, testStartSeconds = time.time(), was removed here)
    testStart = datetime.datetime.now()
    returnCode = ShellScript(self.shellCommand, self.workPath).process()
    testEnd = datetime.datetime.now()
    duration = (testEnd - testStart).total_seconds()

    # report results
    if returnCode == 0:
      print("Passed: " + self.description + " at " + str(datetime.datetime.now()) + " in " + str(duration) + "s")
      return True
    else:
      print("Failed: " + self.description + " at " + str(datetime.datetime.now()) + " in " + str(duration) + "s")
      return False
544
# keeps track of a plan for running a Test
class Job(object):
  """Pairs a candidate FilesState with whether a larger state containing it already succeeded."""

  def __init__(self, testState, ancestorSucceeded):
    self.testState = testState
    self.ancestorSucceeded = ancestorSucceeded

  # number of file changes in this job's candidate state
  def size(self):
    return self.testState.size()
553
554# Runner class that determines which diffs between two directories cause the given shell command to fail
555class DiffRunner(object):
  def __init__(self, failingPath, passingPath, shellCommand, workPath, assumeInputStatesAreCorrect, allowGoalPassing, maxNumJobsAtOnce, timeoutSeconds):
    """Scans both input trees and precomputes the sets of differing files.

    failingPath  -- directory representing the goal (expected-to-fail) state
    passingPath  -- directory whose contents are known to pass <shellCommand>
    shellCommand -- command whose pass/fail result drives the search; saved to test.sh
    workPath     -- scratch directory; overwritten as needed during the search
    """
    # some simple params
    self.workPath = os.path.abspath(workPath)
    self.bestState_path = fileIo.join(self.workPath, "bestResults")
    self.sampleFailure_path = fileIo.join(self.workPath, "sampleFailure")
    self.testScript_path = fileIo.join(self.workPath, "test.sh")
    fileIo.ensureDirExists(os.path.dirname(self.testScript_path))
    fileIo.writeScript(self.testScript_path, shellCommand)
    self.originalPassingPath = os.path.abspath(passingPath)
    self.originalFailingPath = os.path.abspath(failingPath)
    self.assumeInputStatesAreCorrect = assumeInputStatesAreCorrect
    self.allowGoalPassing = allowGoalPassing
    self.timeoutSeconds = timeoutSeconds

    # lists of all the files under the two dirs
    print("Finding files in " + passingPath)
    self.originalPassingState = filesStateFromTree(passingPath)
    print("Found " + self.originalPassingState.summarize() + " in " + str(passingPath))
    print("")
    print("Finding files in " + failingPath)
    self.originalFailingState = filesStateFromTree(failingPath)
    print("Found " + self.originalFailingState.summarize() + " in " + str(failingPath))
    print("")
    print("Identifying duplicates")
    # list of the files in the state to reset to after each test
    self.full_resetTo_state = self.originalPassingState
    # minimal description of only the files that are supposed to need to be reset after each test
    self.resetTo_state = self.originalPassingState.expandedWithEmptyEntriesFor(self.originalFailingState).withoutDuplicatesFrom(self.originalFailingState, True)
    # the changes still to be tried: files where the failing (goal) state differs from the passing state
    self.targetState = self.originalFailingState.expandedWithEmptyEntriesFor(self.originalPassingState).withoutDuplicatesFrom(self.originalPassingState, True)
    self.originalNumDifferences = self.resetTo_state.size()
    print("Processing " + str(self.originalNumDifferences) + " file differences")
    self.maxNumJobsAtOnce = maxNumJobsAtOnce
588
  def cleanupTempDirs(self):
    """Removes every job-* subdirectory of the work path, retrying on failure."""
    print("Clearing work directories")
    # removal can fail transiently (e.g. if a test's child processes are still
    # writing into the directory), so retry a few times before giving up
    numAttempts = 3
    for attempt in range(numAttempts):
      if os.path.isdir(self.workPath):
        for child in os.listdir(self.workPath):
          if child.startswith("job-"):
            path = os.path.join(self.workPath, child)
            try:
              fileIo.removePath(path)
            except IOError as e:
              # IOError is an alias of OSError in Python 3, so this also catches shutil/os failures
              if attempt >= numAttempts - 1:
                raise Exception("Failed to remove " + path, e)
602
  def runnerTest(self, testState, timeout = None):
    """Applies <testState> to a clean work dir, runs the test script there, and returns (passed, durationSeconds).

    NOTE(review): the <timeout> parameter is currently unused here — confirm whether it was meant to bound the shell command's runtime.
    """
    self.cleanupTempDirs()
    workPath = self.getWorkPath(0)
    testState.apply(workPath)
    start = datetime.datetime.now()
    returnCode = ShellScript(self.testScript_path, workPath).process()
    duration = (datetime.datetime.now() - start).total_seconds()
    print("shell command completed in " + str(duration))
    if returnCode == 0:
      return (True, duration)
    else:
      return (False, duration)
615
  def onSuccess(self, testState):
    """Accepts <testState> into the best known passing state and updates bestState_path on disk."""
    #print("Runner received success of testState: " + str(testState.summarize()))
    if debug:
      # expensive consistency check: the on-disk best state must match our in-memory model
      if not filesStateFromTree(self.bestState_path).checkSameKeys(self.full_resetTo_state.withoutEmptyEntries()):
        print("Contents of " + self.bestState_path + " don't match self.full_resetTo_state at beginning of onSuccess")
        sys.exit(1)
    # the accepted changes no longer need to be searched over or reset after tests
    self.targetState = self.targetState.withoutDuplicatesFrom(testState)
    self.resetTo_state = self.resetTo_state.withConflictsFrom(testState).withoutDuplicatesFrom(testState)
    # <delta> is the difference between the previous best state and the new best state
    delta = self.full_resetTo_state.expandedWithEmptyEntriesFor(testState).withConflictsFrom(testState, True).withoutDuplicatesFrom(self.full_resetTo_state)
    self.full_resetTo_state = self.full_resetTo_state.expandedWithEmptyEntriesFor(delta).withConflictsFrom(delta)
    # Update results path for the user to look at
    if os.path.exists(self.bestState_path):
      # The previous results are still there, so we just apply the difference between the previous and new best results
      delta.apply(self.bestState_path)
    else:
      # The previous results are missing (most likely moved/deleted by the user) so we save them again
      self.full_resetTo_state.apply(self.bestState_path)
    if debug:
      if not filesStateFromTree(self.bestState_path).checkSameKeys(self.full_resetTo_state.withoutEmptyEntries()):
        print("Contents of " + self.bestState_path + " don't match self.full_resetTo_state at end of onSuccess")
        print("Applied this delta: " + str(delta))
        sys.exit(1)
638
  # returns the path of the scratch directory used by worker number <jobId>
  def getWorkPath(self, jobId):
    return os.path.join(self.workPath, "job-" + str(jobId))
641
642  def run(self):
643    start = datetime.datetime.now()
644    numIterationsCompleted = 0
645    self.cleanupTempDirs()
646    workPath = self.getWorkPath(0)
647    if not self.assumeInputStatesAreCorrect:
648      print("Testing that the given failing state actually fails")
649      fileIo.removePath(workPath)
650      if self.runnerTest(self.originalFailingState)[0]:
651        if self.allowGoalPassing:
652          print("\nGiven goal state at " + self.originalFailingPath + " passes, so it is the best result")
653          self.cleanupTempDirs()
654          fileIo.removePath(self.bestState_path)
655          self.originalFailingState.apply(self.bestState_path)
656          return True
657        print("\nGiven goal state at " + self.originalFailingPath + " does not fail! Pass --allow-goal-passing if this is intentional")
658        return False
659      # clean up temporary dirs in case any daemons remain running
660      self.cleanupTempDirs()
661
662      print("Testing that the given passing state actually passes")
663      if not self.runnerTest(self.full_resetTo_state)[0]:
664        print("\nGiven passing state at " + self.originalPassingPath + " does not actually pass!")
665        return False
666      # clean up temporary dirs in case any daemons remain running
667      self.cleanupTempDirs()
668
669    print("Saving best state found so far")
670    fileIo.removePath(self.bestState_path)
671    self.full_resetTo_state.apply(self.bestState_path)
672
673    print("Starting")
674    print("You can inspect " + self.bestState_path + " while this process runs, to observe the best state discovered so far")
675    print("You can inspect " + self.sampleFailure_path + " while this process runs, to observe a state for which the test failed. If you delete this filepath, then it will be updated later to contain a new failing state")
676    print("")
677    # Now we search over groups of inodes (files or dirs) in the tree
678    # Every time we encounter a group of inodes, we try replacing them and seeing if the replacement passes our test
679    # If it does, we accept those changes and continue searching
680    # If it doesn't, we split that group into smaller groups and continue
681    jobId = 0
682    workingDir = self.getWorkPath(jobId)
683    queue = multiprocessing.Queue()
684    activeJobsById = {}
685    workerStatesById = {}
686    consecutiveIncrementalBuildsById = {}
687    initialSplitSize = 2
688    if self.maxNumJobsAtOnce != "auto" and self.maxNumJobsAtOnce > 2:
689      initialSplitSize = self.maxNumJobsAtOnce
690    availableJobs = [Job(testState, False) for testState in self.targetState.splitOnce(initialSplitSize)]
691    numConsecutiveFailures = 0
692    numFailuresSinceLastSplitOrSuccess = 0
693    numCompletionsSinceLastPoolSizeChange = 0
694    invalidatedIds = set()
695    probablyAcceptableStates = []
696    numCompletedTests = 2 # Already tested initial passing state and initial failing state
697    numJobsAtFirstSuccessAfterMerge = None
698    timedOut = False
699    summaryLogPath = os.path.join(self.workPath, "diff-filterer.log")
700    summaryLog = open(summaryLogPath, "w")
701    summaryLog.write("diff-filterer.py starting at " + str(datetime.datetime.now()))
702    summaryLog.flush()
703    # continue until all files fail and no jobs are running
704    while (numFailuresSinceLastSplitOrSuccess < self.resetTo_state.size() and not timedOut) or len(activeJobsById) > 0:
705      # display status message
706      now = datetime.datetime.now()
707      elapsedDuration = now - start
708      minNumTestsRemaining = sum([math.log(job.testState.size(), 2) + 1 for job in availableJobs + list(activeJobsById.values())]) - numFailuresSinceLastSplitOrSuccess
709      estimatedNumTestsRemaining = max(minNumTestsRemaining, 1)
710      if numConsecutiveFailures >= 4 and numFailuresSinceLastSplitOrSuccess < 1:
711        # If we are splitting often and failing often, then we probably haven't yet
712        # shrunken the individual boxes down to each contain only one failing file
713        # During this phase, on average we've completed half of the work
714        # So, we estimate that the total work remaining is double what we've completed
715        estimatedNumTestsRemaining *= 2
716      estimatedRemainingDuration = datetime.timedelta(seconds = elapsedDuration.total_seconds() * float(estimatedNumTestsRemaining) / float(numCompletedTests))
717      message = "Elapsed duration: " + str(elapsedDuration) + ". Waiting for " + str(len(activeJobsById)) + " active subprocesses (" + str(len(availableJobs) + len(activeJobsById)) + " total available jobs). " + str(self.resetTo_state.size()) + " changes left to test, should take about " + str(estimatedNumTestsRemaining) + " tests, about " + str(estimatedRemainingDuration)
718      print(message)
719      if self.timeoutSeconds is not None:
720        # what fraction of the time is left
721        remainingTimeFraction = 1.0 - (elapsedDuration.total_seconds() / self.timeoutSeconds)
722        # how many jobs there will be if we add another one
723        possibleNumPendingJobs = len(activeJobsById) + 1
724        if possibleNumPendingJobs / (numCompletedTests + possibleNumPendingJobs) > remainingTimeFraction:
725          # adding one more job would be likely to cause us to exceed our time limit
726          timedOut = True
727
728      if len(activeJobsById) > 0:
729        # wait for a response from a worker
730        identifier, didAcceptState, clean = queue.get()
731        job = activeJobsById[identifier]
732        numCompletedTests += 1
733        numCompletionsSinceLastPoolSizeChange += 1
734        if didAcceptState:
735          numConsecutiveFailures = 0
736          numFailuresSinceLastSplitOrSuccess = 0
737          acceptedState = job.testState
738          maxRunningSize = max([job.testState.size() for job in activeJobsById.values()])
739          maxRelevantSize = maxRunningSize / len(activeJobsById)
740          if acceptedState.size() < maxRelevantSize:
741            print("Queuing a retest of response of size " + str(acceptedState.size()) + " from job " + str(identifier) + " because a much larger job of size " + str(maxRunningSize) + " is still running")
742            probablyAcceptableStates.append(acceptedState)
743          else:
744            if identifier in invalidatedIds:
745              print("Queuing a retest of response from job " + str(identifier) + " due to previous invalidation. Successful state: " + str(acceptedState.summarize()))
746              probablyAcceptableStates.append(acceptedState)
747            else:
748              if not clean:
749                print("Queuing a clean retest of incremental success from job " + str(identifier))
750                probablyAcceptableStates.append(acceptedState)
751              else:
752                print("Accepting clean success from job " + str(identifier))
753                summaryLog.write("Succeeded : " + acceptedState.summarize() + " (job " + str(identifier) + ") at " + str(datetime.datetime.now()) + "\n")
754                summaryLog.flush()
755                # A worker discovered a nonempty change that can be made successfully; update our best accepted state
756                self.onSuccess(acceptedState)
757                if debug:
758                  # The files in self.bestState_path should exactly match what's in workPath[identifier], except for files that didn't originally exist
759                  if not filesStateFromTree(self.bestState_path).checkSameKeys(filesStateFromTree(self.getWorkPath(identifier)).restrictedToKeysIn(self.originalPassingState.expandedWithEmptyEntriesFor(self.originalFailingState))):
760                    print("Successful state from work path " + str(identifier) + " wasn't correctly copied to bestState. Could the test command be deleting files that previously existed?")
761                    sys.exit(1)
762                # record that the results from any previously started process are no longer guaranteed to be valid
763                for i in activeJobsById.keys():
764                  if i != identifier:
765                    invalidatedIds.add(i)
766                # record our first success
767                if numJobsAtFirstSuccessAfterMerge is None:
768                  numJobsAtFirstSuccessAfterMerge = len(availableJobs) + len(activeJobsById)
769        else:
770          testState = job.testState
771          if not os.path.isdir(self.sampleFailure_path):
772            # save sample failure path where user can see it
773            print("Saving sample failed state to " + str(self.sampleFailure_path))
774            # write to a temporary directory so if a user looks at this path while we're writing, they don't see incomplete results
775            tempPath = self.sampleFailure_path + ".temp"
776            fileIo.removePath(tempPath)
777            fileIo.ensureDirExists(tempPath)
778            self.full_resetTo_state.expandedWithEmptyEntriesFor(testState).withConflictsFrom(testState, True).apply(tempPath)
779            # rename temporary directory
780            if os.path.exists(tempPath):
781              fileIo.move(tempPath, self.sampleFailure_path)
782          # count failures
783          numConsecutiveFailures += 1
784          numFailuresSinceLastSplitOrSuccess += 1
785          # find any children that failed and queue a re-test of those children
786          updatedChild = testState.withoutDuplicatesFrom(testState.withConflictsFrom(self.resetTo_state))
787          if updatedChild.size() > 0:
788            if numConsecutiveFailures >= 4:
789              # Suppose we are trying to identify n single-file changes that cause failures
790              # Suppose we have tried c changes of size s, each one of which failed
791              # We conclude that n >= c
792              # A mostly unbiased estimate of c as a function of n is that c = n / 2
793              # Similarly, a mostly unbiased estimate of n is that n = c * 2
794              # We want to choose a new number of changes to test, c2, such that running c2 tests results in efficiently identifying the relevant n changes
795              # Let's set c2 = 2 * n = 2 * 2 * c
796              splitFactor = 4
797            else:
798              # After we reach a sufficiently small change size such that some changes start passing,
799              # Then we assume that we've probably narrowed down to each individual failing change,
800              # And we can increase block sizes more slowly
801              splitFactor = 2
802            split = updatedChild.splitOnce(splitFactor)
803            if len(split) > 1:
804              numFailuresSinceLastSplitOrSuccess = 0
805            for testState in split:
806              availableJobs.append(Job(testState, job.ancestorSucceeded))
807        # clear invalidation status
808        if identifier in invalidatedIds:
809          invalidatedIds.remove(identifier)
810        del activeJobsById[identifier]
811        # Check whether we've had enough failures lately to warrant checking for the possibility of dependencies among files
812        if numJobsAtFirstSuccessAfterMerge is not None:
813          if len(availableJobs) > 3 * numJobsAtFirstSuccessAfterMerge:
814            # It's plausible that every file in one directory depends on every file in another directory
815            # If this happens, then after we delete the dependent directory, we can delete the dependency directory too
816            # To make sure that we consider deleting the dependency directory, we recombine all of our states and start splitting from there
817            print("#############################################################")
818            print("#                                                           #")
819            print("# Lots of failures (" + str(len(availableJobs)) + " available jobs) since first success (" + str(numJobsAtFirstSuccessAfterMerge) + ")!")
820            print("# Recombining all states in case we uncovered a dependency! #")
821            print("#                                                           #")
822            print("#############################################################")
823            rejoinedState = FilesState()
824            for job in availableJobs:
825              state = job.testState
826              rejoinedState = rejoinedState.expandedWithEmptyEntriesFor(state).withConflictsFrom(state)
827            rejoinedState = rejoinedState.withoutDuplicatesFrom(self.resetTo_state)
828            availableJobs = [Job(testState, False) for testState in rejoinedState.splitOnce(initialSplitSize)]
829            numFailuresSinceLastSplitOrSuccess = 0
830            numJobsAtFirstSuccessAfterMerge = None
831            numCompletionsSinceLastPoolSizeChange = 0
832
833      # if probablyAcceptableStates has become large enough, then retest its contents too
834      if len(probablyAcceptableStates) > 0 and (len(probablyAcceptableStates) >= len(activeJobsById) + 1 or numConsecutiveFailures >= len(activeJobsById) or len(activeJobsById) < 1):
835        probablyAcceptableState = FilesState()
836        for state in probablyAcceptableStates:
837          probablyAcceptableState = probablyAcceptableState.expandedWithEmptyEntriesFor(state).withConflictsFrom(state)
838        probablyAcceptableState = probablyAcceptableState.withoutDuplicatesFrom(self.resetTo_state)
839        if probablyAcceptableState.size() > 0:
840          print("Retesting " + str(len(probablyAcceptableStates)) + " previous likely successful states as a single test: " + probablyAcceptableState.summarize())
841          availableJobs = [Job(probablyAcceptableState, True)] + availableJobs
842        probablyAcceptableStates = []
843      if len(availableJobs) < 1 and len(activeJobsById) < 1:
844        print("Error: no changes remain left to test. It was expected that applying all changes would fail")
845        break
846
847      # if we haven't checked everything yet, then try to queue more jobs
848      if numFailuresSinceLastSplitOrSuccess < self.resetTo_state.size():
849        availableJobs.sort(reverse=True, key=Job.size)
850
851        if self.maxNumJobsAtOnce != "auto":
852          targetNumJobs = self.maxNumJobsAtOnce
853        else:
854          # If N jobs are running then wait for all N to fail before increasing the number of running jobs
855            # Recalibrate the number of processes based on the system load
856            systemUsageStats = cpuStats.cpu_times_percent()
857            systemIdleFraction = systemUsageStats.idle / 100
858            if systemIdleFraction >= 0.5:
859              if numCompletionsSinceLastPoolSizeChange <= len(activeJobsById):
860                # Not much time has passed since the previous time we changed the pool size
861                targetNumJobs = len(activeJobsById) + 1 # just replace existing job
862              else:
863                # We've been using less than the target capacity for a while, so add another job
864                targetNumJobs = len(activeJobsById) + 2 # replace existing job and add a new one
865                numCompletionsSinceLastPoolSizeChange = 0
866            else:
867              targetNumJobs = len(activeJobsById) # don't replace existing job
868              numCompletionsSinceLastPoolSizeChange = 0
869
870              if targetNumJobs < 1:
871                targetNumJobs = 1
872            print("System idle = " + str(systemIdleFraction) + ", current num jobs = " + str(len(activeJobsById) + 1) + ", target num jobs = " + str(targetNumJobs))
873
874        if timedOut:
875          print("Timeout reached, not starting new jobs")
876        else:
877          while len(activeJobsById) < targetNumJobs and len(activeJobsById) < self.resetTo_state.size() and len(availableJobs) > 0:
878            # find next pending job
879            job = availableJobs[0]
880            # find next unused job id
881            workerId = 0
882            while workerId in activeJobsById:
883              workerId += 1
884            # start job
885            workingDir = self.getWorkPath(workerId)
886            if workerId in workerStatesById:
887              workerPreviousState = workerStatesById[workerId]
888            else:
889              workerPreviousState = FilesState()
890            testState = job.testState
891
892            # If job.ancestorSucceeded, then this job came from another job that succeeded (it's either the union of several jobs that succeeded, or a piece of a job that succeeded).
893            # However, if we get here, then this job failed.
894            # So, joining or splitting this job's successful ancestor(s) created a failed job (this one).
895            # So, in the future it's also likely that we'll find jobs that succeed on their own but if joined will fail.
896            # So, in the future we don't want to join all successful jobs (because that could be likely to introduce a failure).
897            # Any successful jobs in the future that we don't accept, we join together.
898            # So, we want to accept a successful job soon.
899            # We can only accept the results of clean builds (because for incremental builds we're not sure that the results are reliable)
900            # So, if job.ancestorSucceeded, we make the descendants of this job be clean
901            #
902            # Also, we want each worker to occasionally use a new state in case so that incremental errors can't remain forever
903            clean = job.ancestorSucceeded
904            if workerId in consecutiveIncrementalBuildsById:
905              consecutiveIncrementalBuilds = consecutiveIncrementalBuildsById[workerId]
906              if consecutiveIncrementalBuilds >= 10:
907                clean = True
908                consecutiveIncrementalBuilds = 0
909            else:
910              consecutiveIncrementalBuilds = 0
911              # Also, if this worker hasn't run any jobs yet, then we don't expect it to have any leftover files, so an incremental test is essentially equivalent to a clean test anyway
912              # We ask the worker to run a clean test so that if it succeeds, we can detect that the success started from a clean state
913              clean = True
914            consecutiveIncrementalBuildsById[workerId] = 0
915            fullTestState = self.full_resetTo_state.expandedWithEmptyEntriesFor(testState).withConflictsFrom(testState)
916            description = testState.summarize() + " (job " + str(workerId) + ", "
917            if clean:
918              description += "clean"
919            else:
920              description += "incremental"
921            description += ")"
922            runJobInOtherProcess(self.testScript_path, workingDir, workerPreviousState, clean, fullTestState, description, queue, workerId)
923            activeJobsById[workerId] = job
924            workerStatesById[workerId] = fullTestState
925            availableJobs = availableJobs[1:]
926
927    if timedOut:
928      wasSuccessful = False
929    else:
930      print("double-checking results")
931      wasSuccessful = True
932      if not self.runnerTest(filesStateFromTree(self.bestState_path))[0]:
933        message = "Error: expected best state at " + self.bestState_path + " did not pass the second time. Could the test be non-deterministic?"
934        if self.assumeInputStatesAreCorrect:
935          message += " (it may help to remove the --assume-input-states-are-correct flag)"
936        print(message)
937        wasSuccessful = False
938
939    self.cleanupTempDirs()
940
941    print("")
942    if self.targetState.size() < 1000:
943      filesDescription = str(self.targetState)
944    else:
945      filesDescription = str(self.targetState.summarize())
946    print("Done trying to transform the contents of passing path:\n " + self.originalPassingPath + "\ninto the contents of failing path:\n " + self.originalFailingPath)
947    print("Of " + str(self.originalNumDifferences) + " differences, could not accept: " + filesDescription)
948    print("The final accepted state can be seen at " + self.bestState_path)
949    print("Ran " + str(numCompletedTests) + " tests")
950    if timedOut:
951      print("Note that these results might not be optimal due to reaching the timeout of " + str(self.timeoutSeconds) + " seconds")
952    return wasSuccessful
953
def main(args):
  """Entry point: parse command-line options, then run the diff filterer.

  args is the raw argument list (typically sys.argv[1:]); after the option
  flags it must contain exactly <passingPath> <goalPath> <shellCommand>.
  """
  # option defaults
  assumeInputStatesAreCorrect = False
  allowGoalPassing = False
  workPath = "/tmp/diff-filterer"
  timeoutSeconds = None
  maxNumJobsAtOnce = 1
  # consume recognized flags from the front of the argument list;
  # stop at the first non-flag (or unrecognized) argument
  while args:
    flag = args[0]
    if flag == "--assume-input-states-are-correct":
      assumeInputStatesAreCorrect = True
      args = args[1:]
    elif flag == "--allow-goal-passing":
      allowGoalPassing = True
      args = args[1:]
    elif flag == "--work-path":
      if len(args) < 2:
        usage()
      workPath = args[1]
      args = args[2:]
    elif flag == "--num-jobs":
      if len(args) < 2:
        usage()
      val = args[1]
      # "auto" means recalibrate the pool size from system load at runtime
      maxNumJobsAtOnce = val if val == "auto" else int(val)
      args = args[2:]
    elif flag == "--timeout":
      if len(args) < 2:
        usage()
      timeoutSeconds = float(args[1])
      args = args[2:]
    elif flag == "--debug":
      global debug
      debug = True
      args = args[1:]
    elif flag.startswith("-"):
      print("Unrecognized argument: '" + flag + "'")
      usage()
      break
    else:
      # first positional argument reached
      break
  if len(args) != 3:
    usage()
  passingPath, failingPath, shellCommand = args
  startTime = datetime.datetime.now()
  # validate both input paths before doing any work
  if not os.path.exists(passingPath):
    print("Specified passing path " + passingPath + " does not exist")
    sys.exit(1)
  if not os.path.exists(failingPath):
    print("Specified failing path " + failingPath + " does not exist")
    sys.exit(1)
  success = DiffRunner(failingPath, passingPath, shellCommand, workPath, assumeInputStatesAreCorrect, allowGoalPassing, maxNumJobsAtOnce, timeoutSeconds).run()
  duration = datetime.datetime.now() - startTime
  if success:
    print("Succeeded in " + str(duration))
  else:
    print("Failed in " + str(duration))
    sys.exit(1)
1022
# Run only when executed as a script, so the module can be imported
# (e.g. for testing) without launching the whole filtering process.
if __name__ == "__main__":
  main(sys.argv[1:])