• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import csv
2import subprocess
3
4
# A unique sentinel meaning "no value was given".  Slot uses it as the
# default for its "initial" and "default" arguments and treats a slot
# whose value is this object as unset (attribute access then raises
# AttributeError), so None remains usable as a real value.
_NOT_SET = object()
6
7
def run_cmd(argv, **kwargs):
    """Run the given command and return what it wrote to stdout.

    stdout is captured as text.  A non-zero exit status raises
    subprocess.CalledProcessError (check=True).  Any extra keyword
    arguments are passed straight through to subprocess.run().
    """
    options = {
        'stdout': subprocess.PIPE,
        'text': True,
        'check': True,
    }
    completed = subprocess.run(argv, **options, **kwargs)
    return completed.stdout
19
20
def read_tsv(infile, header, *,
             _open=open,
             _get_reader=csv.reader,
             ):
    """Yield each row of the given TSV (tab-separated) file.

    "infile" is either a filename (opened here with newline='') or an
    iterable of lines, e.g. an already-open text file.

    "header" is the expected first line.  It may be given as a single
    tab-separated string or as a list/tuple of column names, matching
    the two forms write_tsv() accepts.  The first line of the file is
    stripped of surrounding whitespace before it is compared.

    Each yielded row is a tuple of the row's values with surrounding
    whitespace stripped.

    ValueError is raised (on first iteration) if the header does not
    match, including when the input is empty.
    """
    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_tsv(infile, header,
                                _open=_open,
                                _get_reader=_get_reader,
                                )
            return

    # Normalize a sequence-of-names header to the one-line string form
    # so it can be compared against the raw first line.
    if isinstance(header, (list, tuple)):
        header = '\t'.join(header)

    lines = iter(infile)

    # Validate the header.  An empty input counts as an empty header.
    actualheader = next(lines, '').strip()
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    for row in _get_reader(lines, delimiter='\t'):
        yield tuple(v.strip() for v in row)
45
46
def write_tsv(outfile, header, rows, *,
             _open=open,
             _get_writer=csv.writer,
             ):
    """Write the header and then each of the rows as TSV (tab-separated).

    "outfile" is either a filename (opened here for writing with
    newline='') or an already-open writable text file.  "header" is
    either one tab-separated string or an iterable of column names.
    In each row, None is written as an empty field and every other
    value is written as str(value).
    """
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as opened:
            return write_tsv(opened, header, rows,
                             _open=_open,
                             _get_writer=_get_writer,
                             )

    def render(value):
        # None becomes an empty cell; everything else is stringified.
        if value is None:
            return ''
        return str(value)

    columns = header.split('\t') if isinstance(header, str) else header
    tsv = _get_writer(outfile, delimiter='\t')
    tsv.writerow(columns)
    for record in rows:
        tsv.writerow(map(render, record))
66
67
class Slot:
    """A descriptor that provides a slot.

    This is useful for types that can't have slots via __slots__,
    e.g. tuple subclasses.

    The per-instance values live on the descriptor itself, in the
    "instances" dict keyed by id(obj).  "initial" is the value a fresh
    instance starts with; "default" is the value the slot falls back to
    after "del obj.attr" (and is also the starting value when no
    "initial" was given).  When the effective value is unset, attribute
    access raises AttributeError.  A "readonly" slot rejects ordinary
    assignment and deletion; use Slot.set() to populate it.
    """

    __slots__ = ('initial', 'default', 'readonly', 'instances', 'name')

    def __init__(self, initial=_NOT_SET, *,
                 default=_NOT_SET,
                 readonly=False,
                 ):
        self.initial = initial
        self.default = default
        self.readonly = readonly

        # The instance cache is not inherently tied to the normal
        # lifetime of the instances.  So we must do something in order
        # to avoid leaving stale entries behind once an instance dies
        # (ids get reused, so a stale entry would leak into an
        # unrelated object).  Ideally we would use a weakref-based
        # mapping to do this.  However, most builtin types do not
        # support weakrefs.  So instead we monkey-patch __del__ on the
        # attached class to drop the instance's entries.
        self.instances = {}
        self.name = None

    def __set_name__(self, cls, name):
        """Bind the slot to its attribute name and register it on the class.

        A Slot instance may only be attached once; reuse raises
        TypeError.  The name is appended to cls.__slot_names__ (created
        if the lookup fails) and the class's __del__ is patched, if it
        has not been already, so the cached values get cleaned up.
        """
        if self.name is not None:
            raise TypeError('already used')
        self.name = name
        try:
            slotnames = cls.__slot_names__
        except AttributeError:
            slotnames = cls.__slot_names__ = []
        slotnames.append(name)
        self._ensure___del__(cls, slotnames)

    def __get__(self, obj, cls):
        """Return the slot's value for obj (or the Slot itself on the class).

        On first access the value is seeded from "initial", falling
        back to "default".  AttributeError is raised if the resulting
        value is unset.
        """
        if obj is None:  # called on the class
            return self
        try:
            value = self.instances[id(obj)]
        except KeyError:
            if self.initial is _NOT_SET:
                value = self.default
            else:
                value = self.initial
            self.instances[id(obj)] = value
        if value is _NOT_SET:
            raise AttributeError(self.name)
        # XXX Optionally make a copy?
        return value

    def __set__(self, obj, value):
        """Store the value for obj; raise AttributeError if read-only."""
        if self.readonly:
            raise AttributeError(f'{self.name} is readonly')
        # XXX Optionally coerce?
        self.instances[id(obj)] = value

    def __delete__(self, obj):
        """Reset obj's slot to "default"; raise AttributeError if read-only."""
        if self.readonly:
            raise AttributeError(f'{self.name} is readonly')
        # The entry is deliberately kept (holding the default) so that
        # a later read does not fall back to "initial" again.  The
        # entry itself is removed when the object is finalized.
        self.instances[id(obj)] = self.default

    def _ensure___del__(self, cls, slotnames):  # See the comment in __init__().
        """Patch cls.__del__ (once) so dying instances drop their entries.

        Any existing __del__ is still called afterward.  Nothing is
        done if the class's __del__ is already a patched one.
        """
        try:
            old___del__ = cls.__del__
        except AttributeError:
            old___del__ = (lambda s: None)
        else:
            if getattr(old___del__, '_slotted', False):
                return

        def __del__(_self):
            # Remove the cached values directly instead of going through
            # delattr().  delattr() raises for read-only slots (which
            # would abort the cleanup and skip the original __del__),
            # and it only resets the entry to the default, leaving a
            # stale entry behind for whichever object reuses this id.
            key = id(_self)
            owner = type(_self)
            for name in slotnames:
                slot = getattr(owner, name, None)
                if isinstance(slot, Slot):
                    slot.instances.pop(key, None)
            old___del__(_self)
        __del__._slotted = True
        cls.__del__ = __del__

    def set(self, obj, value):
        """Update the cached value for an object.

        This works even if the descriptor is read-only.  This is
        particularly useful when initializing the object (e.g. in
        its __new__ or __init__).
        """
        self.instances[id(obj)] = value
