#!/usr/bin/env python3
#
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import scipy.signal as signal
import scipy.io.wavfile as wavfile
import struct
import argparse

import build.lc3 as lc3
import tables as T, appendix_c as C

import attdet, ltpf
import mdct, energy, bwdet, sns, tns, spec
import bitstream

### ------------------------------------------------------------------------ ###

class Encoder:
    """Python LC3 encoder built by chaining the per-stage analysis modules.

    Each stage object is created once and reused for every frame; `run()`
    analyses one frame and serialises it to a byte string.
    """

    def __init__(self, dt_ms, sr_hz):
        """Create the stage objects for one frame duration / sample rate.

        Args:
            dt_ms: Frame duration in milliseconds, 7.5 or 10.
            sr_hz: Sample rate in Hz; one of 8000, 16000, 24000, 32000
                or 48000.

        Raises:
            KeyError: If `dt_ms` or `sr_hz` is not one of the values above.
        """

        dt = { 7.5: T.DT_7M5, 10: T.DT_10M }[dt_ms]

        sr = {  8000: T.SRATE_8K , 16000: T.SRATE_16K, 24000: T.SRATE_24K,
               32000: T.SRATE_32K, 48000: T.SRATE_48K }[sr_hz]

        # Number of MDCT coefficients kept by `analyse()`.
        self.ne = T.NE[dt][sr]

        self.attdet = attdet.AttackDetector(dt, sr)
        self.ltpf = ltpf.Ltpf(dt, sr)

        self.mdct = mdct.Mdct(dt, sr)
        # The module is imported as `energy`; the former `e_energy` name was
        # undefined and made every construction fail with NameError.
        self.energy = energy.EnergyBand(dt, sr)
        self.bwdet = bwdet.BandwidthDetector(dt, sr)
        self.sns = sns.SnsAnalysis(dt, sr)
        self.tns = tns.TnsAnalysis(dt)
        self.spec = spec.SpectrumEncoder(dt, sr)

    def analyse(self, x, nbytes):
        """Run every analysis stage on one frame of samples.

        The stages keep whatever they need for `encode()` on their own
        objects; only the LTPF result is returned.

        Args:
            x: Samples of one frame.
            nbytes: Size of the encoded frame in bytes.

        Returns:
            The value returned by the LTPF stage, used as the
            "pitch present" flag by `encode()`.
        """

        att = self.attdet.run(nbytes, x)

        pitch_present = self.ltpf.run(x)

        # From here on `x` holds spectral data, truncated to `self.ne` values.
        x = self.mdct.forward(x)[:self.ne]

        (e, nn_flag) = self.energy.compute(x)
        if nn_flag:
            self.ltpf.disable()

        bw = self.bwdet.run(e)

        x = self.sns.run(e, att, x)

        x = self.tns.run(x, bw, nn_flag, nbytes)

        # The return value is not used here; presumably the quantizer keeps
        # its result on `self.spec` for `store()` / `encode()`.
        self.spec.quantize(bw, nbytes,
            self.bwdet.get_nbits(), self.ltpf.get_nbits(),
            self.sns.get_nbits(), self.tns.get_nbits(), x)

        return pitch_present

    def encode(self, pitch_present, nbytes):
        """Serialise the state left by `analyse()` into one encoded frame.

        Args:
            pitch_present: Flag returned by `analyse()`.
            nbytes: Size of the encoded frame in bytes.

        Returns:
            The value of `BitstreamWriter.terminate()`, i.e. the frame data.
        """

        b = bitstream.BitstreamWriter(nbytes)

        # The write order below defines the frame layout; do not reorder.
        self.bwdet.store(b)

        self.spec.store(b)

        self.tns.store(b)

        b.write_bit(pitch_present)

        self.sns.store(b)

        if pitch_present:
            self.ltpf.store_data(b)

        self.spec.encode(b)

        return b.terminate()

    def run(self, x, nbytes):
        """Analyse then encode one frame; returns the encoded frame data."""

        pitch_present = self.analyse(x, nbytes)

        return self.encode(pitch_present, nbytes)

### ------------------------------------------------------------------------ ###

