• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Cache lines from Python source files.
2
3This is intended to read lines from modules imported -- hence if a filename
4is not found, it will look down the module search path for a file by
5that name.
6"""
7
8import functools
9import sys
10import os
11import tokenize
12
13__all__ = ["getline", "clearcache", "checkcache"]
14
15def getline(filename, lineno, module_globals=None):
16    lines = getlines(filename, module_globals)
17    if 1 <= lineno <= len(lines):
18        return lines[lineno-1]
19    else:
20        return ''
21
22
23# The cache
24
25# The cache. Maps filenames to either a thunk which will provide source code,
26# or a tuple (size, mtime, lines, fullname) once loaded.
27cache = {}
28
29
30def clearcache():
31    """Clear the cache entirely."""
32
33    global cache
34    cache = {}
35
36
37def getlines(filename, module_globals=None):
38    """Get the lines for a Python source file from the cache.
39    Update the cache if it doesn't contain an entry for this file already."""
40
41    if filename in cache:
42        entry = cache[filename]
43        if len(entry) != 1:
44            return cache[filename][2]
45
46    try:
47        return updatecache(filename, module_globals)
48    except MemoryError:
49        clearcache()
50        return []
51
52
53def checkcache(filename=None):
54    """Discard cache entries that are out of date.
55    (This is not checked upon each call!)"""
56
57    if filename is None:
58        filenames = list(cache.keys())
59    else:
60        if filename in cache:
61            filenames = [filename]
62        else:
63            return
64
65    for filename in filenames:
66        entry = cache[filename]
67        if len(entry) == 1:
68            # lazy cache entry, leave it lazy.
69            continue
70        size, mtime, lines, fullname = entry
71        if mtime is None:
72            continue   # no-op for files loaded via a __loader__
73        try:
74            stat = os.stat(fullname)
75        except OSError:
76            del cache[filename]
77            continue
78        if size != stat.st_size or mtime != stat.st_mtime:
79            del cache[filename]
80
81
82def updatecache(filename, module_globals=None):
83    """Update a cache entry and return its list of lines.
84    If something's wrong, print a message, discard the cache entry,
85    and return an empty list."""
86
87    if filename in cache:
88        if len(cache[filename]) != 1:
89            del cache[filename]
90    if not filename or (filename.startswith('<') and filename.endswith('>')):
91        return []
92
93    fullname = filename
94    try:
95        stat = os.stat(fullname)
96    except OSError:
97        basename = filename
98
99        # Realise a lazy loader based lookup if there is one
100        # otherwise try to lookup right now.
101        if lazycache(filename, module_globals):
102            try:
103                data = cache[filename][0]()
104            except (ImportError, OSError):
105                pass
106            else:
107                if data is None:
108                    # No luck, the PEP302 loader cannot find the source
109                    # for this module.
110                    return []
111                cache[filename] = (
112                    len(data), None,
113                    [line+'\n' for line in data.splitlines()], fullname
114                )
115                return cache[filename][2]
116
117        # Try looking through the module search path, which is only useful
118        # when handling a relative filename.
119        if os.path.isabs(filename):
120            return []
121
122        for dirname in sys.path:
123            try:
124                fullname = os.path.join(dirname, basename)
125            except (TypeError, AttributeError):
126                # Not sufficiently string-like to do anything useful with.
127                continue
128            try:
129                stat = os.stat(fullname)
130                break
131            except OSError:
132                pass
133        else:
134            return []
135    try:
136        with tokenize.open(fullname) as fp:
137            lines = fp.readlines()
138    except OSError:
139        return []
140    if lines and not lines[-1].endswith('\n'):
141        lines[-1] += '\n'
142    size, mtime = stat.st_size, stat.st_mtime
143    cache[filename] = size, mtime, lines, fullname
144    return lines
145
146
147def lazycache(filename, module_globals):
148    """Seed the cache for filename with module_globals.
149
150    The module loader will be asked for the source only when getlines is
151    called, not immediately.
152
153    If there is an entry in the cache already, it is not altered.
154
155    :return: True if a lazy load is registered in the cache,
156        otherwise False. To register such a load a module loader with a
157        get_source method must be found, the filename must be a cachable
158        filename, and the filename must not be already cached.
159    """
160    if filename in cache:
161        if len(cache[filename]) == 1:
162            return True
163        else:
164            return False
165    if not filename or (filename.startswith('<') and filename.endswith('>')):
166        return False
167    # Try for a __loader__, if available
168    if module_globals and '__loader__' in module_globals:
169        name = module_globals.get('__name__')
170        loader = module_globals['__loader__']
171        get_source = getattr(loader, 'get_source', None)
172
173        if name and get_source:
174            get_lines = functools.partial(get_source, name)
175            cache[filename] = (get_lines,)
176            return True
177    return False
178