• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42import logging
43import os
44import re
45import subprocess
46import sys
47import zipfile
48
49import common
50
# Refuse to run on interpreters older than Python 2.7; the message goes to
# stderr and the process exits with status 1.
if sys.hexversion < 0x02070000:
  sys.stderr.write("Python 2.7 or newer is required.\n")
  sys.exit(1)


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
57
58# Work around a bug in Python's zipfile module that prevents opening of zipfiles
59# if any entry has an extra field of between 1 and 3 bytes (which is common with
60# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
61# contains the bug) with an empty version (since we don't need to decode the
62# extra field anyway).
63# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
64# Python 3.5.0 alpha 1.
class MyZipInfo(zipfile.ZipInfo):
  """ZipInfo variant whose extra-field decoding is a no-op.

  Installed in place of zipfile.ZipInfo below so that the zipfile module
  never decodes the "extra" field of an entry.
  """

  def _decodeExtra(self):
    # Intentionally does nothing: this script never needs the extra field.
    return None
zipfile.ZipInfo = MyZipInfo
69
# Shared options object from the common module; the attributes below are the
# defaults for this script's own flags and are overridden by main()'s option
# handler.
OPTIONS = common.OPTIONS

# -t / --text
OPTIONS.text = False
# -c / --compare_with
OPTIONS.compare_with = None
# -l / --local_cert_dirs: directories scanned by CertDB.FindLocalCerts().
OPTIONS.local_cert_dirs = ("vendor", "build")
75
# Every problem message recorded so far, in the order it was reported.
PROBLEMS = []
# Stack of context labels prepended to each recorded problem message.
PROBLEM_PREFIX = []


def AddProblem(msg):
  """Record msg, prefixed with the space-joined current context labels."""
  context = " ".join(PROBLEM_PREFIX)
  PROBLEMS.append(context + " " + msg)


def Push(msg):
  """Enter a new context: msg becomes the innermost problem prefix."""
  PROBLEM_PREFIX.append(msg)


def Pop():
  """Leave the innermost context pushed by Push()."""
  del PROBLEM_PREFIX[-1]
85
86
def Banner(msg):
  """Print msg framed by two 70-character rules of dashes."""
  rule = "-" * 70
  # Single-argument print(...) prints the same text under the Python 2 print
  # statement, so each line is formatted into one string first.
  print(rule)
  print("   %s" % (msg,))
  print(rule)
91
92
def GetCertSubject(cert):
  """Return the subject string of a DER-encoded certificate.

  The cert is piped through "openssl x509 -inform DER -text" and the text
  following the "Subject:" label is returned.

  Args:
    cert: The DER-encoded certificate data.

  Returns:
    The subject string; "(error reading cert subject)" if openssl failed; or
    "(unknown cert subject)" if the output contains no "Subject:" line.
  """
  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
                 stdin=subprocess.PIPE,
                 stdout=subprocess.PIPE)
  out, err = p.communicate(cert)
  # stderr is not piped here, so err is normally None; the exit status is the
  # reliable failure signal. Any captured stderr text still counts as an error.
  if p.returncode != 0 or (err and err.strip()):
    return "(error reading cert subject)"
  for line in out.split("\n"):
    line = line.strip()
    if line.startswith("Subject:"):
      return line[len("Subject:"):].strip()
  return "(unknown cert subject)"
105
106
class CertDB(object):
  """Maps certificate data to a human-readable display name."""

  def __init__(self):
    # cert data -> display name (comma-joined when several names were added).
    self.certs = {}

  def Add(self, cert, name=None):
    """Register cert, optionally under name.

    For a cert that is already known, a non-empty name is appended to the
    existing name after a comma. For a new cert without a name, a placeholder
    of the form "unknown cert <first 12 hex digits of sha1> (<subject>)" is
    generated.

    Args:
      cert: The certificate data used as the lookup key.
      name: Optional display name for the cert.
    """
    if cert in self.certs:
      if name:
        self.certs[cert] = self.certs[cert] + "," + name
    else:
      if name is None:
        name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
                                         GetCertSubject(cert))
      self.certs[cert] = name

  def Get(self, cert):
    """Return the name for a given cert, or None if it was never added."""
    return self.certs.get(cert)

  def FindLocalCerts(self):
    """Load every *.x509.pem file found under OPTIONS.local_cert_dirs.

    Each cert is registered under its path with the ".x509.pem" suffix
    removed.
    """
    to_load = []
    for top in OPTIONS.local_cert_dirs:
      for dirpath, _, filenames in os.walk(top):
        to_load.extend(os.path.join(dirpath, i)
                       for i in filenames if i.endswith(".x509.pem"))

    for path in to_load:
      # "with" closes the file even when parsing the certificate raises.
      with open(path) as f:
        cert = common.ParseCertificate(f.read())
      # Two splitext() passes strip ".pem" and then ".x509".
      name, _ = os.path.splitext(path)
      name, _ = os.path.splitext(name)
      self.Add(cert, name)

