1# Copyright (C) 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15r"""Read a MultiCarrierSettings file and update CarrierSettings data. 16 17For APNs in the input file, they are simply appended to the apn list of the 18corresponding carrier in CarrierSettings data. If a new carrier (identified by 19canonical_name) appears in input, the other_carriers.textpb will be updated. 20 21How to run: 22 23update_carrier_data.par \ 24--in_file=/tmp/tmpapns.textpb \ 25--data_dir=/tmp/carrier/data 26""" 27 28from __future__ import absolute_import 29from __future__ import division 30from __future__ import print_function 31import argparse 32import copy 33import os 34import compare 35from google.protobuf import text_format 36import carrier_list_pb2 37import carrier_settings_pb2 38 39parser = argparse.ArgumentParser() 40parser.add_argument( 41 '--data_dir', default='./data', help='Folder path for CarrierSettings data') 42parser.add_argument( 43 '--in_file', default='./tmpapns.textpb', help='Temp APN file') 44FLAGS = parser.parse_args() 45 46TIER1_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'tier1_carriers.textpb') 47OTHER_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'other_carriers.textpb') 48 49 50def equals_apn(a, b): 51 """Tell if two ApnItem proto are the same.""" 52 a = compare.NormalizeRepeatedFields(copy.deepcopy(a)) 53 b = compare.NormalizeRepeatedFields(copy.deepcopy(b)) 54 # ignore 'name' field 55 a.ClearField('name') 56 b.ClearField('name') 57 return compare.Proto2Equals(a, b) 58 59 60def find_apn(apn, apns): 61 """Tell if apn is in apns.""" 62 for a in apns: 63 if equals_apn(apn, a): 64 return True 65 return False 66 67 68def merge_mms_apn(a, b): 69 """Try to merge mmsc fields of b into a, if that's the only diff.""" 70 aa = compare.NormalizeRepeatedFields(copy.deepcopy(a)) 71 bb = compare.NormalizeRepeatedFields(copy.deepcopy(b)) 72 # check if any fields other than mms are different 73 for field in ['name', 'mmsc_proxy', 'mmsc_proxy_port']: 74 aa.ClearField(field) 75 bb.ClearField(field) 76 if compare.Proto2Equals(aa, bb): 77 for field in ['mmsc_proxy', 'mmsc_proxy_port']: 78 if b.HasField(field): 79 setattr(a, field, getattr(b, field)) 80 81 82def clean_apn(setting): 83 """Remove duplicated ApnItems from a CarrierSettings proto. 84 85 Args: 86 setting: a CarrierSettings proto 87 88 Returns: 89 None 90 """ 91 if not setting.HasField('apns') or len(setting.apns.apn) <= 1: 92 return 93 apns = setting.apns.apn[:] 94 cleaned_apns = [a for n, a in enumerate(apns) if not find_apn(a, apns[:n])] 95 del setting.apns.apn[:] 96 setting.apns.apn.extend(cleaned_apns) 97 98 99def merge_apns(dest_apns, source_apns): 100 """Merge source_apns into dest_apns.""" 101 for apn in dest_apns: 102 for source in source_apns: 103 merge_mms_apn(apn, source) 104 ret = list(dest_apns) 105 for source in source_apns: 106 if not find_apn(source, ret): 107 ret.append(source) 108 return ret 109 110 111def to_string(cid): 112 """Return the string representation of a CarrierId.""" 113 ret = cid.mcc_mnc 114 if cid.HasField('spn'): 115 ret += 'SPN=' + cid.spn.upper() 116 if cid.HasField('imsi'): 117 ret += 'IMSI=' + cid.imsi.upper() 118 if cid.HasField('gid1'): 119 ret += 'GID1=' + cid.gid1.upper() 120 return ret 121 122 123def to_carrier_id(cid_string): 124 """Return the CarrierId from its string representation.""" 125 cid = carrier_list_pb2.CarrierId() 126 if 'SPN=' in cid_string: 127 ind = cid_string.find('SPN=') 128 cid.mcc_mnc = cid_string[:ind] 129 cid.spn = cid_string[ind + len('SPN='):] 130 elif 'IMSI=' in cid_string: 131 ind = cid_string.find('IMSI=') 132 cid.mcc_mnc = cid_string[:ind] 133 cid.imsi = cid_string[ind + len('IMSI='):] 134 elif 'GID1=' in cid_string: 135 ind = cid_string.find('GID1=') 136 cid.mcc_mnc = cid_string[:ind] 137 cid.gid1 = cid_string[ind + len('GID1='):] 138 else: 139 cid.mcc_mnc = cid_string 140 return cid 141 142 143def get_input(path): 144 """Read input MultiCarrierSettings textpb file. 145 146 Args: 147 path: the path to input MultiCarrierSettings textpb file 148 149 Returns: 150 A MultiCarrierSettings. None when failed. 151 """ 152 mcs = None 153 with open(path, 'r', encoding='utf-8') as f: 154 mcs = carrier_settings_pb2.MultiCarrierSettings() 155 text_format.Merge(f.read(), mcs) 156 157 return mcs 158 159 160def get_knowncarriers(files): 161 """Create a mapping from mccmnc and possible mvno data to canonical name. 162 163 Args: 164 files: list of paths to carrier list textpb files 165 166 Returns: 167 A dict, key is to_string(carrier_id), value is cname. 168 """ 169 ret = dict() 170 for path in files: 171 with open(path, 'r', encoding='utf-8') as f: 172 carriers = carrier_list_pb2.CarrierList() 173 text_format.Merge(f.read(), carriers) 174 for carriermap in carriers.entry: 175 for cid in carriermap.carrier_id: 176 ret[to_string(cid)] = carriermap.canonical_name 177 178 return ret 179 180 181def clear_apn_fields_in_default_value(carrier_settings): 182 183 def clean(apn): 184 if apn.HasField('bearer_bitmask') and apn.bearer_bitmask == '0': 185 apn.ClearField('bearer_bitmask') 186 return apn 187 188 for apn in carrier_settings.apns.apn: 189 clean(apn) 190 return carrier_settings 191 192 193def merge_carrier_settings(patch, carrier_file): 194 """Merge a CarrierSettings into a base CarrierSettings in textpb file. 195 196 This function merge apns only. It assumes that the patch and base have the 197 same canonical_name. 198 199 Args: 200 patch: the carrier_settings to be merged 201 carrier_file: the path to the base carrier_settings file 202 """ 203 # Load base 204 with open(carrier_file, 'r', encoding='utf-8') as f: 205 base_setting = text_format.ParseLines(f, 206 carrier_settings_pb2.CarrierSettings()) 207 208 clean_apn(patch) 209 clean_apn(base_setting) 210 211 # Merge apns 212 apns = base_setting.apns.apn[:] 213 apns = merge_apns(apns, patch.apns.apn[:]) 214 del base_setting.apns.apn[:] 215 base_setting.apns.apn.extend(apns) 216 217 # Write back 218 with open(carrier_file, 'w', encoding='utf-8') as f: 219 text_format.PrintMessage(base_setting, f, as_utf8=True) 220 221 222def merge_multi_carrier_settings(patch_list, carrier_file): 223 """Merge CarrierSettings into a base MultiCarrierSettings in textpb file. 224 225 This function merge apns only. The base may or may not contains an entry with 226 the same canonical_name as the patch. 227 228 Args: 229 patch_list: a list of CarrierSettings to be merged 230 carrier_file: the path to the base MultiCarrierSettings file 231 """ 232 # Load base 233 with open(carrier_file, 'r', encoding='utf-8') as f: 234 base_settings = text_format.ParseLines( 235 f, carrier_settings_pb2.MultiCarrierSettings()) 236 237 for patch in patch_list: 238 clean_apn(patch) 239 # find the (first and only) entry with patch.canonical_name and update it. 240 for setting in base_settings.setting: 241 if setting.canonical_name == patch.canonical_name: 242 clean_apn(setting) 243 apns = setting.apns.apn[:] 244 apns = merge_apns(apns, patch.apns.apn[:]) 245 del setting.apns.apn[:] 246 setting.apns.apn.extend(apns) 247 break 248 # Or if no match, append it to base_settings 249 else: 250 base_settings.setting.extend([patch]) 251 252 # Write back 253 with open(carrier_file, 'w', encoding='utf-8') as f: 254 text_format.PrintMessage(base_settings, f, as_utf8=True) 255 256 257def add_new_carriers(cnames, carrier_list_file): 258 """Add a new carrier into a CarrierList in textpb file. 259 260 The carrier_id of the new carrier is induced from the cname, assuming 261 that the cname is constructed by to_string. 262 263 Args: 264 cnames: a list of canonical_name of new carriers 265 carrier_list_file: the path to the CarrierList textpb file 266 267 Returns: 268 None 269 """ 270 with open(carrier_list_file, 'r', encoding='utf-8') as f: 271 carriers = text_format.ParseLines(f, carrier_list_pb2.CarrierList()) 272 273 for cname in cnames: 274 # Append the new carrier 275 new_carrier = carriers.entry.add() 276 new_carrier.canonical_name = cname 277 new_carrier.carrier_id.extend([to_carrier_id(cname)]) 278 279 tmp = sorted(carriers.entry, key=lambda c: c.canonical_name) 280 del carriers.entry[:] 281 carriers.entry.extend(tmp) 282 283 with open(carrier_list_file, 'w', encoding='utf-8') as f: 284 text_format.PrintMessage(carriers, f, as_utf8=True) 285 286 287def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers): 288 """Add APNs for carriers in others.textpb that doesn't have APNs, by mccmnc. 289 290 If a carrier defined as mccmnc + mvno_data doesn't hava APNs, it should use 291 APNs from the carrier defined as mccmnc only. 292 293 Modifies others.textpb file in-place. 294 295 If a carriersettingstool.no_apn_for_mvno_bool is defined as true for a MVNO, 296 the APNs from the corresponding MNO(by MCC/MNC) will not be used. 297 298 Args: 299 apns: a list of CarrierSettings message with APNs only. 300 tier1_carriers: parsed tier-1 carriers list; must not contain new carriers. 301 A dict, key is to_string(carrier_id), value is cname. 302 other_carriers: parsed other carriers list; must not contain new carriers. A 303 dict, key is to_string(carrier_id), value is cname. 304 """ 305 # Convert apns from a list to a map, key being the canonical_name 306 apns_dict = { 307 carrier_settings.canonical_name: carrier_settings 308 for carrier_settings in apns 309 } 310 311 others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir 312 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 313 others = text_format.ParseLines(others_textpb_file, 314 carrier_settings_pb2.MultiCarrierSettings()) 315 316 for setting in others.setting: 317 if not setting.HasField('apns'): 318 carrier_id = to_carrier_id(setting.canonical_name) 319 if carrier_id.HasField('mvno_data'): 320 # in case we don't need MNO APN for this MVNO 321 skip_mno_apn = False 322 if setting.HasField('configs'): 323 for conf in setting.configs.config: 324 if conf.key == 'carriersettingstool.no_apn_for_mvno_bool': 325 skip_mno_apn = conf.bool_value 326 break 327 if skip_mno_apn: 328 continue 329 carrier_id.ClearField('mvno_data') 330 carrier_id_str_of_mccmnc = to_string(carrier_id) 331 cname_of_mccmnc = tier1_carriers.get( 332 carrier_id_str_of_mccmnc) or other_carriers.get( 333 carrier_id_str_of_mccmnc) 334 if cname_of_mccmnc: 335 apn = apns_dict.get(cname_of_mccmnc) 336 if apn: 337 setting.apns.CopyFrom(apn.apns) 338 339 sanitise_carrier_config(others.setting) 340 341 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 342 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 343 344def sanitise_carrier_config(setting): 345 """Remove temparary carrier config items that's only used for conversion tool""" 346 for carrier_setting in setting: 347 if carrier_setting.HasField('configs'): 348 configs = carrier_setting.configs.config[:] 349 del carrier_setting.configs.config[:] 350 for config in configs: 351 if not config.key.startswith('carriersettingstool.'): 352 carrier_setting.configs.config.append(config) 353 354def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers): 355 """Add carrier configs for new (non-tier-1) carriers. 356 357 For new carriers, ie. carriers existing in APN but not CarrierConfig: 358 - for <mccmnc>: copy carrier config of <mcc>. 359 - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>, 360 or <mcc>. 361 362 Modifies others.textpb file in-place. 363 364 Args: 365 cnames: a list of canonical_name of new carriers. 366 tier1_carriers: parsed tier-1 carriers list; must not contain new carriers. 367 A dict, key is to_string(carrier_id), value is cname. 368 other_carriers: parsed other carriers list; must not contain new carriers. A 369 dict, key is to_string(carrier_id), value is cname. 370 """ 371 carrier_configs_map = {} 372 373 others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir 374 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 375 others = text_format.ParseLines(others_textpb_file, 376 carrier_settings_pb2.MultiCarrierSettings()) 377 for setting in others.setting: 378 if setting.canonical_name in other_carriers: 379 carrier_configs_map[setting.canonical_name] = setting.configs 380 for cid_str, cname in tier1_carriers.items(): 381 tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname) 382 with open(tier1_textpb, 'r', encoding='utf-8') as tier1_textpb_file: 383 tier1 = text_format.ParseLines(tier1_textpb_file, 384 carrier_settings_pb2.CarrierSettings()) 385 carrier_configs_map[cid_str] = tier1.configs 386 387 for setting in others.setting: 388 if setting.canonical_name in cnames: 389 carrier_id = to_carrier_id(setting.canonical_name) 390 mccmnc = carrier_id.mcc_mnc 391 mcc = mccmnc[:3] 392 if mccmnc in carrier_configs_map: 393 setting.configs.config.extend(carrier_configs_map[mccmnc].config[:]) 394 elif mcc in carrier_configs_map: 395 setting.configs.config.extend(carrier_configs_map[mcc].config[:]) 396 397 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 398 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 399 400 401def cleanup_mcc_only_carriers(): 402 """Removes mcc-only carriers from other_carriers.textpb & others.textpb. 403 404 Modifies other_carriers.textpb file & others.textpb file in-place. 405 """ 406 mcc_only_carriers = set() 407 408 with open( 409 OTHER_CARRIERS_TEXTPB, 'r', 410 encoding='utf-8') as other_carriers_textpb_file: 411 other_carriers = text_format.ParseLines(other_carriers_textpb_file, 412 carrier_list_pb2.CarrierList()) 413 414 other_carriers_entry_with_mccmnc = [] 415 for carrier in other_carriers.entry: 416 for carrier_id in carrier.carrier_id: 417 if len(carrier_id.mcc_mnc) == 3: 418 mcc_only_carriers.add(carrier.canonical_name) 419 else: 420 other_carriers_entry_with_mccmnc.append(carrier) 421 del other_carriers.entry[:] 422 other_carriers.entry.extend(other_carriers_entry_with_mccmnc) 423 424 # Finish early if no mcc_only_carriers; that means no file modification 425 # required. 426 if not mcc_only_carriers: 427 return 428 429 with open( 430 OTHER_CARRIERS_TEXTPB, 'w', 431 encoding='utf-8') as other_carriers_textpb_file: 432 text_format.PrintMessage( 433 other_carriers, other_carriers_textpb_file, as_utf8=True) 434 435 others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb') 436 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 437 others = text_format.ParseLines(others_textpb_file, 438 carrier_settings_pb2.MultiCarrierSettings()) 439 copy_others_setting = others.setting[:] 440 del others.setting[:] 441 others.setting.extend([ 442 setting for setting in copy_others_setting 443 if setting.canonical_name not in mcc_only_carriers 444 ]) 445 446 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 447 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 448 449 450def main(): 451 apns = get_input(FLAGS.in_file).setting 452 tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB]) 453 other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB]) 454 new_carriers = [] 455 456 # Step 1a: merge APNs into CarrierConfigs by canonical name. 457 # Also find out "new carriers" existing in APNs but not in CarrierConfigs. 458 other_carriers_patch = [] 459 for carrier_settings in apns: 460 carrier_settings = clear_apn_fields_in_default_value(carrier_settings) 461 462 cname = carrier_settings.canonical_name 463 if cname in tier1_carriers.values(): 464 merge_carrier_settings(carrier_settings, 465 '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)) 466 else: 467 other_carriers_patch.append(carrier_settings) 468 if cname not in other_carriers.values(): 469 new_carriers.append(cname) 470 471 merge_multi_carrier_settings(other_carriers_patch, 472 '%s/setting/others.textpb' % FLAGS.data_dir) 473 474 # Step 1b: populate carrier configs for new carriers. 475 add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers, 476 other_carriers) 477 478 # Step 2: merge new carriers into non-tier1 carrier list. 479 add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB) 480 # Update other_carriers map 481 other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB]) 482 483 # Step 3: merge APNs into CarrierConfigs by mccmnc: for a carrier defined 484 # as mccmnc + gid/spn/imsi, if it doesn't have any APNs, it should use APNs 485 # from carrier defined as mccmnc only. 486 # Only handle non-tier1 carriers, as tier1 carriers are assumed to be better 487 # maintained and are already having APNs defined. 488 add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers) 489 490 # Step 4: clean up mcc-only carriers; they're used in step 3 but should not 491 # be in final carrier settings to avoid confusing CarrierSettings app. 492 cleanup_mcc_only_carriers() 493 494 495if __name__ == '__main__': 496 main() 497