• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
2# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
3"""
4Module to find differences over time in a filesystem
5
6Basically this takes a snapshot of a directory, then sees what changes
7were made.  The contents of the files are not checked, so you can
8detect that the content was changed, but not what the old version of
9the file was.
10"""
11
12import os
13from fnmatch import fnmatch
14from datetime import datetime
15
16try:
17    # Python 3
18    import collections.UserDict as IterableUserDict
19except ImportError:
20    # Python 2.5-2.7
21    from UserDict import IterableUserDict
22import operator
23import re
24
25__all__ = ['Diff', 'Snapshot', 'File', 'Dir', 'report_expected_diffs',
26           'show_diff']
27
class Diff(object):

    """
    Represents the difference between two snapshots

    ``before`` and ``after`` are snapshot objects.  This class reads
    their ``data`` mapping (path -> file object), their ``base_path``
    and ``calculated`` attributes and, when dates are reported, their
    dictionary interface (``get`` and item access).

    After construction three dictionaries, each keyed by path, are
    available:

    ``created``:
        Entries found in ``after`` but not in ``before``.

    ``deleted``:
        Entries found in ``before`` but not in ``after``.

    ``updated``:
        Entries found in both whose ``mtime`` in ``after`` is greater
        than the one in ``before`` (the value is the ``after`` entry).
    """

    def __init__(self, before, after):
        self.before = before
        self.after = after
        self._calculate()

    def _calculate(self):
        """
        Fill in ``self.created``, ``self.deleted`` and ``self.updated``
        by comparing the ``data`` of the two snapshots.
        """
        before = self.before.data
        after = self.after.data
        self.deleted = {}
        self.updated = {}
        # Start from everything in ``after`` and remove whatever already
        # existed in ``before``; what remains was created in between.
        self.created = after.copy()
        for path, f in before.items():
            if path not in after:
                self.deleted[path] = f
                continue
            del self.created[path]
            if f.mtime < after[path].mtime:
                self.updated[path] = after[path]

    def __str__(self):
        return self.report()

    def report(self, header=True, dates=False):
        """
        Return a multi-line, human readable description of the changes.

        ``header``:
            When true, the first line names the base path and the two
            calculation times.

        ``dates``:
            When true, each entry is followed by the modification
            time(s) recorded in the before/after snapshots.
        """
        s = []
        if header:
            s.append('Difference in %s from %s to %s:' %
                     (self.before.base_path,
                      self.before.calculated,
                      self.after.calculated))
        for name, files, show_size in [
            ('created', self.created, True),
            ('deleted', self.deleted, True),
            ('updated', self.updated, True)]:
            if files:
                s.append('-- %s: -------------------' % name)
                last = ''
                # sorted() works on both Python 2 lists and Python 3
                # dict views; paths are unique so only they are compared.
                for path, f in sorted(files.items()):
                    t = '  %s' % _space_prefix(last, path, indent=4,
                                               include_sep=False)
                    last = path
                    # Directories report their size as the string 'N/A'
                    if show_size and f.size != 'N/A':
                        t += '  (%s bytes)' % f.size
                    if dates:
                        parts = []
                        if self.before.get(path):
                            parts.append(self.before[path].mtime)
                        if self.after.get(path):
                            parts.append(self.after[path].mtime)
                        t += ' (mtime: %s)' % ('->'.join(map(repr, parts)))
                    s.append(t)
        # Only the header line is present: nothing was reported
        if len(s) == 1:
            s.append('  (no changes)')
        return '\n'.join(s)
