"""Stuff to parse Sun and NeXT audio files.

An audio file consists of a header followed by the data.  The structure
of the header is as follows.

        +---------------+
        | magic word    |
        +---------------+
        | header size   |
        +---------------+
        | data size     |
        +---------------+
        | encoding      |
        +---------------+
        | sample rate   |
        +---------------+
        | # of channels |
        +---------------+
        | info          |
        |               |
        +---------------+

The magic word consists of the 4 characters '.snd'.  Apart from the
info field, all header fields are 4 bytes in size.  They are all
32-bit unsigned integers encoded in big-endian byte order.

The header size really gives the start of the data.
The data size is the physical size of the data.  From the other
parameters the number of frames can be calculated.
The encoding gives the way in which audio samples are encoded.
Possible values are listed below.
The info field currently consists of an ASCII string giving a
human-readable description of the audio file.  The info field is
padded with NUL bytes to the header size.

Usage.

Reading audio files:
        f = sunau.open(file, 'r')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.

This returns an instance of a class with the following public methods:
        getnchannels()  -- returns number of audio channels (1 for
                           mono, 2 for stereo)
        getsampwidth()  -- returns sample width in bytes
        getframerate()  -- returns sampling frequency
        getnframes()    -- returns number of audio frames
        getcomptype()   -- returns compression type ('NONE', 'ULAW'
                           or 'ALAW')
        getcompname()   -- returns human-readable version of
                           compression type ('not compressed' matches 'NONE')
        getparams()     -- returns a namedtuple consisting of all of the
                           above in the above order
        getmarkers()    -- returns None (for compatibility with the
                           aifc module)
        getmark(id)     -- raises an error since the mark does not
                           exist (for compatibility with the aifc module)
        readframes(n)   -- returns at most n frames of audio
        rewind()        -- rewind to the beginning of the audio stream
        setpos(pos)     -- seek to the specified position
        tell()          -- return the current position
        close()         -- close the instance (make it unusable)
The position returned by tell() and the position given to setpos()
are compatible and have nothing to do with the actual position in the
file.
The close() method is called automatically when the class instance
is destroyed.

Writing audio files:
        f = sunau.open(file, 'w')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods write(), tell(), seek(), and
close().

This returns an instance of a class with the following public methods:
        setnchannels(n) -- set the number of channels
        setsampwidth(n) -- set the sample width
        setframerate(n) -- set the frame rate
        setnframes(n)   -- set the number of frames
        setcomptype(type, name)
                        -- set the compression type and the
                           human-readable compression type
        setparams(tuple)-- set all parameters at once
        tell()          -- return current position in output file
        writeframesraw(data)
                        -- write audio frames without patching up the
                           file header
        writeframes(data)
                        -- write audio frames and patch up the file header
        close()         -- patch up the file header and close the
                           output file
You should set the parameters before the first writeframesraw or
writeframes.  The total number of frames does not need to be set,
but when it is set to the correct value, the header does not have to
be patched up.
It is best to first set all parameters, possibly including the
compression type, and then write audio frames using writeframesraw.
When all frames have been written, either call writeframes(b'') or
close() to patch up the sizes in the header.
The close() method is called automatically when the class instance
is destroyed.
"""

from collections import namedtuple


_sunau_params = namedtuple('_sunau_params',
                           'nchannels sampwidth framerate nframes comptype compname')

# from <multimedia/audio_filehdr.h>
AUDIO_FILE_MAGIC = 0x2e736e64
AUDIO_FILE_ENCODING_MULAW_8 = 1
AUDIO_FILE_ENCODING_LINEAR_8 = 2
AUDIO_FILE_ENCODING_LINEAR_16 = 3
AUDIO_FILE_ENCODING_LINEAR_24 = 4
AUDIO_FILE_ENCODING_LINEAR_32 = 5
AUDIO_FILE_ENCODING_FLOAT = 6
AUDIO_FILE_ENCODING_DOUBLE = 7
AUDIO_FILE_ENCODING_ADPCM_G721 = 23
AUDIO_FILE_ENCODING_ADPCM_G722 = 24
AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
AUDIO_FILE_ENCODING_ALAW_8 = 27

# from <multimedia/audio_hdr.h>
AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF        # ((unsigned)(~0))

_simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8,
                     AUDIO_FILE_ENCODING_LINEAR_8,
                     AUDIO_FILE_ENCODING_LINEAR_16,
                     AUDIO_FILE_ENCODING_LINEAR_24,
                     AUDIO_FILE_ENCODING_LINEAR_32,
                     AUDIO_FILE_ENCODING_ALAW_8]


class Error(Exception):
    """Raised for malformed files and invalid parameter values."""


def _read_u32(file):
    """Read a big-endian unsigned 32-bit integer from *file*.

    Raises EOFError if the stream ends before four bytes were read.
    """
    data = b''
    # Keep reading until all four bytes arrived; a single read() call may
    # legitimately return fewer bytes than requested.
    while len(data) < 4:
        chunk = file.read(4 - len(data))
        if not chunk:
            raise EOFError
        data += chunk
    return int.from_bytes(data, 'big')


def _write_u32(file, x):
    """Write *x* to *file* as a big-endian unsigned 32-bit integer.

    Only the low 32 bits of the value are written.
    """
    file.write((int(x) & 0xFFFFFFFF).to_bytes(4, 'big'))


class Au_read:
    """Reader for Sun/NeXT audio files; see the module docstring."""

    def __init__(self, f):
        """Open *f*, a file name or a readable binary file object."""
        self._file = None
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'rb')
            self._opened = True
        else:
            self._opened = False
        try:
            self.initfp(f)
        except BaseException:
            # Do not leak a file we opened ourselves when the header turns
            # out to be invalid; close() leaves caller-supplied files alone.
            self.close()
            raise

    def __del__(self):
        # _file is missing when __init__ failed before assigning it.
        if getattr(self, '_file', None):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Parse the header from *file* and remember where the data starts.

        Raises Error for a bad magic number, an implausible header size,
        an unsupported encoding or a channel count of zero, and EOFError
        if the header is truncated.
        """
        self._file = file
        self._soundpos = 0
        magic = _read_u32(file)
        if magic != AUDIO_FILE_MAGIC:
            raise Error('bad magic number')
        self._hdr_size = _read_u32(file)
        if self._hdr_size < 24:
            raise Error('header size too small')
        if self._hdr_size > 100:
            raise Error('header size ridiculously large')
        self._data_size = _read_u32(file)
        self._encoding = _read_u32(file)
        if self._encoding not in _simple_encodings:
            raise Error('encoding not (yet) supported')
        if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
                              AUDIO_FILE_ENCODING_ALAW_8):
            # Companded samples occupy one byte in the file but are handed
            # out as 16-bit linear samples by readframes().
            self._sampwidth = 2
            self._framesize = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8:
            self._framesize = self._sampwidth = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16:
            self._framesize = self._sampwidth = 2
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24:
            self._framesize = self._sampwidth = 3
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32:
            self._framesize = self._sampwidth = 4
        else:
            raise Error('unknown encoding')
        self._framerate = _read_u32(file)
        self._nchannels = _read_u32(file)
        if not self._nchannels:
            raise Error('bad # of channels')
        self._framesize = self._framesize * self._nchannels
        if self._hdr_size > 24:
            # Everything between the fixed fields and the data is the
            # NUL-padded info string.
            self._info = file.read(self._hdr_size - 24)
            self._info, _, _ = self._info.partition(b'\0')
        else:
            self._info = b''
        try:
            self._data_pos = file.tell()
        except (AttributeError, OSError):
            # Non-seekable stream: rewind() and setpos() will refuse to work.
            self._data_pos = None

    def getfp(self):
        """Return the underlying file object."""
        return self._file

    def getnchannels(self):
        """Return the number of audio channels."""
        return self._nchannels

    def getsampwidth(self):
        """Return the sample width in bytes."""
        return self._sampwidth

    def getframerate(self):
        """Return the sampling frequency."""
        return self._framerate

    def getnframes(self):
        """Return the number of frames, or AUDIO_UNKNOWN_SIZE if unknown."""
        if self._data_size == AUDIO_UNKNOWN_SIZE:
            return AUDIO_UNKNOWN_SIZE
        if self._encoding in _simple_encodings:
            return self._data_size // self._framesize
        return 0                # XXX--must do some arithmetic here

    def getcomptype(self):
        """Return the compression type: 'ULAW', 'ALAW' or 'NONE'."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'ULAW'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'ALAW'
        else:
            return 'NONE'

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'CCITT G.711 u-law'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def getparams(self):
        """Return all parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def getmarkers(self):
        """Return None (for compatibility with the aifc module)."""
        return None

    def getmark(self, id):
        """Always raise Error (for compatibility with the aifc module)."""
        raise Error('no marks')

    def readframes(self, nframes):
        """Read and return at most *nframes* frames of audio as bytes.

        Passing AUDIO_UNKNOWN_SIZE reads everything up to end of file.
        u-law and A-law data is converted to linear samples of
        getsampwidth() bytes; this conversion needs the audioop module.
        """
        if self._encoding in _simple_encodings:
            if nframes == AUDIO_UNKNOWN_SIZE:
                data = self._file.read()
            else:
                data = self._file.read(nframes * self._framesize)
            self._soundpos += len(data) // self._framesize
            if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
                import audioop
                data = audioop.ulaw2lin(data, self._sampwidth)
            elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
                # The header parser reports a sample width of 2 for A-law,
                # so the data has to be expanded just like u-law.
                import audioop
                data = audioop.alaw2lin(data, self._sampwidth)
            return data
        return None             # XXX--not implemented yet

    def rewind(self):
        """Seek back to the first frame; raise OSError if not seekable."""
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos)
        self._soundpos = 0

    def tell(self):
        """Return the current position, counted in frames."""
        return self._soundpos

    def setpos(self, pos):
        """Seek to frame *pos*.

        Raises Error if *pos* is out of range and OSError if the
        underlying file is not seekable.
        """
        if pos < 0 or pos > self.getnframes():
            raise Error('position not in range')
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos + pos * self._framesize)
        self._soundpos = pos

    def close(self):
        """Detach from the file, closing it only if this object opened it."""
        file = self._file
        if file:
            self._file = None
            if self._opened:
                file.close()


class Au_write:
    """Writer for Sun/NeXT audio files; see the module docstring."""

    def __init__(self, f):
        """Open *f*, a file name or a writable binary file object."""
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'wb')
            self._opened = True
        else:
            self._opened = False
        self.initfp(f)

    def __del__(self):
        # _file is missing when __init__ failed before assigning it.
        if getattr(self, '_file', None):
            self.close()
        self._file = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        """Attach to *file* and reset all parameters to their defaults."""
        self._file = file
        self._framerate = 0
        self._nchannels = 0
        self._sampwidth = 0
        self._framesize = 0
        self._nframes = AUDIO_UNKNOWN_SIZE
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._info = b''
        self._comptype = 'ULAW' # default is U-law

    def setnchannels(self, nchannels):
        """Set the number of channels (1, 2 or 4)."""
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if nchannels not in (1, 2, 4):
            raise Error('only 1, 2, or 4 channels supported')
        self._nchannels = nchannels

    def getnchannels(self):
        """Return the number of channels; raise Error if not set."""
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        """Set the sample width in bytes (1 to 4)."""
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if sampwidth not in (1, 2, 3, 4):
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        """Return the sample width in bytes; raise Error if not set."""
        # Check the sample width itself, not the frame rate.
        if not self._sampwidth:
            raise Error('sample width not specified')
        return self._sampwidth

    def setframerate(self, framerate):
        """Set the frame rate."""
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        self._framerate = framerate

    def getframerate(self):
        """Return the frame rate; raise Error if not set."""
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        """Announce how many frames will be written."""
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if nframes < 0:
            raise Error('# of frames cannot be negative')
        self._nframes = nframes

    def getnframes(self):
        """Return the number of frames written so far."""
        return self._nframeswritten

    def setcomptype(self, type, name):
        """Set the compression type ('NONE' or 'ULAW'); *name* is ignored."""
        if type in ('NONE', 'ULAW'):
            self._comptype = type
        else:
            raise Error('unknown compression type')

    def getcomptype(self):
        """Return the compression type."""
        return self._comptype

    def getcompname(self):
        """Return a human-readable name for the compression type."""
        if self._comptype == 'ULAW':
            return 'CCITT G.711 u-law'
        elif self._comptype == 'ALAW':
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def setparams(self, params):
        """Set all parameters from a 6-tuple in getparams() order."""
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        """Return all parameters as a _sunau_params namedtuple."""
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def tell(self):
        """Return the current position, counted in frames written."""
        return self._nframeswritten

    def writeframesraw(self, data):
        """Write audio frames without patching up the file header.

        The header is written before the first frames.  With the 'ULAW'
        compression type the linear input is compressed first, which
        needs the audioop module.
        """
        if not isinstance(data, (bytes, bytearray)):
            data = memoryview(data).cast('B')
        self._ensure_header_written()
        if self._comptype == 'ULAW':
            import audioop
            data = audioop.lin2ulaw(data, self._sampwidth)
        nframes = len(data) // self._framesize
        self._file.write(data)
        self._nframeswritten = self._nframeswritten + nframes
        self._datawritten = self._datawritten + len(data)

    def writeframes(self, data):
        """Write audio frames and patch up the header if it is now stale."""
        self.writeframesraw(data)
        if self._nframeswritten != self._nframes or \
                  self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        """Patch up the header, flush, and close the file if we opened it."""
        if self._file:
            try:
                self._ensure_header_written()
                if self._nframeswritten != self._nframes or \
                        self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
            finally:
                # Release the file even when finishing the header failed.
                file = self._file
                self._file = None
                if self._opened:
                    file.close()

    #
    # private methods
    #

    def _ensure_header_written(self):
        """Write the header unless frames have already been written."""
        if not self._nframeswritten:
            if not self._nchannels:
                raise Error('# of channels not specified')
            if not self._sampwidth:
                raise Error('sample width not specified')
            if not self._framerate:
                raise Error('frame rate not specified')
            self._write_header()

    def _write_header(self):
        """Write the file header for the current parameters."""
        if self._comptype == 'NONE':
            if self._sampwidth == 1:
                encoding = AUDIO_FILE_ENCODING_LINEAR_8
                self._framesize = 1
            elif self._sampwidth == 2:
                encoding = AUDIO_FILE_ENCODING_LINEAR_16
                self._framesize = 2
            elif self._sampwidth == 3:
                encoding = AUDIO_FILE_ENCODING_LINEAR_24
                self._framesize = 3
            elif self._sampwidth == 4:
                encoding = AUDIO_FILE_ENCODING_LINEAR_32
                self._framesize = 4
            else:
                raise Error('internal error')
        elif self._comptype == 'ULAW':
            encoding = AUDIO_FILE_ENCODING_MULAW_8
            self._framesize = 1
        else:
            raise Error('internal error')
        self._framesize = self._framesize * self._nchannels
        _write_u32(self._file, AUDIO_FILE_MAGIC)
        # 24 bytes of fixed fields plus the info string and at least one
        # NUL, rounded up to a multiple of 8.
        header_size = 25 + len(self._info)
        header_size = (header_size + 7) & ~7
        _write_u32(self._file, header_size)
        if self._nframes == AUDIO_UNKNOWN_SIZE:
            length = AUDIO_UNKNOWN_SIZE
        else:
            length = self._nframes * self._framesize
        try:
            # Remember where the data size lives so it can be patched later.
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        _write_u32(self._file, length)
        self._datalength = length
        _write_u32(self._file, encoding)
        _write_u32(self._file, self._framerate)
        _write_u32(self._file, self._nchannels)
        self._file.write(self._info)
        self._file.write(b'\0'*(header_size - len(self._info) - 24))

    def _patchheader(self):
        """Rewrite the data size field with the number of bytes written."""
        if self._form_length_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._form_length_pos)
        _write_u32(self._file, self._datawritten)
        self._datalength = self._datawritten
        self._file.seek(0, 2)   # back to the end so writing can continue


def open(f, mode=None):
    """Return an Au_read or Au_write object for *f*.

    *f* is a file name or an open file object.  *mode* is 'r'/'rb' for
    reading or 'w'/'wb' for writing; when omitted, f.mode is used if
    present, otherwise 'rb'.  Any other mode raises Error.
    """
    if mode is None:
        if hasattr(f, 'mode'):
            mode = f.mode
        else:
            mode = 'rb'
    if mode in ('r', 'rb'):
        return Au_read(f)
    elif mode in ('w', 'wb'):
        return Au_write(f)
    else:
        raise Error("mode must be 'r', 'rb', 'w', or 'wb'")