#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Compare_contacts accepts 2 vcf files, extracts full name, email, and
telephone numbers from each and reports how many unique cards it finds across
the two files.

The module also contains helpers to generate random VCF files, to move VCF
files to and from a device, and to compare call logs between two devices.
"""

from mmap import ACCESS_READ
from mmap import mmap
import logging
import queue
import random
import re
import string
import time

from acts.utils import exe_cmd

# CallLog types
INCOMMING_CALL_TYPE = "1"
OUTGOING_CALL_TYPE = "2"
MISSED_CALL_TYPE = "3"

# Callback strings.
CONTACTS_CHANGED_CALLBACK = "ContactsChanged"
CALL_LOG_CHANGED = "CallLogChanged"
CONTACTS_ERASED_CALLBACK = "ContactsErased"

# URI for contacts database on Nexus.
CONTACTS_URI = "content://com.android.contacts/data/phones"

# Path for temporary file storage on device.
STORAGE_PATH = "/storage/emulated/0/Download/"

# Timeout handed to pop_event() while waiting for contact/call log updates.
PBAP_SYNC_TIME = 30

log = logging

# Patterns are compiled once and shared by the parsing helpers below. They
# are bytes patterns because the VCF files are scanned as raw bytes.
# DOTALL lets the lazy group span the lines between BEGIN and END without
# nesting quantifiers.
_VCARD_REGEX = re.compile(b"^BEGIN:VCARD(.*?)END:VCARD",
                          re.MULTILINE | re.DOTALL)
_FULLNAME_REGEX = re.compile(b"^FN:(.*)", re.MULTILINE)
_EMAIL_REGEX = re.compile(b"^EMAIL;PREF:(.*)", re.MULTILINE)
_TEL_REGEX = re.compile(b"^TEL;(.*):(.*)", re.MULTILINE)


def _findall_in_file(pattern, file_path):
    """Return pattern.findall() over the raw bytes of file_path.

    The file is memory mapped read-only for the duration of the search and
    both the map and the file are closed before returning.

    Args:
        pattern: A compiled bytes regular expression.
        file_path: Path of the file to scan.

    Returns:
        The list produced by pattern.findall(); an empty list for an empty
        file.
    """
    with open(file_path, "rb") as vcf_file:
        # mmap refuses to map a zero-length file, so seek to the end to learn
        # the size and report "no matches" for an empty file.
        if vcf_file.seek(0, 2) == 0:
            return []
        with mmap(vcf_file.fileno(), length=0,
                  access=ACCESS_READ) as vcf_map:
            return pattern.findall(vcf_map)


def parse_contacts(file_name):
    """Read vcf file and generate a list of contacts.

    For every BEGIN:VCARD/END:VCARD section the first full name (FN), the
    first preferred email (EMAIL;PREF) and all phone numbers (TEL) are
    extracted. Every card found is added to the result, even when it has no
    full name. Name, email and phone type are stored as bytes, exactly as
    matched in the file.

    Args:
        file_name: Path of the VCF file to parse.

    Returns:
        A list of VCard objects in file order.
    """
    contacts = []
    for vcard_body in _findall_in_file(_VCARD_REGEX, file_name):
        new_contact = VCard()

        fullname = _FULLNAME_REGEX.search(vcard_body)
        if fullname is not None:
            new_contact.name = fullname.group(1)

        email = _EMAIL_REGEX.search(vcard_body)
        if email is not None:
            new_contact.email = email.group(1)

        for phone_type, phone_number in _TEL_REGEX.findall(vcard_body):
            new_contact.add_phone_number(
                PhoneNumber(phone_type, phone_number))

        contacts.append(new_contact)

    return contacts


def phone_number_count(destination_path, file_name):
    """Counts number of phone numbers in a VCF.

    Args:
        destination_path: Directory prefix; it is concatenated with file_name
            as-is, so it must already end with a path separator.
        file_name: Name of the VCF file.

    Returns:
        The number of TEL lines found in the file.
    """
    vcf_path = "{}{}".format(destination_path, file_name)
    return len(_findall_in_file(_TEL_REGEX, vcf_path))


def count_contacts_with_differences(destination_path,
                                    pce_contacts_vcf_file_name,
                                    pse_contacts_vcf_file_name):
    """Compare two contact files and report the number of differences.

    Difference count is returned, and the differences are logged, this is
    order independent.

    Args:
        destination_path: Directory prefix concatenated as-is with both file
            names.
        pce_contacts_vcf_file_name: Name of the PCE VCF file.
        pse_contacts_vcf_file_name: Name of the PSE VCF file.

    Returns:
        The number of contacts that appear in only one of the two files.
    """
    pce_contacts = parse_contacts("{}{}".format(destination_path,
                                                pce_contacts_vcf_file_name))
    pse_contacts = parse_contacts("{}{}".format(destination_path,
                                                pse_contacts_vcf_file_name))

    # Build each set once; they are needed for both the difference and the
    # intersection.
    pce_set = set(pce_contacts)
    pse_set = set(pse_contacts)

    differences = pce_set.symmetric_difference(pse_set)
    if not differences:
        log.info("All {} contacts in the phonebooks match".format(
            str(len(pce_contacts))))
    else:
        log.info("{} contacts match, but ".format(
            str(len(pce_set.intersection(pse_set)))))
        log.info("the following {} entries don't match:".format(
            str(len(differences))))
        for current_vcard in differences:
            log.info(current_vcard)
    return len(differences)


class PhoneNumber(object):
    """Simple class for maintaining a phone number entry and type with only the
    digits.
    """

    def __init__(self, phone_type, phone_number):
        """Store the type unchanged and the number reduced to its digits.

        Args:
            phone_type: Type label of the number (for example "CELL"); kept
                exactly as given.
            phone_number: The number as str, bytes, or any object whose str()
                contains the digits.
        """
        self.phone_type = phone_type
        # Decode bytes before stripping: str() of a bytes object is its repr,
        # and the escape sequences of non-ASCII bytes (e.g. \xa0) would leak
        # extra digits into the number.
        if isinstance(phone_number, (bytes, bytearray)):
            phone_number = phone_number.decode("utf-8", errors="ignore")
        # remove non digits from phone_number
        self.phone_number = re.sub(r"\D", "", str(phone_number))

    def __eq__(self, other):
        if not isinstance(other, PhoneNumber):
            return NotImplemented
        return (self.phone_type == other.phone_type and
                self.phone_number == other.phone_number)

    def __hash__(self):
        return hash((self.phone_type, self.phone_number))


class VCard(object):
    """Contains name, email, and phone numbers.

    Two cards are equal when name, email, the presence of a photo and the set
    of phone numbers all match; first_name and last_name do not take part in
    the comparison.
    """

    def __init__(self):
        self.name = None
        self.first_name = None
        self.last_name = None
        self.email = None
        self.phone_numbers = []
        self.photo = None

    def _comparison_key(self):
        """Return the tuple of fields that defines equality and hashing."""
        # Phone numbers are compared as a set so their order is irrelevant.
        return (self.name, self.email, self.photo is None,
                frozenset(self.phone_numbers))

    def __lt__(self, other):
        return self.name < other.name

    def __hash__(self):
        return hash(self._comparison_key())

    def __eq__(self, other):
        # Compare the fields themselves; comparing hashes alone would treat
        # unrelated cards with colliding hashes as equal.
        if not isinstance(other, VCard):
            return NotImplemented
        return self._comparison_key() == other._comparison_key()

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        """Render the card as a VERSION:2.1 vCard text block."""
        vcard_strings = ["BEGIN:VCARD\n", "VERSION:2.1\n"]

        if self.first_name or self.last_name:
            vcard_strings.append("N:{};{};;;\nFN:{} {}\n".format(
                self.last_name, self.first_name, self.first_name,
                self.last_name))
        elif self.name:
            vcard_strings.append("FN:{}\n".format(self.name))

        for phone in self.phone_numbers:
            vcard_strings.append("TEL;{}:{}\n".format(
                str(phone.phone_type), phone.phone_number))

        if self.email:
            vcard_strings.append("EMAIL;PREF:{}\n".format(self.email))

        vcard_strings.append("END:VCARD\n")
        return "".join(vcard_strings)

    def add_phone_number(self, phone_number):
        """Append phone_number unless an equal entry is already stored."""
        if phone_number not in self.phone_numbers:
            self.phone_numbers.append(phone_number)


def generate_random_phone_number():
    """Generate a random phone number/type

    Returns:
        A PhoneNumber of type "CELL" built from a random zero-padded 10 digit
        number.
    """
    return PhoneNumber("CELL",
                       "+{0:010d}".format(random.randint(0, 9999999999)))


def generate_random_string(length=8,
                           charset="{}{}{}".format(string.digits,
                                                   string.ascii_letters,
                                                   string.punctuation)):
    """Generate a random string of specified length from the characterset

    Args:
        length: Number of characters to generate.
        charset: Characters to choose from; any ";" is removed before use.

    Returns:
        The generated string.
    """
    # Remove ; since that would make 2 words.
    charset = charset.replace(";", "")
    return "".join(random.choice(charset) for _ in range(length))


def generate_contact_list(destination_path,
                          file_name,
                          contact_count,
                          phone_number_count=1):
    """Generate a simple VCF file for count contacts with basic content.

    Names, email and phone numbers are random. The layout of each card, shown
    here with readable values and phone_number_count = 2, is:

    BEGIN:VCARD
    VERSION:2.1
    N:Person;1;;;
    FN:1 Person
    TEL;CELL:+1-555-555-1234
    TEL;CELL:+1-555-555-4321
    EMAIL;PREF:person1@gmail.com
    END:VCARD

    Args:
        destination_path: Directory prefix concatenated as-is with file_name.
        file_name: Name of the VCF file to create.
        contact_count: Number of contacts to generate.
        phone_number_count: Number of random phone numbers generated per
            contact.
    """
    vcards = []
    for _ in range(contact_count):
        current_contact = VCard()
        current_contact.first_name = generate_random_string(
            random.randint(1, 19))
        current_contact.last_name = generate_random_string(
            random.randint(1, 19))
        current_contact.email = "{}{}@{}.{}".format(
            current_contact.last_name, current_contact.first_name,
            generate_random_string(random.randint(1, 19)),
            generate_random_string(random.randint(1, 4)))
        for _ in range(phone_number_count):
            current_contact.add_phone_number(generate_random_phone_number())
        vcards.append(current_contact)
    create_new_contacts_vcf_from_vcards(destination_path, file_name, vcards)


def create_new_contacts_vcf_from_vcards(destination_path, vcf_file_name,
                                        vcards):
    """Create a new file with filename and write every card to it.

    Any existing file with the same name is overwritten.

    Args:
        destination_path: Directory prefix concatenated as-is with
            vcf_file_name.
        vcf_file_name: Name of the VCF file to create.
        vcards: Iterable of objects whose str() is their vCard text.
    """
    vcf_path = "{}{}".format(destination_path, vcf_file_name)
    # The with block closes the file even if rendering a card raises.
    with open(vcf_path, "w") as contact_file:
        for card in vcards:
            contact_file.write(str(card))


def get_contact_count(device):
    """Returns the number of name:phone number pairs.

    Args:
        device: Device object exposing droid.contactsQueryContent().

    Returns:
        The length of the result of querying CONTACTS_URI for display_name
        and data1.
    """
    contact_list = device.droid.contactsQueryContent(
        CONTACTS_URI, ["display_name", "data1"], "", [], "display_name")
    return len(contact_list)


def import_device_contacts_from_vcf(device, destination_path, vcf_file,
                                    timeout=10):
    """Uploads and import vcf file to device.

    Args:
        device: Device object exposing log, adb, droid and
            get_my_current_focus_window().
        destination_path: Local directory prefix concatenated as-is with
            vcf_file.
        vcf_file: Name of the VCF file to push and import.
        timeout: Seconds to keep polling for the import dialog.

    Returns:
        The number of phone numbers in the file when the device reports that
        many contacts, otherwise 0.
    """
    number_count = phone_number_count(destination_path, vcf_file)
    device.log.info("Trying to add {} phone numbers.".format(number_count))
    local_phonebook_path = "{}{}".format(destination_path, vcf_file)
    phone_phonebook_path = "{}{}".format(STORAGE_PATH, vcf_file)
    device.adb.push("{} {}".format(local_phonebook_path, phone_phonebook_path))
    device.droid.importVcf("file://{}{}".format(STORAGE_PATH, vcf_file))
    start_time = time.time()
    while time.time() < start_time + timeout:
        #TODO: use unattended way to bypass contact import module instead of keyevent
        if "ImportVCardActivity" in device.get_my_current_focus_window():
            # keyevent to allow contacts import from vcf file
            for key in ["DPAD_RIGHT", "DPAD_RIGHT", "ENTER"]:
                device.adb.shell("input keyevent KEYCODE_{}".format(key))
            break
        time.sleep(1)
    if wait_for_phone_number_update_complete(device, number_count):
        return number_count
    return 0


def export_device_contacts_to_vcf(device, destination_path, vcf_file):
    """Export and download vcf file from device.

    Args:
        device: Device object exposing droid.exportVcf() and adb.pull().
        destination_path: Local destination handed to adb pull.
        vcf_file: Name of the file created under STORAGE_PATH on the device.

    Returns:
        Always True.
    """
    path_on_phone = "{}{}".format(STORAGE_PATH, vcf_file)
    device.droid.exportVcf("{}".format(path_on_phone))
    # Download the exported file; the copy on the device is left in place.
    device.adb.pull("{} {}".format(path_on_phone, destination_path))
    return True


def delete_vcf_files(device):
    """Deletes every file under STORAGE_PATH whose name contains ".vcf".

    Args:
        device: Device object exposing adb.shell().
    """
    files = device.adb.shell("ls {}".format(STORAGE_PATH))
    for file_name in files.split():
        if ".vcf" in file_name:
            device.adb.shell("rm -f {}{}".format(STORAGE_PATH, file_name))


def erase_contacts(device):
    """Erase all contacts out of devices contact database.

    Nothing is erased when the device already reports zero contacts.

    Args:
        device: Device object exposing log, droid and ed.

    Returns:
        False if no CONTACTS_ERASED_CALLBACK event is received before the
        PBAP_SYNC_TIME timeout, True otherwise.
    """
    device.log.info("Erasing contacts.")
    if get_contact_count(device) > 0:
        device.droid.contactsEraseAll()
        try:
            device.ed.pop_event(CONTACTS_ERASED_CALLBACK, PBAP_SYNC_TIME)
        except queue.Empty:
            log.error("Phone book not empty.")
            return False
    return True


def wait_for_phone_number_update_complete(device, expected_count):
    """Check phone_number count on device and wait for updates until it has the
    expected number of phone numbers in its contact database.

    Args:
        device: Device object exposing log, droid and ed.
        expected_count: Number of contacts the device is expected to report.

    Returns:
        False if waiting for a CONTACTS_CHANGED_CALLBACK event raised
        queue.Empty, True otherwise.
    """
    update_completed = True
    try:
        # Re-check the count after every change event; pop_event raising
        # queue.Empty is what ends an unsuccessful wait.
        while (expected_count != get_contact_count(device) and
               device.ed.pop_event(CONTACTS_CHANGED_CALLBACK, PBAP_SYNC_TIME)):
            pass
    except queue.Empty:
        log.error("Contacts failed to update.")
        update_completed = False
    device.log.info("Found {} out of the expected {} contacts.".format(
        get_contact_count(device), expected_count))
    return update_completed


def wait_for_call_log_update_complete(device, expected_count):
    """Check call log count on device and wait for updates until it has the
    expected number of calls in its call log database.

    Args:
        device: Device object exposing log, droid and ed.
        expected_count: Number of call log entries the device is expected to
            report.

    Returns:
        False if waiting for a CALL_LOG_CHANGED event raised queue.Empty,
        True otherwise.
    """
    update_completed = True
    try:
        while (expected_count != device.droid.callLogGetCount() and
               device.ed.pop_event(CALL_LOG_CHANGED, PBAP_SYNC_TIME)):
            pass
    except queue.Empty:
        log.error("Call Log failed to update.")
        update_completed = False
    device.log.info("Found {} out of the expected {} call logs.".format(
        device.droid.callLogGetCount(), expected_count))
    # Report the outcome like wait_for_phone_number_update_complete does.
    return update_completed


def add_call_log(device, call_log_type, phone_number, call_time):
    """Add call number and time to specified log.

    Args:
        device: Device object exposing droid.callLogsPut().
        call_log_type: Call log type; stored as a string.
        phone_number: Number stored unchanged under "number".
        call_time: Call time; stored as a string under "time".
    """
    new_call_log = {
        "type": str(call_log_type),
        "number": phone_number,
        "time": str(call_time),
    }
    device.droid.callLogsPut(new_call_log)


def get_and_compare_call_logs(pse, pce, call_log_type):
    """Gather and compare call logs from PSE and PCE for the specified type.

    Args:
        pse: PSE device object exposing droid.callLogsGet().
        pce: PCE device object exposing droid.callLogsGet().
        call_log_type: Call log type passed to callLogsGet() on both devices.

    Returns:
        The result of compare_call_logs() for the two logs.
    """
    pse_call_log = pse.droid.callLogsGet(call_log_type)
    pce_call_log = pce.droid.callLogsGet(call_log_type)
    return compare_call_logs(pse_call_log, pce_call_log)


def normalize_phonenumber(phone_number):
    """Remove all non-digits from phone_number

    Args:
        phone_number: Phone number as a str.

    Returns:
        The digits of phone_number as a str.
    """
    return re.sub(r"\D", "", phone_number)


def compare_call_logs(pse_call_log, pce_call_log):
    """Compare two call logs entry by entry.

    Entries are compared pairwise in order on their normalized "number",
    their "type" and their "date" truncated to the second. Both logs are
    written to the log when they do not match.

    Args:
        pse_call_log: Sequence of call log dicts from the PSE.
        pce_call_log: Sequence of call log dicts from the PCE.

    Returns:
        True if both logs have the same length and every pair of entries
        matches, False otherwise.
    """
    call_logs_match = True
    if len(pse_call_log) == len(pce_call_log):
        for pse_entry, pce_entry in zip(pse_call_log, pce_call_log):
            # Compare the phone number
            if (normalize_phonenumber(pse_entry["number"]) !=
                    normalize_phonenumber(pce_entry["number"])):
                log.warning("Call Log numbers differ")
                call_logs_match = False

            # Compare which log it was taken from (Incoming, Outgoing,
            # Missed)
            if pse_entry["type"] != pce_entry["type"]:
                log.warning("Call Log types differ")
                call_logs_match = False

            # Compare time to truncated second.
            if (int(pse_entry["date"]) // 1000 !=
                    int(pce_entry["date"]) // 1000):
                log.warning("Call log times don't match, check timezone.")
                call_logs_match = False

    else:
        log.warning("Call Log lengths differ {}:{}".format(
            len(pse_call_log), len(pce_call_log)))
        call_logs_match = False

    if not call_logs_match:
        log.info("PSE Call Log:")
        log.info(pse_call_log)
        log.info("PCE Call Log:")
        log.info(pce_call_log)

    return call_logs_match