• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Installs certificate on phone with KitKat."""
16
17import argparse
18import logging
19import os
20import subprocess
21import sys
22
23KEYCODE_ENTER = '66'
24KEYCODE_TAB = '61'
25
26
27class CertInstallError(Exception):
28  pass
29
30
31class CertRemovalError(Exception):
32  pass
33
34
35class AdbShellError(subprocess.CalledProcessError):
36  pass
37
38
39_ANDROID_M_BUILD_VERSION = 23
40
41
42class AndroidCertInstaller(object):
43  """Certificate installer for phones with KitKat."""
44
45  def __init__(self, device_id, cert_name, cert_path, adb_path=None):
46    if not os.path.exists(cert_path):
47      raise ValueError('Not a valid certificate path')
48    self.adb_path = adb_path or 'adb'
49    self.android_cacerts_path = None
50    self.cert_name = cert_name
51    self.cert_path = cert_path
52    self.device_id = device_id
53    self.file_name = os.path.basename(self.cert_path)
54    self.reformatted_cert_fname = None
55    self.reformatted_cert_path = None
56
57  @staticmethod
58  def _run_cmd(cmd, dirname=None):
59    return subprocess.check_output(cmd, cwd=dirname)
60
61  def _get_adb_cmd(self, *args):
62    cmd = [self.adb_path]
63    if self.device_id:
64      cmd.extend(['-s', self.device_id])
65    cmd.extend(args)
66    return cmd
67
68  def _adb(self, *args):
69    """Runs the adb command."""
70    return self._run_cmd(self._get_adb_cmd(*args))
71
72  def _adb_shell(self, *args):
73    """Runs the adb shell command."""
74    # We are not using self._adb() because adb shell return 0 even if the
75    # command has failed. This method is taking care of checking the actual
76    # return code of the command line ran on the device.
77    RETURN_CODE_PREFIX = '%%%s%% ' % __file__
78    adb_cmd = self._get_adb_cmd('shell', '(%s); echo %s$?' % (
79        subprocess.list2cmdline(args), RETURN_CODE_PREFIX))
80    process = subprocess.Popen(adb_cmd, stdout=subprocess.PIPE)
81    adb_stdout, _ = process.communicate()
82    if process.returncode != 0:
83      raise subprocess.CalledProcessError(
84          cmd=adb_cmd, returncode=process.returncode, output=adb_stdout)
85    assert adb_stdout[-1] == '\n'
86    prefix_pos = adb_stdout.rfind(RETURN_CODE_PREFIX)
87    assert prefix_pos != -1, \
88        'Couldn\'t find "%s" at the end of the output of %s' % (
89            RETURN_CODE_PREFIX, subprocess.list2cmdline(adb_cmd))
90    returncode = int(adb_stdout[prefix_pos + len(RETURN_CODE_PREFIX):])
91    stdout = adb_stdout[:prefix_pos]
92    if returncode != 0:
93      raise AdbShellError(cmd=args, returncode=returncode, output=stdout)
94    return stdout
95
96  def _adb_su_shell(self, *args):
97    """Runs command as root."""
98    build_version_sdk = int(self._get_property('ro.build.version.sdk'))
99    if build_version_sdk >= _ANDROID_M_BUILD_VERSION:
100      cmd = ['su', '0']
101    else:
102      cmd = ['su', '-c']
103    cmd.extend(args)
104    return self._adb_shell(*cmd)
105
106  def _get_property(self, prop):
107    return self._adb_shell('getprop', prop).strip()
108
109  def check_device(self):
110    install_warning = False
111    if self._get_property('ro.product.device') != 'hammerhead':
112      logging.warning('Device is not hammerhead')
113      install_warning = True
114    if self._get_property('ro.build.version.release') != '4.4.2':
115      logging.warning('Version is not 4.4.2')
116      install_warning = True
117    if install_warning:
118      logging.warning('Certificate may not install properly')
119
120  def _input_key(self, key):
121    """Inputs a keyevent."""
122    self._adb_shell('input', 'keyevent', key)
123
124  def _input_text(self, text):
125    """Inputs text."""
126    self._adb_shell('input', 'text', text)
127
128  @staticmethod
129  def _remove(file_name):
130    """Deletes file."""
131    if os.path.exists(file_name):
132      os.remove(file_name)
133
134  def _format_hashed_cert(self):
135    """Makes a certificate file that follows the format of files in cacerts."""
136    self._remove(self.reformatted_cert_path)
137    contents = self._run_cmd(['openssl', 'x509', '-inform', 'PEM', '-text',
138                              '-in', self.cert_path])
139    description, begin_cert, cert_body = contents.rpartition('-----BEGIN '
140                                                             'CERTIFICATE')
141    contents = ''.join([begin_cert, cert_body, description])
142    with open(self.reformatted_cert_path, 'w') as cert_file:
143      cert_file.write(contents)
144
145  def _remove_cert_from_cacerts(self):
146    self._adb_su_shell('mount', '-o', 'remount,rw', '/system')
147    self._adb_su_shell('rm', '-f', self.android_cacerts_path)
148
149  def _is_cert_installed(self):
150    try:
151      return (self._adb_su_shell('ls', self.android_cacerts_path).strip() ==
152              self.android_cacerts_path)
153    except AdbShellError:
154      return False
155
156  def _generate_reformatted_cert_path(self):
157    # Determine OpenSSL version, string is of the form
158    # 'OpenSSL 0.9.8za 5 Jun 2014' .
159    openssl_version = self._run_cmd(['openssl', 'version']).split()
160
161    if len(openssl_version) < 2:
162      raise ValueError('Unexpected OpenSSL version string: ', openssl_version)
163
164    # subject_hash flag name changed as of OpenSSL version 1.0.0 .
165    is_old_openssl_version = openssl_version[1].startswith('0')
166    subject_hash_flag = (
167        '-subject_hash' if is_old_openssl_version else '-subject_hash_old')
168
169    output = self._run_cmd(['openssl', 'x509', '-inform', 'PEM',
170                            subject_hash_flag, '-in', self.cert_path],
171                           os.path.dirname(self.cert_path))
172    self.reformatted_cert_fname = output.partition('\n')[0].strip() + '.0'
173    self.reformatted_cert_path = os.path.join(os.path.dirname(self.cert_path),
174                                              self.reformatted_cert_fname)
175    self.android_cacerts_path = ('/system/etc/security/cacerts/%s' %
176                                 self.reformatted_cert_fname)
177
178  def remove_cert(self):
179    self._generate_reformatted_cert_path()
180
181    if self._is_cert_installed():
182      self._remove_cert_from_cacerts()
183
184    if self._is_cert_installed():
185      raise CertRemovalError('Cert Removal Failed')
186
187  def install_cert(self, overwrite_cert=False):
188    """Installs a certificate putting it in /system/etc/security/cacerts."""
189    self._generate_reformatted_cert_path()
190
191    if self._is_cert_installed():
192      if overwrite_cert:
193        self._remove_cert_from_cacerts()
194      else:
195        logging.info('cert is already installed')
196        return
197
198    self._format_hashed_cert()
199    self._adb('push', self.reformatted_cert_path, '/sdcard/')
200    self._remove(self.reformatted_cert_path)
201    self._adb_su_shell('mount', '-o', 'remount,rw', '/system')
202    self._adb_su_shell(
203        'cp', '/sdcard/%s' % self.reformatted_cert_fname,
204        '/system/etc/security/cacerts/%s' % self.reformatted_cert_fname)
205    self._adb_su_shell('chmod', '644', self.android_cacerts_path)
206    if not self._is_cert_installed():
207      raise CertInstallError('Cert Install Failed')
208
209  def install_cert_using_gui(self):
210    """Installs certificate on the device using adb commands."""
211    self.check_device()
212    # TODO(mruthven): Add a check to see if the certificate is already installed
213    # Install the certificate.
214    logging.info('Installing %s on %s', self.cert_path, self.device_id)
215    self._adb('push', self.cert_path, '/sdcard/')
216
217    # Start credential install intent.
218    self._adb_shell('am', 'start', '-W', '-a', 'android.credentials.INSTALL')
219
220    # Move to and click search button.
221    self._input_key(KEYCODE_TAB)
222    self._input_key(KEYCODE_TAB)
223    self._input_key(KEYCODE_ENTER)
224
225    # Search for certificate and click it.
226    # Search only works with lower case letters
227    self._input_text(self.file_name.lower())
228    self._input_key(KEYCODE_ENTER)
229
230    # These coordinates work for hammerhead devices.
231    self._adb_shell('input', 'tap', '300', '300')
232
233    # Name the certificate and click enter.
234    self._input_text(self.cert_name)
235    self._input_key(KEYCODE_TAB)
236    self._input_key(KEYCODE_TAB)
237    self._input_key(KEYCODE_TAB)
238    self._input_key(KEYCODE_ENTER)
239
240    # Remove the file.
241    self._adb_shell('rm', '/sdcard/' + self.file_name)
242
243
244def parse_args():
245  """Parses command line arguments."""
246  parser = argparse.ArgumentParser(description='Install cert on device.')
247  parser.add_argument(
248      '-n', '--cert-name', default='dummycert', help='certificate name')
249  parser.add_argument(
250      '--overwrite', default=False, action='store_true',
251      help='Overwrite certificate file if it is already installed')
252  parser.add_argument(
253      '--remove', default=False, action='store_true',
254      help='Remove certificate file if it is installed')
255  parser.add_argument(
256      '--device-id', help='device serial number')
257  parser.add_argument(
258      '--adb-path', help='adb binary path')
259  parser.add_argument(
260      'cert_path', help='Certificate file path')
261  return parser.parse_args()
262
263
264def main():
265  args = parse_args()
266  cert_installer = AndroidCertInstaller(args.device_id, args.cert_name,
267                                        args.cert_path, adb_path=args.adb_path)
268  if args.remove:
269    cert_installer.remove_cert()
270  else:
271    cert_installer.install_cert(args.overwrite)
272
273
274if __name__ == '__main__':
275  sys.exit(main())
276