1 2""" 3csv.py - read/write/investigate CSV files 4""" 5 6import re 7from _csv import Error, __version__, writer, reader, register_dialect, \ 8 unregister_dialect, get_dialect, list_dialects, \ 9 field_size_limit, \ 10 QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \ 11 __doc__ 12from _csv import Dialect as _Dialect 13 14from collections import OrderedDict 15from io import StringIO 16 17__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE", 18 "Error", "Dialect", "__doc__", "excel", "excel_tab", 19 "field_size_limit", "reader", "writer", 20 "register_dialect", "get_dialect", "list_dialects", "Sniffer", 21 "unregister_dialect", "__version__", "DictReader", "DictWriter", 22 "unix_dialect"] 23 24class Dialect: 25 """Describe a CSV dialect. 26 27 This must be subclassed (see csv.excel). Valid attributes are: 28 delimiter, quotechar, escapechar, doublequote, skipinitialspace, 29 lineterminator, quoting. 30 31 """ 32 _name = "" 33 _valid = False 34 # placeholders 35 delimiter = None 36 quotechar = None 37 escapechar = None 38 doublequote = None 39 skipinitialspace = None 40 lineterminator = None 41 quoting = None 42 43 def __init__(self): 44 if self.__class__ != Dialect: 45 self._valid = True 46 self._validate() 47 48 def _validate(self): 49 try: 50 _Dialect(self) 51 except TypeError as e: 52 # We do this for compatibility with py2.3 53 raise Error(str(e)) 54 55class excel(Dialect): 56 """Describe the usual properties of Excel-generated CSV files.""" 57 delimiter = ',' 58 quotechar = '"' 59 doublequote = True 60 skipinitialspace = False 61 lineterminator = '\r\n' 62 quoting = QUOTE_MINIMAL 63register_dialect("excel", excel) 64 65class excel_tab(excel): 66 """Describe the usual properties of Excel-generated TAB-delimited files.""" 67 delimiter = '\t' 68register_dialect("excel-tab", excel_tab) 69 70class unix_dialect(Dialect): 71 """Describe the usual properties of Unix-generated CSV files.""" 72 delimiter = ',' 73 quotechar = '"' 74 doublequote = True 75 skipinitialspace = False 76 lineterminator = '\n' 77 quoting = QUOTE_ALL 78register_dialect("unix", unix_dialect) 79 80 81class DictReader: 82 def __init__(self, f, fieldnames=None, restkey=None, restval=None, 83 dialect="excel", *args, **kwds): 84 self._fieldnames = fieldnames # list of keys for the dict 85 self.restkey = restkey # key to catch long rows 86 self.restval = restval # default value for short rows 87 self.reader = reader(f, dialect, *args, **kwds) 88 self.dialect = dialect 89 self.line_num = 0 90 91 def __iter__(self): 92 return self 93 94 @property 95 def fieldnames(self): 96 if self._fieldnames is None: 97 try: 98 self._fieldnames = next(self.reader) 99 except StopIteration: 100 pass 101 self.line_num = self.reader.line_num 102 return self._fieldnames 103 104 @fieldnames.setter 105 def fieldnames(self, value): 106 self._fieldnames = value 107 108 def __next__(self): 109 if self.line_num == 0: 110 # Used only for its side effect. 111 self.fieldnames 112 row = next(self.reader) 113 self.line_num = self.reader.line_num 114 115 # unlike the basic reader, we prefer not to return blanks, 116 # because we will typically wind up with a dict full of None 117 # values 118 while row == []: 119 row = next(self.reader) 120 d = OrderedDict(zip(self.fieldnames, row)) 121 lf = len(self.fieldnames) 122 lr = len(row) 123 if lf < lr: 124 d[self.restkey] = row[lf:] 125 elif lf > lr: 126 for key in self.fieldnames[lr:]: 127 d[key] = self.restval 128 return d 129 130 131class DictWriter: 132 def __init__(self, f, fieldnames, restval="", extrasaction="raise", 133 dialect="excel", *args, **kwds): 134 self.fieldnames = fieldnames # list of keys for the dict 135 self.restval = restval # for writing short dicts 136 if extrasaction.lower() not in ("raise", "ignore"): 137 raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'" 138 % extrasaction) 139 self.extrasaction = extrasaction 140 self.writer = writer(f, dialect, *args, **kwds) 141 142 def writeheader(self): 143 header = dict(zip(self.fieldnames, self.fieldnames)) 144 self.writerow(header) 145 146 def _dict_to_list(self, rowdict): 147 if self.extrasaction == "raise": 148 wrong_fields = rowdict.keys() - self.fieldnames 149 if wrong_fields: 150 raise ValueError("dict contains fields not in fieldnames: " 151 + ", ".join([repr(x) for x in wrong_fields])) 152 return (rowdict.get(key, self.restval) for key in self.fieldnames) 153 154 def writerow(self, rowdict): 155 return self.writer.writerow(self._dict_to_list(rowdict)) 156 157 def writerows(self, rowdicts): 158 return self.writer.writerows(map(self._dict_to_list, rowdicts)) 159 160# Guard Sniffer's type checking against builds that exclude complex() 161try: 162 complex 163except NameError: 164 complex = float 165 166class Sniffer: 167 ''' 168 "Sniffs" the format of a CSV file (i.e. delimiter, quotechar) 169 Returns a Dialect object. 170 ''' 171 def __init__(self): 172 # in case there is more than one possible delimiter 173 self.preferred = [',', '\t', ';', ' ', ':'] 174 175 176 def sniff(self, sample, delimiters=None): 177 """ 178 Returns a dialect (or None) corresponding to the sample 179 """ 180 181 quotechar, doublequote, delimiter, skipinitialspace = \ 182 self._guess_quote_and_delimiter(sample, delimiters) 183 if not delimiter: 184 delimiter, skipinitialspace = self._guess_delimiter(sample, 185 delimiters) 186 187 if not delimiter: 188 raise Error("Could not determine delimiter") 189 190 class dialect(Dialect): 191 _name = "sniffed" 192 lineterminator = '\r\n' 193 quoting = QUOTE_MINIMAL 194 # escapechar = '' 195 196 dialect.doublequote = doublequote 197 dialect.delimiter = delimiter 198 # _csv.reader won't accept a quotechar of '' 199 dialect.quotechar = quotechar or '"' 200 dialect.skipinitialspace = skipinitialspace 201 202 return dialect 203 204 205 def _guess_quote_and_delimiter(self, data, delimiters): 206 """ 207 Looks for text enclosed between two identical quotes 208 (the probable quotechar) which are preceded and followed 209 by the same character (the probable delimiter). 210 For example: 211 ,'some text', 212 The quote with the most wins, same with the delimiter. 213 If there is no quotechar the delimiter can't be determined 214 this way. 215 """ 216 217 matches = [] 218 for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?", 219 r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)', # ".*?", 220 r'(?P<delim>>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)', # ,".*?" 221 r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'): # ".*?" (no delim, no space) 222 regexp = re.compile(restr, re.DOTALL | re.MULTILINE) 223 matches = regexp.findall(data) 224 if matches: 225 break 226 227 if not matches: 228 # (quotechar, doublequote, delimiter, skipinitialspace) 229 return ('', False, None, 0) 230 quotes = {} 231 delims = {} 232 spaces = 0 233 groupindex = regexp.groupindex 234 for m in matches: 235 n = groupindex['quote'] - 1 236 key = m[n] 237 if key: 238 quotes[key] = quotes.get(key, 0) + 1 239 try: 240 n = groupindex['delim'] - 1 241 key = m[n] 242 except KeyError: 243 continue 244 if key and (delimiters is None or key in delimiters): 245 delims[key] = delims.get(key, 0) + 1 246 try: 247 n = groupindex['space'] - 1 248 except KeyError: 249 continue 250 if m[n]: 251 spaces += 1 252 253 quotechar = max(quotes, key=quotes.get) 254 255 if delims: 256 delim = max(delims, key=delims.get) 257 skipinitialspace = delims[delim] == spaces 258 if delim == '\n': # most likely a file with a single column 259 delim = '' 260 else: 261 # there is *no* delimiter, it's a single column of quoted data 262 delim = '' 263 skipinitialspace = 0 264 265 # if we see an extra quote between delimiters, we've got a 266 # double quoted format 267 dq_regexp = re.compile( 268 r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \ 269 {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE) 270 271 272 273 if dq_regexp.search(data): 274 doublequote = True 275 else: 276 doublequote = False 277 278 return (quotechar, doublequote, delim, skipinitialspace) 279 280 281 def _guess_delimiter(self, data, delimiters): 282 """ 283 The delimiter /should/ occur the same number of times on 284 each row. However, due to malformed data, it may not. We don't want 285 an all or nothing approach, so we allow for small variations in this 286 number. 287 1) build a table of the frequency of each character on every line. 288 2) build a table of frequencies of this frequency (meta-frequency?), 289 e.g. 'x occurred 5 times in 10 rows, 6 times in 1000 rows, 290 7 times in 2 rows' 291 3) use the mode of the meta-frequency to determine the /expected/ 292 frequency for that character 293 4) find out how often the character actually meets that goal 294 5) the character that best meets its goal is the delimiter 295 For performance reasons, the data is evaluated in chunks, so it can 296 try and evaluate the smallest portion of the data possible, evaluating 297 additional chunks as necessary. 298 """ 299 300 data = list(filter(None, data.split('\n'))) 301 302 ascii = [chr(c) for c in range(127)] # 7-bit ASCII 303 304 # build frequency tables 305 chunkLength = min(10, len(data)) 306 iteration = 0 307 charFrequency = {} 308 modes = {} 309 delims = {} 310 start, end = 0, min(chunkLength, len(data)) 311 while start < len(data): 312 iteration += 1 313 for line in data[start:end]: 314 for char in ascii: 315 metaFrequency = charFrequency.get(char, {}) 316 # must count even if frequency is 0 317 freq = line.count(char) 318 # value is the mode 319 metaFrequency[freq] = metaFrequency.get(freq, 0) + 1 320 charFrequency[char] = metaFrequency 321 322 for char in charFrequency.keys(): 323 items = list(charFrequency[char].items()) 324 if len(items) == 1 and items[0][0] == 0: 325 continue 326 # get the mode of the frequencies 327 if len(items) > 1: 328 modes[char] = max(items, key=lambda x: x[1]) 329 # adjust the mode - subtract the sum of all 330 # other frequencies 331 items.remove(modes[char]) 332 modes[char] = (modes[char][0], modes[char][1] 333 - sum(item[1] for item in items)) 334 else: 335 modes[char] = items[0] 336 337 # build a list of possible delimiters 338 modeList = modes.items() 339 total = float(chunkLength * iteration) 340 # (rows of consistent data) / (number of rows) = 100% 341 consistency = 1.0 342 # minimum consistency threshold 343 threshold = 0.9 344 while len(delims) == 0 and consistency >= threshold: 345 for k, v in modeList: 346 if v[0] > 0 and v[1] > 0: 347 if ((v[1]/total) >= consistency and 348 (delimiters is None or k in delimiters)): 349 delims[k] = v 350 consistency -= 0.01 351 352 if len(delims) == 1: 353 delim = list(delims.keys())[0] 354 skipinitialspace = (data[0].count(delim) == 355 data[0].count("%c " % delim)) 356 return (delim, skipinitialspace) 357 358 # analyze another chunkLength lines 359 start = end 360 end += chunkLength 361 362 if not delims: 363 return ('', 0) 364 365 # if there's more than one, fall back to a 'preferred' list 366 if len(delims) > 1: 367 for d in self.preferred: 368 if d in delims.keys(): 369 skipinitialspace = (data[0].count(d) == 370 data[0].count("%c " % d)) 371 return (d, skipinitialspace) 372 373 # nothing else indicates a preference, pick the character that 374 # dominates(?) 375 items = [(v,k) for (k,v) in delims.items()] 376 items.sort() 377 delim = items[-1][1] 378 379 skipinitialspace = (data[0].count(delim) == 380 data[0].count("%c " % delim)) 381 return (delim, skipinitialspace) 382 383 384 def has_header(self, sample): 385 # Creates a dictionary of types of data in each column. If any 386 # column is of a single type (say, integers), *except* for the first 387 # row, then the first row is presumed to be labels. If the type 388 # can't be determined, it is assumed to be a string in which case 389 # the length of the string is the determining factor: if all of the 390 # rows except for the first are the same length, it's a header. 391 # Finally, a 'vote' is taken at the end for each column, adding or 392 # subtracting from the likelihood of the first row being a header. 393 394 rdr = reader(StringIO(sample), self.sniff(sample)) 395 396 header = next(rdr) # assume first row is header 397 398 columns = len(header) 399 columnTypes = {} 400 for i in range(columns): columnTypes[i] = None 401 402 checked = 0 403 for row in rdr: 404 # arbitrary number of rows to check, to keep it sane 405 if checked > 20: 406 break 407 checked += 1 408 409 if len(row) != columns: 410 continue # skip rows that have irregular number of columns 411 412 for col in list(columnTypes.keys()): 413 414 for thisType in [int, float, complex]: 415 try: 416 thisType(row[col]) 417 break 418 except (ValueError, OverflowError): 419 pass 420 else: 421 # fallback to length of string 422 thisType = len(row[col]) 423 424 if thisType != columnTypes[col]: 425 if columnTypes[col] is None: # add new column type 426 columnTypes[col] = thisType 427 else: 428 # type is inconsistent, remove column from 429 # consideration 430 del columnTypes[col] 431 432 # finally, compare results against first row and "vote" 433 # on whether it's a header 434 hasHeader = 0 435 for col, colType in columnTypes.items(): 436 if type(colType) == type(0): # it's a length 437 if len(header[col]) != colType: 438 hasHeader += 1 439 else: 440 hasHeader -= 1 441 else: # attempt typecast 442 try: 443 colType(header[col]) 444 except (ValueError, TypeError): 445 hasHeader += 1 446 else: 447 hasHeader -= 1 448 449 return hasHeader > 0 450