# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Installs certificate on phone with KitKat."""

import argparse
import logging
import os
import subprocess
import sys

# Key codes passed to `input keyevent` on the device.
KEYCODE_ENTER = '66'
KEYCODE_TAB = '61'


class CertInstallError(Exception):
    """Raised when the certificate is not present after an install."""


class CertRemovalError(Exception):
    """Raised when the certificate is still present after a removal."""


class AdbShellError(subprocess.CalledProcessError):
    """Raised when a command run through `adb shell` exits non-zero."""


# SDK level from which `su 0 <cmd>` is used instead of `su -c <cmd>`.
_ANDROID_M_BUILD_VERSION = 23


class AndroidCertInstaller(object):
    """Certificate installer for phones with KitKat."""

    def __init__(self, device_id, cert_name, cert_path, adb_path=None):
        """Stores the installer configuration.

        Args:
          device_id: device serial passed to `adb -s`; falsy to omit `-s`.
          cert_name: name typed into the GUI install dialog.
          cert_path: path to the certificate file on the host.
          adb_path: adb binary to run; defaults to 'adb'.

        Raises:
          ValueError: if cert_path does not exist.
        """
        if not os.path.exists(cert_path):
            raise ValueError('Not a valid certificate path')
        self.adb_path = adb_path or 'adb'
        self.android_cacerts_path = None
        self.cert_name = cert_name
        self.cert_path = cert_path
        self.device_id = device_id
        self.file_name = os.path.basename(self.cert_path)
        self.reformatted_cert_fname = None
        self.reformatted_cert_path = None

    @staticmethod
    def _run_cmd(cmd, dirname=None):
        """Runs cmd (optionally in dirname) and returns its stdout as text.

        Raises:
          subprocess.CalledProcessError: if the command exits non-zero.
        """
        # universal_newlines=True makes the output a text string on both
        # Python 2 and Python 3; callers split and search it with str values.
        return subprocess.check_output(
            cmd, cwd=dirname, universal_newlines=True)

    def _get_adb_cmd(self, *args):
        """Builds the adb argv, adding `-s <device_id>` when one is set."""
        cmd = [self.adb_path]
        if self.device_id:
            cmd.extend(['-s', self.device_id])
        cmd.extend(args)
        return cmd

    def _adb(self, *args):
        """Runs the adb command."""
        return self._run_cmd(self._get_adb_cmd(*args))

    def _adb_shell(self, *args):
        """Runs the adb shell command.

        Returns:
          The stdout of the command run on the device.

        Raises:
          subprocess.CalledProcessError: if adb itself exits non-zero.
          AdbShellError: if the command run on the device exits non-zero.
          RuntimeError: if the exit-status marker cannot be found in the
            output.
        """
        # We are not using self._adb() because adb shell return 0 even if the
        # command has failed. This method is taking care of checking the
        # actual return code of the command line ran on the device: the
        # remote status is echoed after a marker and parsed back out here.
        return_code_prefix = '%%%s%% ' % __file__
        adb_cmd = self._get_adb_cmd('shell', '(%s); echo %s$?' % (
            subprocess.list2cmdline(args), return_code_prefix))
        process = subprocess.Popen(
            adb_cmd, stdout=subprocess.PIPE, universal_newlines=True)
        adb_stdout, _ = process.communicate()
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                cmd=adb_cmd, returncode=process.returncode, output=adb_stdout)
        # Explicit checks instead of `assert`: asserts are stripped under -O
        # and indexing [-1] would fail with IndexError on empty output.
        if not adb_stdout.endswith('\n'):
            raise RuntimeError(
                'Output of %s does not end with a newline' %
                subprocess.list2cmdline(adb_cmd))
        prefix_pos = adb_stdout.rfind(return_code_prefix)
        if prefix_pos == -1:
            raise RuntimeError(
                'Couldn\'t find "%s" at the end of the output of %s' % (
                    return_code_prefix, subprocess.list2cmdline(adb_cmd)))
        returncode = int(adb_stdout[prefix_pos + len(return_code_prefix):])
        stdout = adb_stdout[:prefix_pos]
        if returncode != 0:
            raise AdbShellError(
                cmd=args, returncode=returncode, output=stdout)
        return stdout

    def _adb_su_shell(self, *args):
        """Runs command as root."""
        build_version_sdk = int(self._get_property('ro.build.version.sdk'))
        if build_version_sdk >= _ANDROID_M_BUILD_VERSION:
            cmd = ['su', '0']
        else:
            cmd = ['su', '-c']
        cmd.extend(args)
        return self._adb_shell(*cmd)

    def _get_property(self, prop):
        """Returns the stripped output of `getprop <prop>` on the device."""
        return self._adb_shell('getprop', prop).strip()

    def check_device(self):
        """Logs warnings when the device is not hammerhead running 4.4.2."""
        install_warning = False
        if self._get_property('ro.product.device') != 'hammerhead':
            logging.warning('Device is not hammerhead')
            install_warning = True
        if self._get_property('ro.build.version.release') != '4.4.2':
            logging.warning('Version is not 4.4.2')
            install_warning = True
        if install_warning:
            logging.warning('Certificate may not install properly')

    def _input_key(self, key):
        """Inputs a keyevent."""
        self._adb_shell('input', 'keyevent', key)

    def _input_text(self, text):
        """Inputs text."""
        self._adb_shell('input', 'text', text)

    @staticmethod
    def _remove(file_name):
        """Deletes file."""
        if os.path.exists(file_name):
            os.remove(file_name)

    def _format_hashed_cert(self):
        """Makes a certificate file that follows the format of cacerts files.

        Writes the result to self.reformatted_cert_path, replacing any
        existing file there.
        """
        self._remove(self.reformatted_cert_path)
        contents = self._run_cmd(['openssl', 'x509', '-inform', 'PEM',
                                  '-text', '-in', self.cert_path])
        # `openssl x509 -text` output is split at the last BEGIN marker and
        # reordered so the PEM block comes first, followed by the text that
        # preceded it.
        description, begin_cert, cert_body = contents.rpartition(
            '-----BEGIN CERTIFICATE')
        contents = ''.join([begin_cert, cert_body, description])
        with open(self.reformatted_cert_path, 'w') as cert_file:
            cert_file.write(contents)

    def _remove_cert_from_cacerts(self):
        """Remounts /system read-write and deletes the installed cert."""
        self._adb_su_shell('mount', '-o', 'remount,rw', '/system')
        self._adb_su_shell('rm', '-f', self.android_cacerts_path)

    def _is_cert_installed(self):
        """Returns True if `ls` on the device finds the cacerts file."""
        try:
            listing = self._adb_su_shell('ls', self.android_cacerts_path)
        except AdbShellError:
            return False
        return listing.strip() == self.android_cacerts_path

    def _generate_reformatted_cert_path(self):
        """Computes the hashed cert file name and the paths derived from it.

        Sets self.reformatted_cert_fname, self.reformatted_cert_path and
        self.android_cacerts_path.

        Raises:
          ValueError: if `openssl version` output has fewer than two fields.
        """
        # Determine OpenSSL version, string is of the form
        # 'OpenSSL 0.9.8za 5 Jun 2014' .
        openssl_version = self._run_cmd(['openssl', 'version']).split()

        if len(openssl_version) < 2:
            raise ValueError(
                'Unexpected OpenSSL version string: %r' % (openssl_version,))

        # subject_hash flag name changed as of OpenSSL version 1.0.0 .
        is_old_openssl_version = openssl_version[1].startswith('0')
        subject_hash_flag = (
            '-subject_hash' if is_old_openssl_version
            else '-subject_hash_old')

        cert_dir = os.path.dirname(self.cert_path)
        output = self._run_cmd(['openssl', 'x509', '-inform', 'PEM',
                                subject_hash_flag, '-in', self.cert_path],
                               cert_dir)
        # The first output line is the hash; the file is named <hash>.0 .
        self.reformatted_cert_fname = output.partition('\n')[0].strip() + '.0'
        self.reformatted_cert_path = os.path.join(
            cert_dir, self.reformatted_cert_fname)
        self.android_cacerts_path = ('/system/etc/security/cacerts/%s' %
                                     self.reformatted_cert_fname)

    def remove_cert(self):
        """Removes the certificate from the device's cacerts directory.

        Raises:
          CertRemovalError: if the cert is still present afterwards.
        """
        self._generate_reformatted_cert_path()

        if self._is_cert_installed():
            self._remove_cert_from_cacerts()

        if self._is_cert_installed():
            raise CertRemovalError('Cert Removal Failed')

    def install_cert(self, overwrite_cert=False):
        """Installs a certificate putting it in /system/etc/security/cacerts.

        Args:
          overwrite_cert: when True, an already installed cert is removed
            and installed again; otherwise the install is skipped.

        Raises:
          CertInstallError: if the cert is not present afterwards.
        """
        self._generate_reformatted_cert_path()

        if self._is_cert_installed():
            if overwrite_cert:
                self._remove_cert_from_cacerts()
            else:
                logging.info('cert is already installed')
                return

        self._format_hashed_cert()
        try:
            self._adb('push', self.reformatted_cert_path, '/sdcard/')
        finally:
            # Always delete the locally generated file, even if push fails.
            self._remove(self.reformatted_cert_path)
        self._adb_su_shell('mount', '-o', 'remount,rw', '/system')
        self._adb_su_shell(
            'cp', '/sdcard/%s' % self.reformatted_cert_fname,
            '/system/etc/security/cacerts/%s' % self.reformatted_cert_fname)
        self._adb_su_shell('chmod', '644', self.android_cacerts_path)
        if not self._is_cert_installed():
            raise CertInstallError('Cert Install Failed')

    def install_cert_using_gui(self):
        """Installs certificate on the device using adb commands."""
        self.check_device()
        # TODO(mruthven): Add a check to see if the certificate is already
        # installed
        # Install the certificate.
        logging.info('Installing %s on %s', self.cert_path, self.device_id)
        self._adb('push', self.cert_path, '/sdcard/')

        # Start credential install intent.
        self._adb_shell(
            'am', 'start', '-W', '-a', 'android.credentials.INSTALL')

        # Move to and click search button.
        self._input_key(KEYCODE_TAB)
        self._input_key(KEYCODE_TAB)
        self._input_key(KEYCODE_ENTER)

        # Search for certificate and click it.
        # Search only works with lower case letters
        self._input_text(self.file_name.lower())
        self._input_key(KEYCODE_ENTER)

        # These coordinates work for hammerhead devices.
        self._adb_shell('input', 'tap', '300', '300')

        # Name the certificate and click enter.
        self._input_text(self.cert_name)
        self._input_key(KEYCODE_TAB)
        self._input_key(KEYCODE_TAB)
        self._input_key(KEYCODE_TAB)
        self._input_key(KEYCODE_ENTER)

        # Remove the file.
        self._adb_shell('rm', '/sdcard/' + self.file_name)


def parse_args():
    """Parses command line arguments."""
    parser = argparse.ArgumentParser(description='Install cert on device.')
    parser.add_argument(
        '-n', '--cert-name', default='dummycert', help='certificate name')
    parser.add_argument(
        '--overwrite', default=False, action='store_true',
        help='Overwrite certificate file if it is already installed')
    parser.add_argument(
        '--remove', default=False, action='store_true',
        help='Remove certificate file if it is installed')
    parser.add_argument(
        '--device-id', help='device serial number')
    parser.add_argument(
        '--adb-path', help='adb binary path')
    parser.add_argument(
        'cert_path', help='Certificate file path')
    return parser.parse_args()


def main():
    """Installs or removes the certificate according to the arguments."""
    args = parse_args()
    cert_installer = AndroidCertInstaller(args.device_id, args.cert_name,
                                          args.cert_path,
                                          adb_path=args.adb_path)
    if args.remove:
        cert_installer.remove_cert()
    else:
        cert_installer.install_cert(args.overwrite)


if __name__ == '__main__':
    sys.exit(main())