• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2022 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import numpy as np
19import scipy.signal as signal
20import scipy.io.wavfile as wavfile
21import struct
22import argparse
23
24import build.lc3 as lc3
25import tables as T, appendix_c as C
26
27import attdet, ltpf
28import mdct, energy, bwdet, sns, tns, spec
29import bitstream
30
31### ------------------------------------------------------------------------ ###
32
class Encoder:
    """Python model of the LC3 encoding chain.

    Each processing stage (attack detector, LTPF, MDCT, energy bands,
    bandwidth detector, SNS, TNS, spectrum encoder) is an object created
    once in the constructor and reused for every frame passed to `run()`.
    """

    def __init__(self, dt_ms, sr_hz):
        """Create the processing stages for one frame duration / samplerate.

        Args:
            dt_ms: Frame duration in milliseconds, either 7.5 or 10.
            sr_hz: Samplerate in Hz, one of 8000, 16000, 24000, 32000
                or 48000.

        Raises:
            KeyError: If `dt_ms` or `sr_hz` is not one of the values above.
        """

        # Map the user-facing values onto the index constants of `tables`.
        dt = { 7.5: T.DT_7M5, 10: T.DT_10M }[dt_ms]

        sr = {  8000: T.SRATE_8K , 16000: T.SRATE_16K, 24000: T.SRATE_24K,
               32000: T.SRATE_32K, 48000: T.SRATE_48K }[sr_hz]

        # Number of MDCT coefficients kept by `analyse()`.
        self.ne = T.NE[dt][sr]

        self.attdet = attdet.AttackDetector(dt, sr)
        self.ltpf = ltpf.Ltpf(dt, sr)

        self.mdct = mdct.Mdct(dt, sr)
        # `energy` is the module imported at the top of the file; the
        # previous `e_energy` name did not exist and raised NameError.
        self.energy = energy.EnergyBand(dt, sr)
        self.bwdet = bwdet.BandwidthDetector(dt, sr)
        self.sns = sns.SnsAnalysis(dt, sr)
        self.tns = tns.TnsAnalysis(dt)
        self.spec = spec.SpectrumEncoder(dt, sr)

    def analyse(self, x, nbytes):
        """Run every analysis stage on one frame.

        Args:
            x: Input samples of the frame.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The value returned by the LTPF stage for this frame, used by
            `encode()` as the "pitch present" bit.
        """

        # Time domain stages, both fed with the raw input samples.
        att = self.attdet.run(nbytes, x)

        pitch_present = self.ltpf.run(x)

        # From here on `x` holds the spectrum, cut to `self.ne` coefficients.
        x = self.mdct.forward(x)[:self.ne]

        (e, nn_flag) = self.energy.compute(x)
        if nn_flag:
            self.ltpf.disable()

        bw = self.bwdet.run(e)

        x = self.sns.run(e, att, x)

        x = self.tns.run(x, bw, nn_flag, nbytes)

        # The return value is not needed here: `encode()` only hands the
        # bitstream writer to `self.spec`, so the quantizer presumably keeps
        # the result of this call as internal state.
        self.spec.quantize(bw, nbytes,
            self.bwdet.get_nbits(), self.ltpf.get_nbits(),
            self.sns.get_nbits(), self.tns.get_nbits(), x)

        return pitch_present

    def encode(self, pitch_present, nbytes):
        """Write the frame analysed by the last `analyse()` call.

        Args:
            pitch_present: Value returned by `analyse()`; written as one bit
                and also selects whether the LTPF data is stored.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The result of `BitstreamWriter.terminate()`.
        """

        b = bitstream.BitstreamWriter(nbytes)

        # The order of the calls below defines the bitstream layout.
        self.bwdet.store(b)

        self.spec.store(b)

        self.tns.store(b)

        b.write_bit(pitch_present)

        self.sns.store(b)

        if pitch_present:
            self.ltpf.store_data(b)

        self.spec.encode(b)

        return b.terminate()

    def run(self, x, nbytes):
        """Analyse then encode one frame.

        Args:
            x: Input samples of the frame.
            nbytes: Size of the encoded frame, in bytes.

        Returns:
            The encoded frame, as returned by `encode()`.
        """

        pitch_present = self.analyse(x, nbytes)

        return self.encode(pitch_present, nbytes)
106
107### ------------------------------------------------------------------------ ###
108
def check_appendix_c(dt):
    """Compare the C encoder output with the `appendix_c` reference frames.

    Every frame of `C.X_PCM[dt]` is encoded at 16 kHz with `C.NBYTES[dt]`
    bytes and compared with `C.BYTES_AC[dt]`. Starting from the first
    mismatch, the encoded and reference bytes of each remaining frame are
    dumped.

    Args:
        dt: Frame duration index, as used by the `tables` module.

    Returns:
        A truthy value when every frame matches its reference.
    """

    encoder = lc3.setup_encoder(int(T.DT_MS[dt] * 1000), 16000)
    passed = True

    for index, pcm in enumerate(C.X_PCM[dt]):

        produced = lc3.encode(encoder, pcm, C.NBYTES[dt])
        expected = C.BYTES_AC[dt][index]

        # Sticky failure: once a frame differs, the result stays false.
        passed = passed and produced == expected
        if passed:
            continue

        dump(produced)
        dump(expected)

    return passed
