from fontTools.misc.roundTools import noRound, otRound
from fontTools.ttLib.tables import otTables as ot
from fontTools.varLib.models import supportScalar
from fontTools.varLib.builder import (buildVarRegionList, buildVarStore,
                                      buildVarRegion, buildVarData)
from functools import partial
from collections import defaultdict
from array import array


def _getLocationKey(loc):
    return tuple(sorted(loc.items(), key=lambda kv: kv[0]))


class OnlineVarStoreBuilder(object):

    def __init__(self, axisTags):
        self._axisTags = axisTags
        self._regionMap = {}
        self._regionList = buildVarRegionList([], axisTags)
        self._store = buildVarStore(self._regionList, [])
        self._data = None
        self._model = None
        self._supports = None
        self._varDataIndices = {}
        self._varDataCaches = {}
        self._cache = {}

    def setModel(self, model):
        self.setSupports(model.supports)
        self._model = model

    def setSupports(self, supports):
        self._model = None
        self._supports = list(supports)
        if not self._supports[0]:
            del self._supports[0]  # Drop base master support
        self._cache = {}
        self._data = None

    def finish(self, optimize=True):
        self._regionList.RegionCount = len(self._regionList.Region)
        self._store.VarDataCount = len(self._store.VarData)
        for data in self._store.VarData:
            data.ItemCount = len(data.Item)
            data.calculateNumShorts(optimize=optimize)
        return self._store

    def _add_VarData(self):
        regionMap = self._regionMap
        regionList = self._regionList

        regions = self._supports
        regionIndices = []
        for region in regions:
            key = _getLocationKey(region)
            idx = regionMap.get(key)
            if idx is None:
                varRegion = buildVarRegion(region, self._axisTags)
                idx = regionMap[key] = len(regionList.Region)
                regionList.Region.append(varRegion)
            regionIndices.append(idx)

        # Check if we have one already...
        key = tuple(regionIndices)
        varDataIdx = self._varDataIndices.get(key)
        if varDataIdx is not None:
            self._outer = varDataIdx
            self._data = self._store.VarData[varDataIdx]
            self._cache = self._varDataCaches[key]
            if len(self._data.Item) == 0xFFFF:
                # This is full.  Need a new one.
                varDataIdx = None

        if varDataIdx is None:
            self._data = buildVarData(regionIndices, [], optimize=False)
            self._outer = len(self._store.VarData)
            self._store.VarData.append(self._data)
            self._varDataIndices[key] = self._outer
            if key not in self._varDataCaches:
                self._varDataCaches[key] = {}
            self._cache = self._varDataCaches[key]

    def storeMasters(self, master_values):
        deltas = self._model.getDeltas(master_values, round=round)
        base = deltas.pop(0)
        return base, self.storeDeltas(deltas, round=noRound)

    def storeDeltas(self, deltas, *, round=round):
        deltas = [round(d) for d in deltas]
        if len(deltas) == len(self._supports) + 1:
            deltas = tuple(deltas[1:])
        else:
            assert len(deltas) == len(self._supports)
            deltas = tuple(deltas)

        varIdx = self._cache.get(deltas)
        if varIdx is not None:
            return varIdx

        if not self._data:
            self._add_VarData()
        inner = len(self._data.Item)
        if inner == 0xFFFF:
            # Full array.  Start new one; deltas are already rounded.
            self._add_VarData()
            return self.storeDeltas(deltas, round=noRound)
        self._data.addItem(deltas, round=noRound)

        varIdx = (self._outer << 16) + inner
        self._cache[deltas] = varIdx
        return varIdx
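
# A minimal usage sketch of OnlineVarStoreBuilder (hypothetical two-master
# setup along 'wght'; illustrative only, not part of the library API):
#
#   from fontTools.varLib.models import VariationModel
#
#   model = VariationModel([{}, {'wght': 1.0}])
#   builder = OnlineVarStoreBuilder(['wght'])
#   builder.setModel(model)
#   base, varIdx = builder.storeMasters([100, 150])  # base == 100
#   store = builder.finish()  # an ot.VarStore ready for compilation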

def VarData_addItem(self, deltas, *, round=round):
    deltas = [round(d) for d in deltas]

    countUs = self.VarRegionCount
    countThem = len(deltas)
    if countUs + 1 == countThem:
        deltas = tuple(deltas[1:])
    else:
        assert countUs == countThem, (countUs, countThem)
        deltas = tuple(deltas)
    self.Item.append(list(deltas))
    self.ItemCount = len(self.Item)

ot.VarData.addItem = VarData_addItem

def VarRegion_get_support(self, fvar_axes):
    return {
        fvar_axes[i].axisTag: (reg.StartCoord, reg.PeakCoord, reg.EndCoord)
        for i, reg in enumerate(self.VarRegionAxis)
        if reg.PeakCoord != 0
    }

ot.VarRegion.get_support = VarRegion_get_support

class VarStoreInstancer(object):

    def __init__(self, varstore, fvar_axes, location={}):
        self.fvar_axes = fvar_axes
        assert varstore is None or varstore.Format == 1
        self._varData = varstore.VarData if varstore else []
        self._regions = varstore.VarRegionList.Region if varstore else []
        self.setLocation(location)

    def setLocation(self, location):
        self.location = dict(location)
        self._clearCaches()

    def _clearCaches(self):
        self._scalars = {}

    def _getScalar(self, regionIdx):
        scalar = self._scalars.get(regionIdx)
        if scalar is None:
            support = self._regions[regionIdx].get_support(self.fvar_axes)
            scalar = supportScalar(self.location, support)
            self._scalars[regionIdx] = scalar
        return scalar

    @staticmethod
    def interpolateFromDeltasAndScalars(deltas, scalars):
        delta = 0.
        for d, s in zip(deltas, scalars):
            if not s:
                continue
            delta += d * s
        return delta

    def __getitem__(self, varidx):
        major, minor = varidx >> 16, varidx & 0xFFFF
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in varData[major].VarRegionIndex]
        deltas = varData[major].Item[minor]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)

    def interpolateFromDeltas(self, varDataIndex, deltas):
        varData = self._varData
        scalars = [self._getScalar(ri) for ri in
                   varData[varDataIndex].VarRegionIndex]
        return self.interpolateFromDeltasAndScalars(deltas, scalars)
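
# A minimal usage sketch of VarStoreInstancer (hypothetical font objects;
# assumes a compiled variable font whose GDEF table carries a VarStore):
#
#   fvar = font['fvar']
#   gdef = font['GDEF'].table
#   instancer = VarStoreInstancer(gdef.VarStore, fvar.axes, {'wght': 0.5})
#   delta = instancer[varIdx]  # delta interpolated at the given location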

#
# Optimizations
#
# retainFirstMap - If true, major 0 mappings are retained.  Deltas for
#                  unused indices are zeroed.
# advIdxes - Set of major 0 indices for advance deltas to be listed first.
#            Other major 0 indices follow.

def VarStore_subset_varidxes(self, varIdxes, optimize=True, retainFirstMap=False, advIdxes=set()):

    # Sort out used varIdxes by major/minor.
    used = {}
    for varIdx in varIdxes:
        major = varIdx >> 16
        minor = varIdx & 0xFFFF
        d = used.get(major)
        if d is None:
            d = used[major] = set()
        d.add(minor)
    del varIdxes

    #
    # Subset VarData
    #

    varData = self.VarData
    newVarData = []
    varDataMap = {}
    for major, data in enumerate(varData):
        usedMinors = used.get(major)
        if usedMinors is None:
            continue
        newMajor = len(newVarData)
        newVarData.append(data)

        items = data.Item
        newItems = []
        if major == 0 and retainFirstMap:
            for minor in range(len(items)):
                newItems.append(items[minor] if minor in usedMinors
                                else [0] * len(items[minor]))
                varDataMap[minor] = minor
        else:
            if major == 0:
                minors = sorted(advIdxes) + sorted(usedMinors - advIdxes)
            else:
                minors = sorted(usedMinors)
            for minor in minors:
                newMinor = len(newItems)
                newItems.append(items[minor])
                varDataMap[(major << 16) + minor] = (newMajor << 16) + newMinor

        data.Item = newItems
        data.ItemCount = len(data.Item)

        data.calculateNumShorts(optimize=optimize)

    self.VarData = newVarData
    self.VarDataCount = len(self.VarData)

    self.prune_regions()

    return varDataMap

ot.VarStore.subset_varidxes = VarStore_subset_varidxes

def VarStore_prune_regions(self):
    """Remove unused VarRegions."""
    #
    # Subset VarRegionList
    #

    # Collect.
    usedRegions = set()
    for data in self.VarData:
        usedRegions.update(data.VarRegionIndex)
    # Subset.
    regionList = self.VarRegionList
    regions = regionList.Region
    newRegions = []
    regionMap = {}
    for i in sorted(usedRegions):
        regionMap[i] = len(newRegions)
        newRegions.append(regions[i])
    regionList.Region = newRegions
    regionList.RegionCount = len(regionList.Region)
    # Map.
    for data in self.VarData:
        data.VarRegionIndex = [regionMap[i] for i in data.VarRegionIndex]

ot.VarStore.prune_regions = VarStore_prune_regions
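
# A minimal sketch of subsetting (hypothetical VarIdx values): a VarIdx
# packs the VarData index in the high 16 bits (major) and the item index
# in the low 16 bits (minor).
#
#   varIdxes = {(0 << 16) + 3, (1 << 16) + 0}
#   mapping = store.subset_varidxes(varIdxes)  # old VarIdx -> new VarIdx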

def _visit(self, func):
    """Recurse down from self; if the type of an object is ot.Device,
    call func() on it.  Works on otData-style classes."""

    if type(self) == ot.Device:
        func(self)

    elif isinstance(self, list):
        for that in self:
            _visit(that, func)

    elif hasattr(self, 'getConverters') and not hasattr(self, 'postRead'):
        for conv in self.getConverters():
            that = getattr(self, conv.name, None)
            if that is not None:
                _visit(that, func)

    elif isinstance(self, ot.ValueRecord):
        for that in self.__dict__.values():
            _visit(that, func)

def _Device_recordVarIdx(self, s):
    """Add VarIdx in this Device table (if any) to the set s."""
    if self.DeltaFormat == 0x8000:
        s.add((self.StartSize << 16) + self.EndSize)

def Object_collect_device_varidxes(self, varidxes):
    adder = partial(_Device_recordVarIdx, s=varidxes)
    _visit(self, adder)

ot.GDEF.collect_device_varidxes = Object_collect_device_varidxes
ot.GPOS.collect_device_varidxes = Object_collect_device_varidxes

def _Device_mapVarIdx(self, mapping, done):
    """Map VarIdx in this Device table (if any) through mapping."""
    if id(self) in done:
        return
    done.add(id(self))
    if self.DeltaFormat == 0x8000:
        varIdx = mapping[(self.StartSize << 16) + self.EndSize]
        self.StartSize = varIdx >> 16
        self.EndSize = varIdx & 0xFFFF

def Object_remap_device_varidxes(self, varidxes_map):
    mapper = partial(_Device_mapVarIdx, mapping=varidxes_map, done=set())
    _visit(self, mapper)

ot.GDEF.remap_device_varidxes = Object_remap_device_varidxes
ot.GPOS.remap_device_varidxes = Object_remap_device_varidxes
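
# A minimal sketch of the collect/remap round-trip (assumes a font whose
# GDEF/GPOS Device tables carry VariationIndex records, DeltaFormat 0x8000):
#
#   varidxes = set()
#   font['GDEF'].table.collect_device_varidxes(varidxes)
#   if 'GPOS' in font:
#       font['GPOS'].table.collect_device_varidxes(varidxes)
#   mapping = font['GDEF'].table.VarStore.subset_varidxes(varidxes)
#   font['GDEF'].table.remap_device_varidxes(mapping)
#   if 'GPOS' in font:
#       font['GPOS'].table.remap_device_varidxes(mapping)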

class _Encoding(object):

    def __init__(self, chars):
        self.chars = chars
        self.width = self._popcount(chars)
        self.overhead = self._characteristic_overhead(chars)
        self.items = set()

    def append(self, row):
        self.items.add(row)

    def extend(self, lst):
        self.items.update(lst)

    def get_room(self):
        """Maximum number of bytes that can be added to characteristic
        while still being beneficial to merge it into another one."""
        count = len(self.items)
        return max(0, (self.overhead - 1) // count - self.width)
    room = property(get_room)

    @property
    def gain(self):
        """Maximum possible byte gain from merging this into another
        characteristic."""
        count = len(self.items)
        return max(0, self.overhead - count * (self.width + 1))

    def sort_key(self):
        return self.width, self.chars

    def __len__(self):
        return len(self.items)

    def can_encode(self, chars):
        return not (chars & ~self.chars)

    def __sub__(self, other):
        return self._popcount(self.chars & ~other.chars)

    @staticmethod
    def _popcount(n):
        # Apparently this is the fastest native way to do it...
        # https://stackoverflow.com/a/9831671
        return bin(n).count('1')

    @staticmethod
    def _characteristic_overhead(chars):
        """Returns overhead in bytes of encoding this characteristic
        as a VarData."""
        c = 6
        while chars:
            if chars & 3:
                c += 2
            chars >>= 2
        return c

    def _find_yourself_best_new_encoding(self, done_by_width):
        self.best_new_encoding = None
        for new_width in range(self.width + 1, self.width + self.room + 1):
            for new_encoding in done_by_width[new_width]:
                if new_encoding.can_encode(self.chars):
                    break
            else:
                new_encoding = None
            self.best_new_encoding = new_encoding


class _EncodingDict(dict):

    def __missing__(self, chars):
        r = self[chars] = _Encoding(chars)
        return r

    def add_row(self, row):
        chars = self._row_characteristics(row)
        self[chars].append(row)

    @staticmethod
    def _row_characteristics(row):
        """Returns encoding characteristics for a row."""
        chars = 0
        i = 1
        for v in row:
            if v:
                chars += i
            if not (-128 <= v <= 127):
                chars += i * 2
            i <<= 2
        return chars
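
# A worked example of the characteristic bits (illustrative only): each
# column occupies two bits; the low bit is set when the column is non-zero,
# and the high bit is also set when the value needs more than a byte.  For
# the row (0, 1, 200): column 0 contributes nothing, column 1 contributes
# 0b01 << 2, and column 2 (200 > 127) contributes 0b11 << 4:
#
#   assert _EncodingDict._row_characteristics((0, 1, 200)) == 0b110100  # 52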

def VarStore_optimize(self):
    """Optimize storage.  Returns mapping from old VarIdxes to new ones."""

    # TODO
    # Check that no two VarRegions are the same; if they are, fold them.

    n = len(self.VarRegionList.Region)  # Number of columns
    zeroes = array('h', [0] * n)

    front_mapping = {}  # Map from old VarIdxes to full row tuples

    encodings = _EncodingDict()

    # Collect all items into a set of full rows (with lots of zeroes).
    for major, data in enumerate(self.VarData):
        regionIndices = data.VarRegionIndex

        for minor, item in enumerate(data.Item):

            row = array('h', zeroes)
            for regionIdx, v in zip(regionIndices, item):
                row[regionIdx] += v
            row = tuple(row)

            encodings.add_row(row)
            front_mapping[(major << 16) + minor] = row

    # Separate encodings that have no gain (are decided) from those having
    # possible gain (possibly to be merged into others).
    encodings = sorted(encodings.values(), key=_Encoding.__len__, reverse=True)
    done_by_width = defaultdict(list)
    todo = []
    for encoding in encodings:
        if not encoding.gain:
            done_by_width[encoding.width].append(encoding)
        else:
            todo.append(encoding)

    # For each encoding that is possibly to be merged, find the best match
    # among the decided encodings, and record that.
    todo.sort(key=_Encoding.get_room)
    for encoding in todo:
        encoding._find_yourself_best_new_encoding(done_by_width)

    # Walk through todo encodings; for each, see if merging it with
    # another todo encoding gains more than each of them merging with
    # their best decided encoding.  If yes, merge them and add the
    # resulting encoding back to the todo queue.  If not, move the
    # encoding to the decided list.  Repeat till done.
    while todo:
        encoding = todo.pop()
        best_idx = None
        best_gain = 0
        for i, other_encoding in enumerate(todo):
            combined_chars = other_encoding.chars | encoding.chars
            combined_width = _Encoding._popcount(combined_chars)
            combined_overhead = _Encoding._characteristic_overhead(combined_chars)
            combined_gain = (
                + encoding.overhead
                + other_encoding.overhead
                - combined_overhead
                - (combined_width - encoding.width) * len(encoding)
                - (combined_width - other_encoding.width) * len(other_encoding)
            )
            this_gain = 0 if encoding.best_new_encoding is None else (
                + encoding.overhead
                - (encoding.best_new_encoding.width - encoding.width) * len(encoding)
            )
            other_gain = 0 if other_encoding.best_new_encoding is None else (
                + other_encoding.overhead
                - (other_encoding.best_new_encoding.width - other_encoding.width) * len(other_encoding)
            )
            separate_gain = this_gain + other_gain

            if combined_gain > separate_gain:
                best_idx = i
                best_gain = combined_gain - separate_gain

        if best_idx is None:
            # Encoding is decided as is.
            done_by_width[encoding.width].append(encoding)
        else:
            other_encoding = todo[best_idx]
            combined_chars = other_encoding.chars | encoding.chars
            combined_encoding = _Encoding(combined_chars)
            combined_encoding.extend(encoding.items)
            combined_encoding.extend(other_encoding.items)
            combined_encoding._find_yourself_best_new_encoding(done_by_width)
            del todo[best_idx]
            todo.append(combined_encoding)

    # Assemble final store.
    back_mapping = {}  # Mapping from full rows to new VarIdxes
    encodings = sum(done_by_width.values(), [])
    encodings.sort(key=_Encoding.sort_key)
    self.VarData = []
    for major, encoding in enumerate(encodings):
        data = ot.VarData()
        self.VarData.append(data)
        data.VarRegionIndex = range(n)
        data.VarRegionCount = len(data.VarRegionIndex)
        data.Item = sorted(encoding.items)
        for minor, item in enumerate(data.Item):
            back_mapping[item] = (major << 16) + minor

    # Compile final mapping.
    varidx_map = {}
    for k, v in front_mapping.items():
        varidx_map[k] = back_mapping[v]

    # Remove unused regions.
    self.prune_regions()

    # Recalculate things and go home.
    self.VarRegionList.RegionCount = len(self.VarRegionList.Region)
    self.VarDataCount = len(self.VarData)
    for data in self.VarData:
        data.ItemCount = len(data.Item)
        data.optimize()

    return varidx_map

ot.VarStore.optimize = VarStore_optimize
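
# A worked example of the merge test above (hypothetical numbers; with
# byte-sized columns only, _characteristic_overhead gives
# overhead == 6 + 2 * width): encoding A has width 2 (overhead 10) and
# 4 items; encoding B has width 3 (overhead 12) and 6 items.  If their
# characteristics overlap in one column, the combined width is 4
# (overhead 14), so:
#
#   combined_gain = 10 + 12 - 14 - (4 - 2) * 4 - (4 - 3) * 6  # == -6
#
# A negative combined_gain can never beat separate_gain (which is >= 0),
# so this pair would be left unmerged.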

def main(args=None):
    """Optimize a font's GDEF variation store"""
    from argparse import ArgumentParser
    from fontTools import configLogger
    from fontTools.ttLib import TTFont
    from fontTools.ttLib.tables.otBase import OTTableWriter

    parser = ArgumentParser(prog='varLib.varStore', description=main.__doc__)
    parser.add_argument('fontfile')
    parser.add_argument('outfile', nargs='?')
    options = parser.parse_args(args)

    # TODO: allow user to configure logging via command-line options
    configLogger(level="INFO")

    fontfile = options.fontfile
    outfile = options.outfile

    font = TTFont(fontfile)
    gdef = font['GDEF']
    store = gdef.table.VarStore

    writer = OTTableWriter()
    store.compile(writer, font)
    size = len(writer.getAllData())
    print("Before: %7d bytes" % size)

    varidx_map = store.optimize()

    gdef.table.remap_device_varidxes(varidx_map)
    if 'GPOS' in font:
        font['GPOS'].table.remap_device_varidxes(varidx_map)

    writer = OTTableWriter()
    store.compile(writer, font)
    size = len(writer.getAllData())
    print("After:  %7d bytes" % size)

    if outfile is not None:
        font.save(outfile)


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        sys.exit(main())
    import doctest
    sys.exit(doctest.testmod().failed)