def check_appendix_c(dt):
    """Compare the C encoder output with the `appendix_c` reference frames.

    Every frame of `C.X_PCM[dt]` is encoded with the C implementation at
    16 kHz and compared with `C.BYTES_AC[dt]`. From the first mismatch
    onwards, the produced and expected frames are dumped to stdout.

    Args:
        dt: Frame duration index (a `T.DT_*` value).

    Returns:
        True when every frame matches, False otherwise.
    """

    ok = True

    enc_c = lc3.setup_encoder(int(T.DT_MS[dt] * 1000), 16000)

    for i, pcm in enumerate(C.X_PCM[dt]):

        data = lc3.encode(enc_c, pcm, C.NBYTES[dt])
        ok = ok and data == C.BYTES_AC[dt][i]
        if not ok:
            dump(data)
            dump(C.BYTES_AC[dt][i])

    return ok

def check():
    """Run `check_appendix_c()` for each frame duration.

    Later durations are skipped once one has failed.

    Returns:
        True when all executed checks pass, False otherwise.
    """

    ok = True

    for dt in range(T.NUM_DT):
        ok = ok and check_appendix_c(dt)

    return ok

### ------------------------------------------------------------------------ ###

def dump(data):
    """Print `data` as two-digit hex values, 20 values per line."""
    for i in range(0, len(data), 20):
        # Slicing clamps at the end of `data`, no explicit bound is needed.
        print(''.join('{:02x} '.format(x) for x in data[i:i+20]))

if __name__ == "__main__":

    parser = argparse.ArgumentParser(description='LC3 Encoder Test Framework')
    parser.add_argument('wav_file',
        help='Input wave file', type=argparse.FileType('r'))
    parser.add_argument('--bitrate',
        help='Bitrate in bps', type=int, required=True)
    parser.add_argument('--dt',
        help='Frame duration in ms', type=float, default=10)
    parser.add_argument('--pyout',
        help='Python output file', type=argparse.FileType('w'))
    parser.add_argument('--cout',
        help='C output file', type=argparse.FileType('w'))
    args = parser.parse_args()

    if args.bitrate < 16000 or args.bitrate > 320000:
        raise ValueError('Invalid bitrate %d bps' % args.bitrate)

    if args.dt not in (7.5, 10):
        raise ValueError('Invalid frame duration %.1f ms' % args.dt)

    (sr_hz, pcm) = wavfile.read(args.wav_file.name)
    # Must list the same rates as `Encoder.__init__` (32000, not 320000).
    if sr_hz not in (8000, 16000, 24000, 32000, 48000):
        raise ValueError('Unsupported input samplerate: %d' % sr_hz)

    ### Setup ###

    enc = Encoder(args.dt, sr_hz)
    enc_c = lc3.setup_encoder(int(args.dt * 1000), sr_hz)

    frame_samples = int((args.dt * sr_hz) / 1000)
    frame_nbytes = int((args.bitrate * args.dt) / (1000 * 8))

    ### File Header ###

    # argparse only validated the paths; reopen them in binary mode.
    f_py = open(args.pyout.name, 'wb') if args.pyout else None
    f_c  = open(args.cout.name , 'wb') if args.cout else None

    try:

        # The sample count stored in the header is taken before padding.
        header = struct.pack('=HHHHHHHI', 0xcc1c, 18,
            sr_hz // 100, args.bitrate // 100, 1, int(args.dt * 100), 0, len(pcm))

        for f in (f_py, f_c):
            if f: f.write(header)

        ### Encoding loop ###

        # Zero-pad the input to a whole number of frames.
        if len(pcm) % frame_samples > 0:
            pcm = np.append(pcm,
                np.zeros(frame_samples - (len(pcm) % frame_samples)))

        for i in range(0, len(pcm), frame_samples):

            print('Encoding frame %d' % (i // frame_samples), end='\r')

            frame_pcm = pcm[i:i+frame_samples]

            data = enc.run(frame_pcm, frame_nbytes)
            data_c = lc3.encode(enc_c, frame_pcm, frame_nbytes)

            # Each frame is prefixed with its size in bytes.
            for f in (f_py, f_c):
                if f: f.write(struct.pack('=H', frame_nbytes))

            if f_py: f_py.write(data)
            if f_c: f_c.write(data_c)

        print('done ! %16s' % '')

    finally:

        ### Terminate ###

        # Close the outputs even when encoding fails part-way.
        for f in (f_py, f_c):
            if f: f.close()


### ------------------------------------------------------------------------ ###