"""Stuff to parse Sun and NeXT audio files.

An audio file consists of a header followed by the data.  The structure
of the header is as follows.

        +---------------+
        | magic word    |
        +---------------+
        | header size   |
        +---------------+
        | data size     |
        +---------------+
        | encoding      |
        +---------------+
        | sample rate   |
        +---------------+
        | # of channels |
        +---------------+
        | info          |
        |               |
        +---------------+

The magic word consists of the 4 characters '.snd'.  Apart from the
info field, all header fields are 4 bytes in size.  They are all
32-bit unsigned integers encoded in big-endian byte order.

The header size gives the offset at which the data starts.
The data size is the physical size of the data in bytes.  From the
other parameters the number of frames can be calculated.
The encoding gives the way in which audio samples are encoded.
Possible values are listed below.
The info field currently consists of an ASCII string giving a
human-readable description of the audio file.  The info field is
padded with NUL bytes to the header size.

Usage.

Reading audio files:
        f = sunau.open(file, 'r')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods read(), seek(), and close().
When the setpos() and rewind() methods are not used, the seek()
method is not necessary.

This returns an instance of a class with the following public methods:
        getnchannels()  -- returns number of audio channels (1 for
                           mono, 2 for stereo)
        getsampwidth()  -- returns sample width in bytes
        getframerate()  -- returns sampling frequency
        getnframes()    -- returns number of audio frames
        getcomptype()   -- returns compression type ('NONE', 'ULAW'
                           or 'ALAW')
        getcompname()   -- returns human-readable version of
                           compression type ('not compressed' matches 'NONE')
        getparams()     -- returns a namedtuple consisting of all of the
                           above in the above order
        getmarkers()    -- returns None (for compatibility with the
                           aifc module)
        getmark(id)     -- raises an error since the mark does not
                           exist (for compatibility with the aifc module)
        readframes(n)   -- returns at most n frames of audio
        rewind()        -- rewind to the beginning of the audio stream
        setpos(pos)     -- seek to the specified position
        tell()          -- return the current position
        close()         -- close the instance (make it unusable)
The position returned by tell() and the position given to setpos()
are compatible with each other.  Both count audio frames and are
unrelated to the byte offset in the file.
The close() method is called automatically when the class instance
is destroyed.

Writing audio files:
        f = sunau.open(file, 'w')
where file is either the name of a file or an open file pointer.
The open file pointer must have methods write(), tell(), seek(), and
close().

This returns an instance of a class with the following public methods:
        setnchannels(n) -- set the number of channels
        setsampwidth(n) -- set the sample width
        setframerate(n) -- set the frame rate
        setnframes(n)   -- set the number of frames
        setcomptype(type, name)
                        -- set the compression type and the
                           human-readable compression type
        setparams(tuple)-- set all parameters at once
        tell()          -- return current position in output file
        writeframesraw(data)
                        -- write audio frames without patching up the
                           file header
        writeframes(data)
                        -- write audio frames and patch up the file header
        close()         -- patch up the file header and close the
                           output file
You should set the parameters before the first writeframesraw or
writeframes.  The total number of frames does not need to be set,
but when it is set to the correct value, the header does not have to
be patched up.
It is best to first set all parameters, possibly including the
compression type, and then write audio frames using writeframesraw.
When all frames have been written, either call writeframes(b'') or
close() to patch up the sizes in the header.
The close() method is called automatically when the class instance
is destroyed.
"""

from collections import namedtuple

_sunau_params = namedtuple('_sunau_params',
                           'nchannels sampwidth framerate nframes comptype compname')

# from <multimedia/audio_filehdr.h>
AUDIO_FILE_MAGIC = 0x2e736e64
AUDIO_FILE_ENCODING_MULAW_8 = 1
AUDIO_FILE_ENCODING_LINEAR_8 = 2
AUDIO_FILE_ENCODING_LINEAR_16 = 3
AUDIO_FILE_ENCODING_LINEAR_24 = 4
AUDIO_FILE_ENCODING_LINEAR_32 = 5
AUDIO_FILE_ENCODING_FLOAT = 6
AUDIO_FILE_ENCODING_DOUBLE = 7
AUDIO_FILE_ENCODING_ADPCM_G721 = 23
AUDIO_FILE_ENCODING_ADPCM_G722 = 24
AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25
AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26
AUDIO_FILE_ENCODING_ALAW_8 = 27

# from <multimedia/audio_hdr.h>
AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF        # ((unsigned)(~0))

_simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8,
                     AUDIO_FILE_ENCODING_LINEAR_8,
                     AUDIO_FILE_ENCODING_LINEAR_16,
                     AUDIO_FILE_ENCODING_LINEAR_24,
                     AUDIO_FILE_ENCODING_LINEAR_32,
                     AUDIO_FILE_ENCODING_ALAW_8]

class Error(Exception):
    pass

def _read_u32(file):
    # Read a big-endian unsigned 32-bit integer, one byte at a time.
    x = 0
    for i in range(4):
        byte = file.read(1)
        if not byte:
            raise EOFError
        x = x*256 + ord(byte)
    return x
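

# Illustrative sketch, not part of the original module: for a complete
# 4-byte read, the loop in _read_u32() above gives the same result as
# unpacking a big-endian unsigned 32-bit integer with the struct module.
# The name _example_read_u32 is hypothetical and nothing else in this
# file uses it.  It assumes file.read(4) returns all 4 bytes when they
# are available.
import struct

def _example_read_u32(file):
    data = file.read(4)
    if len(data) < 4:
        raise EOFError          # fewer than 4 bytes were left
    return struct.unpack('>I', data)[0]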

def _write_u32(file, x):
    # Write x as a big-endian unsigned 32-bit integer.
    data = []
    for i in range(4):
        d, m = divmod(x, 256)
        data.insert(0, int(m))
        x = d
    file.write(bytes(data))
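

# Illustrative sketch, not part of the original module: for values in the
# range 0 <= x < 2**32, _write_u32() above writes the same four bytes as
# struct.pack('>I', x).  One difference: _write_u32() silently drops the
# bits above 32, while struct.pack() raises struct.error for values that
# do not fit.  The name _example_write_u32 is hypothetical.
import struct

def _example_write_u32(file, x):
    file.write(struct.pack('>I', x))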

class Au_read:

    def __init__(self, f):
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'rb')
            self._opened = True
        else:
            self._opened = False
        self.initfp(f)

    def __del__(self):
        # _file is missing if __init__ failed before initfp() ran.
        if getattr(self, '_file', None):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        self._file = file
        self._soundpos = 0
        magic = int(_read_u32(file))
        if magic != AUDIO_FILE_MAGIC:
            raise Error('bad magic number')
        self._hdr_size = int(_read_u32(file))
        if self._hdr_size < 24:
            raise Error('header size too small')
        if self._hdr_size > 100:
            raise Error('header size ridiculously large')
        self._data_size = _read_u32(file)
        if self._data_size != AUDIO_UNKNOWN_SIZE:
            self._data_size = int(self._data_size)
        self._encoding = int(_read_u32(file))
        if self._encoding not in _simple_encodings:
            raise Error('encoding not (yet) supported')
        if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8,
                              AUDIO_FILE_ENCODING_ALAW_8):
            # Stored as 1 byte per sample, reported as 2 bytes per sample.
            self._sampwidth = 2
            self._framesize = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8:
            self._framesize = self._sampwidth = 1
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16:
            self._framesize = self._sampwidth = 2
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24:
            self._framesize = self._sampwidth = 3
        elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32:
            self._framesize = self._sampwidth = 4
        else:
            raise Error('unknown encoding')
        self._framerate = int(_read_u32(file))
        self._nchannels = int(_read_u32(file))
        self._framesize = self._framesize * self._nchannels
        if self._hdr_size > 24:
            # The info field is NUL-padded; keep the text before the first NUL.
            self._info = file.read(self._hdr_size - 24)
            self._info, _, _ = self._info.partition(b'\0')
        else:
            self._info = b''
        try:
            self._data_pos = file.tell()
        except (AttributeError, OSError):
            self._data_pos = None

    def getfp(self):
        return self._file

    def getnchannels(self):
        return self._nchannels

    def getsampwidth(self):
        return self._sampwidth

    def getframerate(self):
        return self._framerate

    def getnframes(self):
        if self._data_size == AUDIO_UNKNOWN_SIZE:
            return AUDIO_UNKNOWN_SIZE
        if self._encoding in _simple_encodings:
            return self._data_size // self._framesize
        return 0                # XXX--must do some arithmetic here

    def getcomptype(self):
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'ULAW'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'ALAW'
        else:
            return 'NONE'

    def getcompname(self):
        if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
            return 'CCITT G.711 u-law'
        elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8:
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def getparams(self):
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def getmarkers(self):
        return None

    def getmark(self, id):
        raise Error('no marks')

    def readframes(self, nframes):
        if self._encoding in _simple_encodings:
            if nframes == AUDIO_UNKNOWN_SIZE:
                data = self._file.read()
            else:
                data = self._file.read(nframes * self._framesize)
            self._soundpos += len(data) // self._framesize
            # Only u-law is decoded here; A-law data is returned as stored.
            if self._encoding == AUDIO_FILE_ENCODING_MULAW_8:
                import audioop
                data = audioop.ulaw2lin(data, self._sampwidth)
            return data
        return None             # XXX--not implemented yet

    def rewind(self):
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos)
        self._soundpos = 0

    def tell(self):
        return self._soundpos

    def setpos(self, pos):
        if pos < 0 or pos > self.getnframes():
            raise Error('position not in range')
        if self._data_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._data_pos + pos * self._framesize)
        self._soundpos = pos

    def close(self):
        file = self._file
        if file:
            self._file = None
            if self._opened:
                file.close()
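

# Illustrative sketch, not part of the original module: the six fixed
# header fields that Au_read.initfp() reads one at a time can also be
# unpacked from a bytes object in a single struct call.  The names
# _example_parse_au_header and _example_frame_count are hypothetical.
# The frame count assumes a fixed number of stored bytes per sample.
import struct

def _example_parse_au_header(data):
    if len(data) < 24:
        raise ValueError('need at least 24 header bytes')
    magic, header_size, data_size, encoding, framerate, nchannels = \
        struct.unpack('>6I', data[:24])
    if magic != 0x2e736e64:             # the characters '.snd'
        raise ValueError('bad magic number')
    # The info field runs from byte 24 up to header_size, NUL-padded.
    info = data[24:header_size].partition(b'\0')[0]
    return {'header_size': header_size, 'data_size': data_size,
            'encoding': encoding, 'framerate': framerate,
            'nchannels': nchannels, 'info': info}

def _example_frame_count(data_size, bytes_per_sample, nchannels):
    # One frame holds one stored sample for every channel.
    return data_size // (bytes_per_sample * nchannels)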

class Au_write:

    def __init__(self, f):
        if isinstance(f, str):
            import builtins
            f = builtins.open(f, 'wb')
            self._opened = True
        else:
            self._opened = False
        self.initfp(f)

    def __del__(self):
        # _file is missing if __init__ failed before initfp() ran.
        if getattr(self, '_file', None):
            self.close()
        self._file = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def initfp(self, file):
        self._file = file
        self._framerate = 0
        self._nchannels = 0
        self._sampwidth = 0
        self._framesize = 0
        self._nframes = AUDIO_UNKNOWN_SIZE
        self._nframeswritten = 0
        self._datawritten = 0
        self._datalength = 0
        self._info = b''
        self._comptype = 'ULAW' # default is U-law

    def setnchannels(self, nchannels):
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if nchannels not in (1, 2, 4):
            raise Error('only 1, 2, or 4 channels supported')
        self._nchannels = nchannels

    def getnchannels(self):
        if not self._nchannels:
            raise Error('number of channels not set')
        return self._nchannels

    def setsampwidth(self, sampwidth):
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if sampwidth not in (1, 2, 3, 4):
            raise Error('bad sample width')
        self._sampwidth = sampwidth

    def getsampwidth(self):
        # Check the sample width itself, not the frame rate.
        if not self._sampwidth:
            raise Error('sample width not specified')
        return self._sampwidth

    def setframerate(self, framerate):
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        self._framerate = framerate

    def getframerate(self):
        if not self._framerate:
            raise Error('frame rate not set')
        return self._framerate

    def setnframes(self, nframes):
        if self._nframeswritten:
            raise Error('cannot change parameters after starting to write')
        if nframes < 0:
            raise Error('# of frames cannot be negative')
        self._nframes = nframes

    def getnframes(self):
        return self._nframeswritten

    def setcomptype(self, type, name):
        if type in ('NONE', 'ULAW'):
            self._comptype = type
        else:
            raise Error('unknown compression type')

    def getcomptype(self):
        return self._comptype

    def getcompname(self):
        if self._comptype == 'ULAW':
            return 'CCITT G.711 u-law'
        elif self._comptype == 'ALAW':
            return 'CCITT G.711 A-law'
        else:
            return 'not compressed'

    def setparams(self, params):
        nchannels, sampwidth, framerate, nframes, comptype, compname = params
        self.setnchannels(nchannels)
        self.setsampwidth(sampwidth)
        self.setframerate(framerate)
        self.setnframes(nframes)
        self.setcomptype(comptype, compname)

    def getparams(self):
        return _sunau_params(self.getnchannels(), self.getsampwidth(),
                             self.getframerate(), self.getnframes(),
                             self.getcomptype(), self.getcompname())

    def tell(self):
        return self._nframeswritten

    def writeframesraw(self, data):
        if not isinstance(data, (bytes, bytearray)):
            data = memoryview(data).cast('B')
        self._ensure_header_written()
        if self._comptype == 'ULAW':
            import audioop
            data = audioop.lin2ulaw(data, self._sampwidth)
        nframes = len(data) // self._framesize
        self._file.write(data)
        self._nframeswritten = self._nframeswritten + nframes
        self._datawritten = self._datawritten + len(data)

    def writeframes(self, data):
        self.writeframesraw(data)
        if self._nframeswritten != self._nframes or \
                  self._datalength != self._datawritten:
            self._patchheader()

    def close(self):
        if self._file:
            try:
                self._ensure_header_written()
                if self._nframeswritten != self._nframes or \
                        self._datalength != self._datawritten:
                    self._patchheader()
                self._file.flush()
            finally:
                file = self._file
                self._file = None
                if self._opened:
                    file.close()

    #
    # private methods
    #

    def _ensure_header_written(self):
        if not self._nframeswritten:
            if not self._nchannels:
                raise Error('# of channels not specified')
            if not self._sampwidth:
                raise Error('sample width not specified')
            if not self._framerate:
                raise Error('frame rate not specified')
            self._write_header()

    def _write_header(self):
        if self._comptype == 'NONE':
            if self._sampwidth == 1:
                encoding = AUDIO_FILE_ENCODING_LINEAR_8
                self._framesize = 1
            elif self._sampwidth == 2:
                encoding = AUDIO_FILE_ENCODING_LINEAR_16
                self._framesize = 2
            elif self._sampwidth == 3:
                encoding = AUDIO_FILE_ENCODING_LINEAR_24
                self._framesize = 3
            elif self._sampwidth == 4:
                encoding = AUDIO_FILE_ENCODING_LINEAR_32
                self._framesize = 4
            else:
                raise Error('internal error')
        elif self._comptype == 'ULAW':
            encoding = AUDIO_FILE_ENCODING_MULAW_8
            self._framesize = 1
        else:
            raise Error('internal error')
        self._framesize = self._framesize * self._nchannels
        _write_u32(self._file, AUDIO_FILE_MAGIC)
        # 24 fixed bytes, the info text and at least one NUL,
        # rounded up to a multiple of 8.
        header_size = 25 + len(self._info)
        header_size = (header_size + 7) & ~7
        _write_u32(self._file, header_size)
        if self._nframes == AUDIO_UNKNOWN_SIZE:
            length = AUDIO_UNKNOWN_SIZE
        else:
            length = self._nframes * self._framesize
        # Remember where the data size field is so it can be patched later.
        try:
            self._form_length_pos = self._file.tell()
        except (AttributeError, OSError):
            self._form_length_pos = None
        _write_u32(self._file, length)
        self._datalength = length
        _write_u32(self._file, encoding)
        _write_u32(self._file, self._framerate)
        _write_u32(self._file, self._nchannels)
        self._file.write(self._info)
        self._file.write(b'\0'*(header_size - len(self._info) - 24))

    def _patchheader(self):
        if self._form_length_pos is None:
            raise OSError('cannot seek')
        self._file.seek(self._form_length_pos)
        _write_u32(self._file, self._datawritten)
        self._datalength = self._datawritten
        self._file.seek(0, 2)
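

# Illustrative sketch, not part of the original module: how the header
# size and NUL padding computed in Au_write._write_header() work out when
# the whole header is built as one bytes object.  The function name
# _example_build_au_header and its parameters are hypothetical.
import struct

def _example_build_au_header(encoding, framerate, nchannels,
                             data_size=0xFFFFFFFF, info=b''):
    # 24 fixed bytes, the info text and at least one NUL,
    # rounded up to a multiple of 8.
    header_size = (25 + len(info) + 7) & ~7
    fixed = struct.pack('>6I', 0x2e736e64, header_size, data_size,
                        encoding, framerate, nchannels)
    # Pad the info field with NUL bytes up to header_size.
    return fixed + info.ljust(header_size - 24, b'\0')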

def open(f, mode=None):
    if mode is None:
        if hasattr(f, 'mode'):
            mode = f.mode
        else:
            mode = 'rb'
    if mode in ('r', 'rb'):
        return Au_read(f)
    elif mode in ('w', 'wb'):
        return Au_write(f)
    else:
        raise Error("mode must be 'r', 'rb', 'w', or 'wb'")

openfp = open