89
class Snapshot(IterableUserDict):

    """
    A record of the files found beneath ``base_path`` at one moment.

    Behaves like a dictionary whose keys are paths relative to
    ``base_path`` and whose values are the ``File``/``Dir`` objects
    found there.  When ``files`` is not given the directory tree is
    scanned immediately and ``calculated`` holds the scan time.
    """

    def __init__(self, base_path, files=None, ignore_wildcards=(),
                 ignore_paths=(), ignore_hidden=True):
        self.base_path = base_path
        self.ignore_paths = ignore_paths
        self.ignore_wildcards = ignore_wildcards
        self.ignore_hidden = ignore_hidden
        self.calculated = None
        if files:
            self.data = files
        else:
            self.data = {}
        # Only scan the disk when the caller supplied no mapping at all
        if files is None:
            self.find_files()

    ############################################################
    ## File finding
    ############################################################

    def find_files(self):
        """
        Walk the tree below the base path, storing every entry in
        ``self.data``, then record when the walk happened.
        """
        self._find_traverse('', self.data)
        self.calculated = datetime.now()

    def _ignore_file(self, fn):
        """
        Tell whether the relative path ``fn`` is excluded: listed in
        ``ignore_paths``, hidden (basename starts with a dot) while
        ``ignore_hidden`` is set, or matching one of
        ``ignore_wildcards``.
        """
        return (
            fn in self.ignore_paths
            or (self.ignore_hidden
                and os.path.basename(fn).startswith('.'))
            or any(fnmatch(fn, pat) for pat in self.ignore_wildcards))

    def _find_traverse(self, path, result):
        """
        Record ``path`` (relative to the base path) in ``result`` and
        recurse into it when it is a directory.
        """
        full = os.path.join(self.base_path, path)
        if not os.path.isdir(full):
            result[path] = File(self.base_path, path)
            return
        if path:
            # The base path itself (empty relative path) is not recorded
            result[path] = Dir(self.base_path, path)
        for entry in os.listdir(full):
            child = os.path.join(path, entry)
            if not self._ignore_file(child):
                self._find_traverse(child, result)

    def __repr__(self):
        when = self.calculated or '(no calculation done)'
        description = (self.__class__.__name__, self.base_path, when)
        return '<%s in %r from %r>' % description

    def compare_expected(self, expected, comparison=operator.eq,
                         differ=None, not_found=None,
                         include_success=False):
        """
        Check the found files against ``expected``, a dictionary of
        ``path: content``.  Leading and trailing slashes of each
        expected path are stripped before the lookup.

        Contents are compared with
        ``comparison(actual_content, expected_content)``, which
        defaults to equality.

        Returns a dictionary keyed by (stripped) path.  For a
        mismatch the value is ``differ(actual_content,
        expected_content)`` when ``differ`` is given, otherwise a
        short note about the size difference.  For a missing path the
        value is ``not_found(path)`` when ``not_found`` is given,
        otherwise ``'not found'``.  Matching files are only listed
        (as ``'same!'``) when ``include_success`` is true.
        """
        result = {}
        for orig_path in expected:
            path = orig_path.strip('/')
            if path not in self.data:
                result[path] = not_found(path) if not_found else 'not found'
                continue
            expected_content = expected[orig_path]
            actual_content = self.data[path].bytes
            if comparison(actual_content, expected_content):
                if include_success:
                    result[path] = 'same!'
                continue
            if differ:
                result[path] = differ(actual_content, expected_content)
                continue
            delta = len(actual_content) - len(expected_content)
            if delta < 0:
                note = 'differ (%i bytes smaller)' % -delta
            elif delta > 0:
                note = 'differ (%i bytes larger)' % delta
            else:
                note = 'diff (same size)'
            result[path] = note
        return result

    def diff_to_now(self):
        """
        Return a ``Diff`` between this snapshot and a fresh scan made
        with the same settings.
        """
        current = self.clone()
        return Diff(self, current)

    def clone(self):
        """
        Build a new snapshot of the same base path with the same
        ignore settings; the new snapshot scans the disk itself.
        """
        settings = dict(
            base_path=self.base_path,
            ignore_wildcards=self.ignore_wildcards,
            ignore_paths=self.ignore_paths,
            ignore_hidden=self.ignore_hidden)
        return self.__class__(**settings)