157
158
class classonly:
    """A non-data descriptor exposing a value on the class only.

    It behaves like the "classmethod" builtin when looked up on the
    class, but looking it up on an instance raises AttributeError, so
    it does not show up on instances.  It may be used as a decorator.
    """

    def __init__(self, value):
        self.name = None
        self.value = value
        # Reuse classmethod's own binding logic for class-level access.
        as_classmethod = classmethod(value)
        self.getter = as_classmethod.__get__

    def __set_name__(self, cls, name):
        # Each descriptor instance may be attached under one name only.
        if self.name is None:
            self.name = name
            return
        raise TypeError('already used')

    def __get__(self, obj, cls):
        if obj is None:
            # called on the class
            return self.getter(None, cls)
        # Hide the attribute from instances.
        raise AttributeError(self.name)
181
182
class _NTBase:
    """Shared helpers for tuple-like record types.

    NOTE(review): presumably meant to be mixed into namedtuple
    subclasses (it overrides _make() and relies on tuple-style
    comparison and containment) -- confirm against the users.
    """

    __slots__ = ()

    @classonly
    def from_raw(cls, raw):
        """Coerce a raw value into an instance of cls.

        Falsy input gives None, an existing instance is returned as-is,
        a string goes through from_string(), an object with "items" is
        expanded as keyword arguments, and any other iterable is
        expanded as positional arguments.  Anything else raises
        NotImplementedError.
        """
        if not raw:
            return None
        if isinstance(raw, cls):
            return raw
        if isinstance(raw, str):
            return cls.from_string(raw)
        if hasattr(raw, 'items'):
            return cls(**raw)

        try:
            args = tuple(raw)
        except TypeError:
            # Not iterable; fall through to the "unsupported" error.
            args = None
        if args is None:
            raise NotImplementedError
        return cls(*args)

    @classonly
    def from_string(cls, value):
        """Return a new instance based on the given string."""
        raise NotImplementedError

    @classmethod
    def _make(cls, iterable):  # The default _make() is not subclass-friendly.
        return cls.__new__(cls, *iterable)

    # XXX Always validate?
    #def __init__(self, *args, **kwargs):
    #    self.validate()

    # XXX The default __repr__() is not subclass-friendly (where the name changes).
    #def __repr__(self):
    #    _, _, sig = super().__repr__().partition('(')
    #    return f'{self.__class__.__name__}({sig}'

    # To make sorting work with None:
    def __lt__(self, other):
        try:
            return super().__lt__(other)
        except TypeError:
            # The normal comparison failed; if either side holds a None
            # then order by that, with the None-holding "self" first.
            if None in self:
                return True
            if None in other:
                return False
            raise

    def validate(self):
        """Do nothing; a hook for subclasses to check their values."""
        return None

    # XXX Always validate?
    #def _replace(self, **kwargs):
    #    obj = super()._replace(**kwargs)
    #    obj.validate()
    #    return obj
244