# Process-wide database of every cert seen, shared by all checks.
ALL_CERTS = CertDB()
143
144
def CertFromPKCS7(data, filename):
  """Read the cert out of a PKCS#7-format file (which is what is
  stored in a signed .apk).

  Args:
    data: The DER-encoded PKCS#7 data.
    filename: Name of the entry the data came from; used as the problem
        prefix while this function runs.

  Returns:
    The value produced by common.ParseCertificate() for the openssl output,
    or None if a problem was recorded instead.
  """
  Push(filename + ":")
  try:
    p = common.Run(["openssl", "pkcs7",
                    "-inform", "DER",
                    "-outform", "PEM",
                    "-print_certs"],
                   stdin=subprocess.PIPE,
                   stdout=subprocess.PIPE)
    out, err = p.communicate(data)
    # stderr is not piped here, so err is normally None; the exit status is
    # the reliable failure signal. Captured stderr text also counts as an
    # error.
    if p.returncode != 0 or (err and err.strip()):
      detail = err or "openssl exited with status %d" % (p.returncode,)
      AddProblem("error reading cert:\n" + detail)
      return None

    cert = common.ParseCertificate(out)
    if not cert:
      AddProblem("error parsing cert output")
      return None
    return cert
  finally:
    Pop()
168
169
class APK(object):
  """Signing certs and manifest identity of one package file.

  Attributes:
    filename: Display name of the package.
    certs: frozenset of the certs read from the package's signature files.
    shared_uid: Value of android:sharedUserId from the manifest, or None.
    package: Value of the manifest's package attribute, or None.
  """

  def __init__(self, full_filename, filename):
    """Read certs and manifest from full_filename.

    Args:
      full_filename: Path of the package file on disk.
      filename: Display name, also used as the problem prefix while loading.
    """
    self.filename = filename
    self.certs = None
    self.shared_uid = None
    self.package = None

    Push(filename+":")
    try:
      self.RecordCerts(full_filename)
      self.ReadManifest(full_filename)
    finally:
      Pop()

  def RecordCerts(self, full_filename):
    """Collect the certs of every META-INF/*.DSA and *.RSA entry.

    Each cert that could be read is stored in self.certs and registered in
    ALL_CERTS. A "no signature" problem is recorded when no signature entry
    with content is found. self.certs is always set, even if reading fails.
    """
    out = set()
    try:
      # Binary mode: a zip archive is not text. Opening inside "with" also
      # means a failing open() propagates its own error instead of tripping
      # over an unbound file variable during cleanup.
      with open(full_filename, "rb") as f:
        apk = zipfile.ZipFile(f, "r")
        pkcs7 = None
        for info in apk.infolist():
          entry = info.filename
          if (entry.startswith("META-INF/") and
              entry.endswith((".DSA", ".RSA"))):
            pkcs7 = apk.read(entry)
            cert = CertFromPKCS7(pkcs7, entry)
            if cert is None:
              # CertFromPKCS7 already recorded the problem; an unreadable
              # cert must not end up in the cert set or in ALL_CERTS.
              continue
            out.add(cert)
            ALL_CERTS.Add(cert)
        if not pkcs7:
          AddProblem("no signature")
    finally:
      self.certs = frozenset(out)

  def ReadManifest(self, full_filename):
    """Fill in self.package and self.shared_uid from AndroidManifest.xml.

    Runs "aapt dump xmltree" on the package; records a problem and leaves
    both attributes untouched if aapt fails.
    """
    p = common.Run(["aapt", "dump", "xmltree", full_filename,
                    "AndroidManifest.xml"],
                   stdout=subprocess.PIPE)
    manifest, err = p.communicate()
    # stderr is not piped here, so err is normally None; check the exit
    # status as well so a failing aapt run is actually detected.
    if err or p.returncode != 0:
      AddProblem("failed to read manifest")
      return

    self._ParseManifest(manifest)

  def _ParseManifest(self, manifest):
    """Extract package and sharedUserId from aapt xmltree output text."""
    self.shared_uid = None
    self.package = None

    for line in manifest.split("\n"):
      line = line.strip()
      # Attribute lines look like:  A: name(0x0101000b)="value" (Raw: ...
      # where the "(0x...)" resource id part is optional.
      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
      if not m:
        continue
      name = m.group(1)
      if name == "android:sharedUserId":
        if self.shared_uid is not None:
          AddProblem("multiple sharedUserId declarations")
        self.shared_uid = m.group(2)
      elif name == "package":
        if self.package is not None:
          AddProblem("multiple package declarations")
        self.package = m.group(2)

    if self.package is None:
      AddProblem("no package declaration")