203
class File(object):

    """
    Represents a single file found as the result of a command.

    Has attributes:

    ``path``:
        The path of the file, relative to the ``base_path``

    ``full``:
        The full path

    ``stat``:
        The results of ``os.stat``.  Also ``mtime`` and ``size``
        contain the ``.st_mtime`` and ``st_size`` of the stat.

    ``bytes``:
        The contents of the file, read in binary mode on first access
        and cached afterwards.

    You may use the ``in`` operator with these objects (tested against
    the contents of the file), and the ``.mustcontain()`` method.
    """

    file = True
    dir = False

    def __init__(self, base_path, path):
        self.base_path = base_path
        self.path = path
        self.full = os.path.join(base_path, path)
        self.stat = os.stat(self.full)
        self.mtime = self.stat.st_mtime
        self.size = self.stat.st_size
        # Filled in lazily by the ``bytes`` property
        self._bytes = None

    def bytes__get(self):
        """
        Return the file contents, reading them from disk only once.
        """
        if self._bytes is None:
            # The context manager closes the handle even if read() fails
            with open(self.full, 'rb') as f:
                self._bytes = f.read()
        return self._bytes
    bytes = property(bytes__get)

    def __contains__(self, s):
        return s in self.bytes

    def mustcontain(self, s):
        """
        Raise ``AssertionError`` unless ``s`` occurs in the file
        contents.  Before raising, the missing value and the contents
        are printed to help with debugging.
        """
        __tracebackhide__ = True
        content = self.bytes
        if s not in content:
            print('Could not find %r in:' % s)
            print(content)
            # Raised explicitly so the check also fires when Python runs
            # with -O, which strips ``assert`` statements.
            raise AssertionError('Could not find %r in file contents' % (s,))

    def __repr__(self):
        return '<%s %s:%s>' % (
            self.__class__.__name__,
            self.base_path, self.path)
263
class Dir(File):

    """
    A directory entry found in a snapshot.

    ``File.__init__`` is not invoked, so no ``os.stat`` call is made:
    ``size`` and ``mtime`` are both the placeholder string ``'N/A'``.
    Reading ``bytes`` raises ``NotImplementedError``.
    """

    dir = True
    file = False

    def __init__(self, base_path, path):
        self.path = path
        self.base_path = base_path
        self.full = os.path.join(base_path, path)
        # Placeholders instead of stat values
        self.mtime = 'N/A'
        self.size = 'N/A'

    def __repr__(self):
        details = (self.__class__.__name__, self.base_path, self.path)
        return '<%s %s:%s>' % details

    def bytes__get(self):
        """
        Directories have no content; always raises.
        """
        message = "Directory %r doesn't have content" % self
        raise NotImplementedError(message)

    bytes = property(bytes__get)