124
def check():
    """Run the Appendix C check for every frame duration.

    Stops calling `check_appendix_c()` after the first failing duration.

    Returns:
        The first falsy check result, otherwise the result of the last
        check (True when there is no frame duration to check).
    """

    verdict = True

    for dt in range(T.NUM_DT):
        verdict = check_appendix_c(dt)
        if not verdict:
            break

    return verdict
133
134### ------------------------------------------------------------------------ ###
135
def dump(data):
    """Print `data` as two-digit lowercase hex values, 20 values per line.

    Each value is followed by a single space, including the last one of
    a line. Nothing is printed for an empty sequence.
    """
    row_len = 20
    for start in range(0, len(data), row_len):
        # Slicing clamps to the end of `data`, so the last row may be short.
        row = data[start:start + row_len]
        print(''.join(map('{:02x} '.format, row)))
140
if __name__ == "__main__":

    # Command line tool: encode a wave file with both the Python model and
    # the C library binding, writing each result to its own output file.

    parser = argparse.ArgumentParser(description='LC3 Encoder Test Framework')
    parser.add_argument('wav_file',
        help='Input wave file', type=argparse.FileType('r'))
    parser.add_argument('--bitrate',
        help='Bitrate in bps', type=int, required=True)
    parser.add_argument('--dt',
        help='Frame duration in ms', type=float, default=10)
    parser.add_argument('--pyout',
        help='Python output file', type=argparse.FileType('w'))
    parser.add_argument('--cout',
        help='C output file', type=argparse.FileType('w'))
    args = parser.parse_args()

    if args.bitrate < 16000 or args.bitrate > 320000:
        raise ValueError('Invalid bitrate %d bps' % args.bitrate)

    if args.dt not in (7.5, 10):
        raise ValueError('Invalid frame duration %.1f ms' % args.dt)

    # Only the file name is taken from the argparse handle; the samples are
    # read by scipy.
    (sr_hz, pcm) = wavfile.read(args.wav_file.name)
    # Must list the same samplerates as the table in Encoder.__init__
    # (32000, not 320000).
    if sr_hz not in (8000, 16000, 24000, 32000, 48000):
        raise ValueError('Unsupported input samplerate: %d' % sr_hz)

    ### Setup ###

    enc = Encoder(args.dt, sr_hz)
    # The C binding takes the frame duration in microseconds.
    enc_c = lc3.setup_encoder(int(args.dt * 1000), sr_hz)

    frame_samples = int((args.dt * sr_hz) / 1000)
    frame_nbytes = int((args.bitrate * args.dt) / (1000 * 8))

    f_py = f_c = None

    try:

        ### File Header ###

        # The outputs are reopened by name in binary mode, the handles
        # created by argparse being text mode ones.
        f_py = open(args.pyout.name, 'wb') if args.pyout else None
        f_c  = open(args.cout.name , 'wb') if args.cout  else None

        # The second field, 18, equals the packed size of this header
        # (7 x uint16 + 1 x uint32). The sample count is taken before the
        # padding applied below.
        header = struct.pack('=HHHHHHHI', 0xcc1c, 18,
            sr_hz // 100, args.bitrate // 100, 1, int(args.dt * 100), 0,
            len(pcm))

        for f in (f_py, f_c):
            if f: f.write(header)

        ### Encoding loop ###

        # Zero-pad the input to a whole number of frames.
        if len(pcm) % frame_samples > 0:
            pcm = np.append(pcm,
                np.zeros(frame_samples - (len(pcm) % frame_samples)))

        for i in range(0, len(pcm), frame_samples):

            print('Encoding frame %d' % (i // frame_samples), end='\r')

            frame_pcm = pcm[i:i+frame_samples]

            data = enc.run(frame_pcm, frame_nbytes)
            data_c = lc3.encode(enc_c, frame_pcm, frame_nbytes)

            # Each frame is preceded by its size, as a 16 bits value.
            for f in (f_py, f_c):
                if f: f.write(struct.pack('=H', frame_nbytes))

            if f_py: f_py.write(data)
            if f_c: f_c.write(data_c)

        print('done ! %16s' % '')

    finally:

        ### Terminate ###

        # Close the outputs even when the encoding fails midway.
        for f in (f_py, f_c):
            if f: f.close()
211
212
213### ------------------------------------------------------------------------ ###
214