232
233
class TargetFiles(object):
  """The packages found in one target_files zip, plus checks on their certs.

  All attributes other than the column widths are populated by LoadZipFile().
  """

  def __init__(self):
    # Minimum column widths for package names / filenames in printed tables;
    # LoadZipFile() grows them to fit the longest value seen.
    self.max_pkg_len = 30
    self.max_fn_len = 20
    # Display filename -> APK.
    self.apks = None
    # Basename of the display filename -> APK.
    self.apks_by_basename = None
    # First value returned by common.ReadApkCerts(); iterated by
    # CheckExternalSignatures() as (apk basename, cert name) pairs.
    self.certmap = None

  def LoadZipFile(self, filename):
    """Extract and load every .apk / .apex file from the given zip.

    Args:
      filename: Path of the target_files zip.
    """
    # First read the APK certs file to figure out whether there are compressed
    # APKs in the archive. If we do have compressed APKs in the archive, then we
    # must decompress them individually before we perform any analysis.

    # This is the list of wildcards of files we extract from |filename|.
    apk_extensions = ['*.apk', '*.apex']

    # Close the archive as soon as the cert map has been read instead of
    # leaving the handle open for the rest of the run.
    input_zip = zipfile.ZipFile(filename)
    try:
      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
    finally:
      input_zip.close()
    if compressed_extension:
      apk_extensions.append('*.apk' + compressed_extension)

    d = common.UnzipTemp(filename, apk_extensions)
    self.apks = {}
    self.apks_by_basename = {}
    for dirpath, _, filenames in os.walk(d):
      for fn in filenames:
        # Decompress compressed APKs before we begin processing them.
        if compressed_extension and fn.endswith(compressed_extension):
          # First strip the compressed extension from the file.
          uncompressed_fn = fn[:-len(compressed_extension)]

          # Decompress the compressed file to the output file.
          common.Gunzip(os.path.join(dirpath, fn),
                        os.path.join(dirpath, uncompressed_fn))

          # Finally, delete the compressed file and use the uncompressed file
          # for further processing. Note that the deletion is not strictly
          # required, but is done here to ensure that we're not using too much
          # space in the temporary directory.
          os.remove(os.path.join(dirpath, fn))
          fn = uncompressed_fn

        if fn.endswith(('.apk', '.apex')):
          fullname = os.path.join(dirpath, fn)
          # Path relative to the extraction directory.
          displayname = fullname[len(d)+1:]
          apk = APK(fullname, displayname)
          self.apks[apk.filename] = apk
          self.apks_by_basename[os.path.basename(apk.filename)] = apk

          # apk.package stays None when the manifest could not be read.
          self.max_pkg_len = max(self.max_pkg_len, len(apk.package or ""))
          self.max_fn_len = max(self.max_fn_len, len(apk.filename))

  def CheckSharedUids(self):
    """Look for any instances where packages signed with different
    certs request the same sharedUserId."""
    apks_by_uid = {}
    for apk in self.apks.values():
      if apk.shared_uid:
        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)

    for uid in sorted(apks_by_uid):
      apks = apks_by_uid[uid]
      if all(apk.certs == apks[0].certs for apk in apks[1:]):
        # all packages have the same set of certs; this uid is fine.
        continue

      AddProblem("different cert sets for packages with uid %s" % (uid,))

      # Single-argument print(...) prints the same text under the Python 2
      # print statement.
      print("uid %s is shared by packages with different cert sets:" % (uid,))
      for apk in apks:
        print("%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename))
        for cert in apk.certs:
          print("    %s" % (ALL_CERTS.Get(cert),))
      print("")

  def CheckExternalSignatures(self):
    """Record a problem for each EXTERNAL app not signed with an outside key.

    An app counts as externally signed when at least one of its certs has an
    "unknown ..." name in ALL_CERTS, i.e. the cert did not get a name from a
    cert file in the local tree.
    """
    for apk_filename, certname in self.certmap.items():
      if certname != "EXTERNAL":
        continue
      # Apps marked EXTERNAL should be signed with the test key
      # during development, then manually re-signed after
      # predexopting.  Consider it an error if this app is now
      # signed only with keys that are present in our tree.
      apk = self.apks_by_basename[apk_filename]
      names = [ALL_CERTS.Get(cert) for cert in apk.certs]
      signed_with_external = any(
          name and name.startswith("unknown ") for name in names)
      if not signed_with_external:
        Push(apk.filename)
        AddProblem("hasn't been signed with EXTERNAL cert")
        Pop()

  def PrintCerts(self):
    """Display a table of packages grouped by cert."""
    by_cert = {}
    for apk in self.apks.values():
      for cert in apk.certs:
        by_cert.setdefault(cert, []).append((apk.package, apk))

    # Certs used by the most packages come first.
    order = [(-len(v), k) for (k, v) in by_cert.items()]
    order.sort()

    for _, cert in order:
      print("%s:" % (ALL_CERTS.Get(cert),))
      apks = by_cert[cert]
      # Sort by package name, then filename, so ties never fall back to
      # comparing APK objects and a missing package name still sorts.
      apks.sort(key=lambda item: (item[0] or "", item[1].filename))
      for _, apk in apks:
        if apk.shared_uid:
          print("  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
                                        self.max_pkg_len, apk.package,
                                        apk.shared_uid))
        else:
          print("  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package))
      print("")

  def CompareWith(self, other):
    """Look for instances where a given package that exists in both
    self and other have different certs.

    Args:
      other: The TargetFiles to compare against (the older build).
    """

    all_apks = set(self.apks.keys())
    all_apks.update(other.apks.keys())

    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)

    # (old cert set, new cert set) -> names of the APKs that changed that way.
    by_certpair = {}

    # Sorted so the new/removed report comes out in a stable order.
    for i in sorted(all_apks):
      if i in self.apks:
        if i in other.apks:
          # in both; should have same set of certs
          if self.apks[i].certs != other.apks[i].certs:
            by_certpair.setdefault((other.apks[i].certs,
                                    self.apks[i].certs), []).append(i)
        else:
          print("%s [%s]: new APK (not in comparison target_files)" % (
              i, self.apks[i].filename))
      else:
        if i in other.apks:
          print("%s [%s]: removed APK (only in comparison target_files)" % (
              i, other.apks[i].filename))

    if by_certpair:
      AddProblem("some APKs changed certs")
      Banner("APK signing differences")
      for (old, new), packages in sorted(by_certpair.items()):
        # The first cert of each set gets the "was"/"now" label; any further
        # certs are indented underneath it.
        for i, o in enumerate(old):
          label = "was" if i == 0 else "   "
          print("%s %s" % (label, ALL_CERTS.Get(o)))
        for i, n in enumerate(new):
          label = "now" if i == 0 else "   "
          print("%s %s" % (label, ALL_CERTS.Get(n)))
        for i in sorted(packages):
          old_fn = other.apks[i].filename
          new_fn = self.apks[i].filename
          if old_fn == new_fn:
            print("  %-*s  [%s]" % (max_pkg_len, i, old_fn))
          else:
            print("  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
                                                  old_fn, new_fn))
        print("")