290
291
292def _space_prefix(pref, full, sep=None, indent=None, include_sep=True):
293    """
294    Anything shared by pref and full will be replaced with spaces
295    in full, and full returned.
296
297    Example::
298
299        >>> _space_prefix('/foo/bar', '/foo')
300        '    /bar'
301    """
302    if sep is None:
303        sep = os.path.sep
304    pref = pref.split(sep)
305    full = full.split(sep)
306    padding = []
307    while pref and full and pref[0] == full[0]:
308        if indent is None:
309            padding.append(' ' * (len(full[0]) + len(sep)))
310        else:
311            padding.append(' ' * indent)
312        full.pop(0)
313        pref.pop(0)
314    if padding:
315        if include_sep:
316            return ''.join(padding) + sep + sep.join(full)
317        else:
318            return ''.join(padding) + sep.join(full)
319    else:
320        return sep.join(full)
321
def report_expected_diffs(diffs, colorize=False):
    """
    Takes the output of compare_expected, and returns a string
    description of the differences.

    ``diffs``:
        A dictionary of ``path: description``.  An empty (or false)
        value yields ``'No differences'``.

    ``colorize``:
        When true, paths and the lines of multi-line descriptions are
        wrapped in terminal color codes; description lines starting
        with ``+`` or ``-`` get their own colors.

    Entries are listed sorted by path and the last line gives the
    number of entries.
    """
    if not diffs:
        return 'No differences'
    # sorted() works on both Python 2 lists and Python 3 dict views;
    # paths are unique so the descriptions are never compared.
    entries = sorted(diffs.items())
    s = []
    last = ''
    for path, desc in entries:
        t = _space_prefix(last, path, indent=4, include_sep=False)
        if colorize:
            t = color_line(t, 11)
        last = path
        if len(desc.splitlines()) > 1:
            # Indent the description two columns deeper than its path
            cur_indent = len(re.search(r'^[ ]*', t).group(0))
            desc = indent(cur_indent+2, desc)
            if colorize:
                t += '\n'
                for line in desc.splitlines():
                    if line.strip().startswith('+'):
                        line = color_line(line, 10)
                    elif line.strip().startswith('-'):
                        line = color_line(line, 9)
                    else:
                        line = color_line(line, 14)
                    t += line+'\n'
            else:
                t += '\n' + desc
        else:
            t += ' '+desc
        s.append(t)
    s.append('Files with differences: %s' % len(entries))
    return '\n'.join(s)
358
def color_code(foreground=None, background=None):
    """
    Build the terminal escape sequence(s) selecting the given colors.

    Color numbers:

    0  black
    1  red
    2  green
    3  yellow
    4  blue
    5  magenta (purple)
    6  cyan
    7  white (gray)

    Add 8 to get high-intensity

    With neither color given the reset sequence is returned.  A
    foreground above 7 emits a bold code followed by the base color;
    a missing foreground or background emits the respective default
    code.  The background number is added to 40 as-is.
    """
    if foreground is None and background is None:
        # Reset
        return '\x1b[0m'
    if foreground is None:
        fg_codes = ['[39m']
    elif foreground > 7:
        fg_codes = ['[1m', '[%im' % (22+foreground)]
    else:
        fg_codes = ['[%im' % (30+foreground)]
    if background is None:
        bg_code = '[49m'
    else:
        bg_code = '[%im' % (40+background)
    sequences = ['\x1b' + code for code in fg_codes + [bg_code]]
    return ''.join(sequences)
388
def color_line(line, foreground=None, background=None):
    """
    Wrap ``line`` in the color codes for the given colors, leaving any
    leading whitespace outside (before) the colored part and ending
    with the reset code.
    """
    leading = re.search(r'^(\s*)', line)
    whitespace = leading.group(1)
    remainder = line[leading.end():]
    start = color_code(foreground, background)
    return whitespace + start + remainder + color_code()
393
def indent(indent, text):
    """
    Prefix every line of ``text`` with ``indent`` spaces and join the
    lines with newlines (a trailing newline in ``text`` is dropped).
    """
    padded = []
    for line in text.splitlines():
        padded.append(' '*indent + line)
    return '\n'.join(padded)
397
def show_diff(actual_content, expected_content):
    """
    Describe how ``actual_content`` differs from ``expected_content``.

    Both are reduced to their stripped, non-blank lines.  When each
    has exactly one such line the two are shown side by side; when the
    actual content has none, the expected content is shown in full;
    otherwise an ``ndiff`` of the line lists is returned.
    """
    def stripped_lines(content):
        candidates = (line.strip() for line in content.splitlines())
        return [line for line in candidates if line]

    actual_lines = stripped_lines(actual_content)
    expected_lines = stripped_lines(expected_content)
    if len(actual_lines) == 1 and len(expected_lines) == 1:
        return '%r not %r' % (actual_lines[0], expected_lines[0])
    if not actual_lines:
        return 'Empty; should have:\n'+expected_content
    import difflib
    delta = difflib.ndiff(actual_lines, expected_lines)
    return '\n'.join(delta)
409