398
399
def main(argv):
  """Run the signature checks described in the module docstring.

  Args:
    argv: Command-line arguments, without the program name.

  Returns:
    1 if any problem was recorded, 0 otherwise.
  """
  def option_handler(o, a):
    """Handle this script's own flags; return False for any other flag."""
    if o in ("-c", "--compare_with"):
      OPTIONS.compare_with = a
    elif o in ("-l", "--local_cert_dirs"):
      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
    elif o in ("-t", "--text"):
      OPTIONS.text = True
    else:
      return False
    return True

  def load(label, path):
    """Load one target_files zip with label as the problem prefix."""
    Push(label)
    try:
      loaded = TargetFiles()
      loaded.LoadZipFile(path)
      return loaded
    finally:
      Pop()

  # "text" is listed so that the documented --text long form is accepted,
  # matching the -t short form.
  args = common.ParseOptions(argv, __doc__,
                             extra_opts="c:l:t",
                             extra_long_opts=["compare_with=",
                                              "local_cert_dirs=",
                                              "text"],
                             extra_option_handler=option_handler)

  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)

  common.InitLogging()

  ALL_CERTS.FindLocalCerts()

  target_files = load("input target_files:", args[0])

  compare_files = None
  if OPTIONS.compare_with:
    compare_files = load("comparison target_files:", OPTIONS.compare_with)

  # The cert table is normally suppressed in comparison mode.
  if OPTIONS.text or not compare_files:
    Banner("target files")
    target_files.PrintCerts()
  target_files.CheckSharedUids()
  target_files.CheckExternalSignatures()
  if compare_files:
    if OPTIONS.text:
      Banner("comparison files")
      compare_files.PrintCerts()
    target_files.CompareWith(compare_files)

  if PROBLEMS:
    print("%d problem(s) found:\n" % (len(PROBLEMS),))
    for p in PROBLEMS:
      print(p)
    return 1

  return 0
460
461
# Script entry point: exit with main()'s status, report ExternalError as a
# framed one-line message with status 1, and always run common.Cleanup().
if __name__ == '__main__':
  try:
    sys.exit(main(sys.argv[1:]))
  except common.ExternalError as e:
    print("")
    print("   ERROR: %s" % (e,))
    print("")
    sys.exit(1)
  finally:
    common.Cleanup()
473