• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2009 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Check the signatures of all APKs in a target_files .zip file.  With
19-c, compare the signatures of each package to the ones in a separate
20target_files (usually a previously distributed build for the same
21device) and flag any changes.
22
23Usage:  check_target_file_signatures [flags] target_files
24
25  -c  (--compare_with)  <other_target_files>
26      Look for compatibility problems between the two sets of target
27      files (eg., packages whose keys have changed).
28
29  -l  (--local_cert_dirs)  <dir,dir,...>
30      Comma-separated list of top-level directories to scan for
31      .x509.pem files.  Defaults to "vendor,build".  Where cert files
32      can be found that match APK signatures, the filename will be
33      printed as the cert name, otherwise a hash of the cert plus its
34      subject string will be printed instead.
35
36  -t  (--text)
37      Dump the certificate information for both packages in comparison
38      mode (this output is normally suppressed).
39
40"""
41
42from __future__ import print_function
43
44import logging
45import os
46import os.path
47import re
48import subprocess
49import sys
50import zipfile
51
52import common
53
54if sys.hexversion < 0x02070000:
55  print("Python 2.7 or newer is required.", file=sys.stderr)
56  sys.exit(1)
57
58
59logger = logging.getLogger(__name__)
60
61# Work around a bug in Python's zipfile module that prevents opening of zipfiles
62# if any entry has an extra field of between 1 and 3 bytes (which is common with
63# zipaligned APKs). This overrides the ZipInfo._decodeExtra() method (which
64# contains the bug) with an empty version (since we don't need to decode the
65# extra field anyway).
66# Issue #14315: https://bugs.python.org/issue14315, fixed in Python 2.7.8 and
67# Python 3.5.0 alpha 1.
68
69
70class MyZipInfo(zipfile.ZipInfo):
71  def _decodeExtra(self):
72    pass
73
74
75zipfile.ZipInfo = MyZipInfo
76
77
78OPTIONS = common.OPTIONS
79
80OPTIONS.text = False
81OPTIONS.compare_with = None
82OPTIONS.local_cert_dirs = ("vendor", "build")
83
84PROBLEMS = []
85PROBLEM_PREFIX = []
86
87
88def AddProblem(msg):
89  logger.error(msg)
90  PROBLEMS.append(" ".join(PROBLEM_PREFIX) + " " + msg)
91
92
93def Push(msg):
94  PROBLEM_PREFIX.append(msg)
95
96
97def Pop():
98  PROBLEM_PREFIX.pop()
99
100
101def Banner(msg):
102  print("-" * 70)
103  print("  ", msg)
104  print("-" * 70)
105
106
107def GetCertSubject(cert):
108  p = common.Run(["openssl", "x509", "-inform", "DER", "-text"],
109                 stdin=subprocess.PIPE,
110                 stdout=subprocess.PIPE,
111                 universal_newlines=False)
112  out, err = p.communicate(cert)
113  if err and not err.strip():
114    return "(error reading cert subject)"
115  for line in out.decode().split("\n"):
116    line = line.strip()
117    if line.startswith("Subject:"):
118      return line[8:].strip()
119  return "(unknown cert subject)"
120
121
122class CertDB(object):
123
124  def __init__(self):
125    self.certs = {}
126
127  def Add(self, cert_digest, subject, name=None):
128    if cert_digest in self.certs:
129      if name:
130        self.certs[cert_digest] = self.certs[cert_digest] + "," + name
131    else:
132      if name is None:
133        name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
134      self.certs[cert_digest] = name
135
136  def Get(self, cert_digest):
137    """Return the name for a given cert digest."""
138    return self.certs.get(cert_digest, None)
139
140  def FindLocalCerts(self):
141    to_load = []
142    for top in OPTIONS.local_cert_dirs:
143      for dirpath, _, filenames in os.walk(top):
144        certs = [os.path.join(dirpath, i)
145                 for i in filenames if i.endswith(".x509.pem")]
146        if certs:
147          to_load.extend(certs)
148
149    for i in to_load:
150      with open(i) as f:
151        cert = common.ParseCertificate(f.read())
152      name, _ = os.path.splitext(i)
153      name, _ = os.path.splitext(name)
154
155      cert_sha1 = common.sha1(cert).hexdigest()
156      cert_subject = GetCertSubject(cert)
157      self.Add(cert_sha1, cert_subject, name)
158
159
160ALL_CERTS = CertDB()
161
162
163def CertFromPKCS7(data, filename):
164  """Read the cert out of a PKCS#7-format file (which is what is
165  stored in a signed .apk)."""
166  Push(filename + ":")
167  try:
168    p = common.Run(["openssl", "pkcs7",
169                    "-inform", "DER",
170                    "-outform", "PEM",
171                    "-print_certs"],
172                   stdin=subprocess.PIPE,
173                   stdout=subprocess.PIPE,
174                   universal_newlines=False)
175    out, err = p.communicate(data)
176    if err and not err.strip():
177      AddProblem("error reading cert:\n" + err.decode())
178      return None
179
180    cert = common.ParseCertificate(out.decode())
181    if not cert:
182      AddProblem("error parsing cert output")
183      return None
184    return cert
185  finally:
186    Pop()
187
188
189class APK(object):
190
191  def __init__(self, full_filename, filename):
192    self.filename = filename
193    self.cert_digests = frozenset()
194    self.shared_uid = None
195    self.package = None
196
197    Push(filename+":")
198    try:
199      self.RecordCerts(full_filename)
200      self.ReadManifest(full_filename)
201    finally:
202      Pop()
203
204  def ReadCertsDeprecated(self, full_filename):
205    print("reading certs in deprecated way for {}".format(full_filename))
206    cert_digests = set()
207    with zipfile.ZipFile(full_filename) as apk:
208      for info in apk.infolist():
209        filename = info.filename
210        if (filename.startswith("META-INF/") and
211                info.filename.endswith((".DSA", ".RSA"))):
212          pkcs7 = apk.read(filename)
213          cert = CertFromPKCS7(pkcs7, filename)
214          if not cert:
215            continue
216          cert_sha1 = common.sha1(cert).hexdigest()
217          cert_subject = GetCertSubject(cert)
218          ALL_CERTS.Add(cert_sha1, cert_subject)
219          cert_digests.add(cert_sha1)
220    if not cert_digests:
221      AddProblem("No signature found")
222      return
223    self.cert_digests = frozenset(cert_digests)
224
225  def RecordCerts(self, full_filename):
226    """Parse and save the signature of an apk file."""
227
228    # Dump the cert info with apksigner
229    cmd = ["apksigner", "verify", "--print-certs", full_filename]
230    p = common.Run(cmd, stdout=subprocess.PIPE)
231    output, _ = p.communicate()
232    if p.returncode != 0:
233      self.ReadCertsDeprecated(full_filename)
234      return
235
236    # Sample output:
237    # Signer #1 certificate DN: ...
238    # Signer #1 certificate SHA-256 digest: ...
239    # Signer #1 certificate SHA-1 digest: ...
240    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-256 digest: 56be132b780656fe2444cd34326eb5d7aac91d2096abf0fe673a99270622ec87
241    # Signer (minSdkVersion=24, maxSdkVersion=32) certificate SHA-1 digest: 19da94896ce4078c38ca695701f1dec741ec6d67
242    # ...
243    certs_info = {}
244    certificate_regex = re.compile(r"(Signer (?:#[0-9]+|\(.*\))) (certificate .*):(.*)")
245    for line in output.splitlines():
246      m = certificate_regex.match(line)
247      if not m:
248        continue
249      signer, key, val = m.group(1), m.group(2), m.group(3)
250      if certs_info.get(signer):
251        certs_info[signer].update({key.strip(): val.strip()})
252      else:
253        certs_info.update({signer: {key.strip(): val.strip()}})
254    if not certs_info:
255      AddProblem("Failed to parse cert info")
256      return
257
258    cert_digests = set()
259    for signer, props in certs_info.items():
260      subject = props.get("certificate DN")
261      digest = props.get("certificate SHA-1 digest")
262      if not subject or not digest:
263        AddProblem("Failed to parse cert subject or digest")
264        return
265      ALL_CERTS.Add(digest, subject)
266      cert_digests.add(digest)
267    self.cert_digests = frozenset(cert_digests)
268
269  def ReadManifest(self, full_filename):
270    p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
271                    "AndroidManifest.xml"],
272                   stdout=subprocess.PIPE)
273    manifest, err = p.communicate()
274    if err:
275      AddProblem("failed to read manifest " + full_filename)
276      return
277
278    self.shared_uid = None
279    self.package = None
280
281    for line in manifest.split("\n"):
282      line = line.strip()
283      m = re.search(r'A: (\S*?)(?:\(0x[0-9a-f]+\))?="(.*?)" \(Raw', line)
284      if m:
285        name = m.group(1)
286        if name == "android:sharedUserId":
287          if self.shared_uid is not None:
288            AddProblem("multiple sharedUserId declarations " + full_filename)
289          self.shared_uid = m.group(2)
290        elif name == "package":
291          if self.package is not None:
292            AddProblem("multiple package declarations " + full_filename)
293          self.package = m.group(2)
294
295    if self.package is None:
296      AddProblem("no package declaration " + full_filename)
297
298
299class TargetFiles(object):
300  def __init__(self):
301    self.max_pkg_len = 30
302    self.max_fn_len = 20
303    self.apks = None
304    self.apks_by_basename = None
305    self.certmap = None
306
307  def LoadZipFile(self, filename):
308    # First read the APK certs file to figure out whether there are compressed
309    # APKs in the archive. If we do have compressed APKs in the archive, then we
310    # must decompress them individually before we perform any analysis.
311
312    # This is the list of wildcards of files we extract from |filename|.
313    apk_extensions = ['*.apk', '*.apex']
314
315    with zipfile.ZipFile(filename) as input_zip:
316      self.certmap, compressed_extension = common.ReadApkCerts(input_zip)
317    if compressed_extension:
318      apk_extensions.append('*.apk' + compressed_extension)
319
320    d = common.UnzipTemp(filename, apk_extensions)
321    self.apks = {}
322    self.apks_by_basename = {}
323    for dirpath, _, filenames in os.walk(d):
324      for fn in filenames:
325        # Decompress compressed APKs before we begin processing them.
326        if compressed_extension and fn.endswith(compressed_extension):
327          # First strip the compressed extension from the file.
328          uncompressed_fn = fn[:-len(compressed_extension)]
329
330          # Decompress the compressed file to the output file.
331          common.Gunzip(os.path.join(dirpath, fn),
332                        os.path.join(dirpath, uncompressed_fn))
333
334          # Finally, delete the compressed file and use the uncompressed file
335          # for further processing. Note that the deletion is not strictly
336          # required, but is done here to ensure that we're not using too much
337          # space in the temporary directory.
338          os.remove(os.path.join(dirpath, fn))
339          fn = uncompressed_fn
340
341        if fn.endswith(('.apk', '.apex')):
342          fullname = os.path.join(dirpath, fn)
343          displayname = fullname[len(d)+1:]
344          apk = APK(fullname, displayname)
345          self.apks[apk.filename] = apk
346          self.apks_by_basename[os.path.basename(apk.filename)] = apk
347          if apk.package:
348            self.max_pkg_len = max(self.max_pkg_len, len(apk.package))
349          self.max_fn_len = max(self.max_fn_len, len(apk.filename))
350
351  def CheckSharedUids(self):
352    """Look for any instances where packages signed with different
353    certs request the same sharedUserId."""
354    apks_by_uid = {}
355    for apk in self.apks.values():
356      if apk.shared_uid:
357        apks_by_uid.setdefault(apk.shared_uid, []).append(apk)
358
359    for uid in sorted(apks_by_uid):
360      apks = apks_by_uid[uid]
361      for apk in apks[1:]:
362        if apk.certs != apks[0].certs:
363          break
364      else:
365        # all packages have the same set of certs; this uid is fine.
366        continue
367
368      AddProblem("different cert sets for packages with uid %s" % (uid,))
369
370      print("uid %s is shared by packages with different cert sets:" % (uid,))
371      for apk in apks:
372        print("%-*s  [%s]" % (self.max_pkg_len, apk.package, apk.filename))
373        for digest in apk.cert_digests:
374          print("   ", ALL_CERTS.Get(digest))
375      print()
376
377  def CheckExternalSignatures(self):
378    for apk_filename, certname in self.certmap.items():
379      if certname == "EXTERNAL":
380        # Apps marked EXTERNAL should be signed with the test key
381        # during development, then manually re-signed after
382        # predexopting.  Consider it an error if this app is now
383        # signed with any key that is present in our tree.
384        apk = self.apks_by_basename[apk_filename]
385        signed_with_external = False
386        for digest in apk.cert_digests:
387          name = ALL_CERTS.Get(digest)
388          if name and name.startswith("unknown "):
389            signed_with_external = True
390
391        if not signed_with_external:
392          Push(apk.filename)
393          AddProblem("hasn't been signed with EXTERNAL cert")
394          Pop()
395
396  def PrintCerts(self):
397    """Display a table of packages grouped by cert."""
398    by_digest = {}
399    for apk in self.apks.values():
400      for digest in apk.cert_digests:
401        if apk.package:
402          by_digest.setdefault(digest, []).append((apk.package, apk))
403
404    order = [(-len(v), k) for (k, v) in by_digest.items()]
405    order.sort()
406
407    for _, digest in order:
408      print("%s:" % (ALL_CERTS.Get(digest),))
409      apks = by_digest[digest]
410      apks.sort(key=lambda x: x[0])
411      for i in range(1, len(apks)):
412        pkgname, apk = apks[i]
413        if pkgname == apks[i-1][0]:
414          print("Both {} and {} have same package name {}".format(
415              apk.filename, apks[i-1][1].filename, pkgname))
416      for _, apk in apks:
417        if apk.shared_uid:
418          print("  %-*s  %-*s  [%s]" % (self.max_fn_len, apk.filename,
419                                        self.max_pkg_len, apk.package,
420                                        apk.shared_uid))
421        else:
422          print("  %-*s  %s" % (self.max_fn_len, apk.filename, apk.package))
423      print()
424
425  def CompareWith(self, other):
426    """Look for instances where a given package that exists in both
427    self and other have different certs."""
428
429    all_apks = set(self.apks.keys())
430    all_apks.update(other.apks.keys())
431
432    max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
433
434    by_digestpair = {}
435
436    for i in all_apks:
437      if i in self.apks:
438        if i in other.apks:
439          # in both; should have same set of certs
440          if self.apks[i].cert_digests != other.apks[i].cert_digests:
441            by_digestpair.setdefault((other.apks[i].cert_digests,
442                                      self.apks[i].cert_digests), []).append(i)
443        else:
444          print("%s [%s]: new APK (not in comparison target_files)" % (
445              i, self.apks[i].filename))
446      else:
447        if i in other.apks:
448          print("%s [%s]: removed APK (only in comparison target_files)" % (
449              i, other.apks[i].filename))
450
451    if by_digestpair:
452      AddProblem("some APKs changed certs")
453      Banner("APK signing differences")
454      for (old, new), packages in sorted(by_digestpair.items()):
455        for i, o in enumerate(old):
456          if i == 0:
457            print("was", ALL_CERTS.Get(o))
458          else:
459            print("   ", ALL_CERTS.Get(o))
460        for i, n in enumerate(new):
461          if i == 0:
462            print("now", ALL_CERTS.Get(n))
463          else:
464            print("   ", ALL_CERTS.Get(n))
465        for i in sorted(packages):
466          old_fn = other.apks[i].filename
467          new_fn = self.apks[i].filename
468          if old_fn == new_fn:
469            print("  %-*s  [%s]" % (max_pkg_len, i, old_fn))
470          else:
471            print("  %-*s  [was: %s; now: %s]" % (max_pkg_len, i,
472                                                  old_fn, new_fn))
473        print()
474
475
476def main(argv):
477  def option_handler(o, a):
478    if o in ("-c", "--compare_with"):
479      OPTIONS.compare_with = a
480    elif o in ("-l", "--local_cert_dirs"):
481      OPTIONS.local_cert_dirs = [i.strip() for i in a.split(",")]
482    elif o in ("-t", "--text"):
483      OPTIONS.text = True
484    else:
485      return False
486    return True
487
488  args = common.ParseOptions(argv, __doc__,
489                             extra_opts="c:l:t",
490                             extra_long_opts=["compare_with=",
491                                              "local_cert_dirs="],
492                             extra_option_handler=option_handler)
493
494  if len(args) != 1:
495    common.Usage(__doc__)
496    sys.exit(1)
497
498  common.InitLogging()
499
500  ALL_CERTS.FindLocalCerts()
501
502  Push("input target_files:")
503  try:
504    target_files = TargetFiles()
505    target_files.LoadZipFile(args[0])
506  finally:
507    Pop()
508
509  compare_files = None
510  if OPTIONS.compare_with:
511    Push("comparison target_files:")
512    try:
513      compare_files = TargetFiles()
514      compare_files.LoadZipFile(OPTIONS.compare_with)
515    finally:
516      Pop()
517
518  if OPTIONS.text or not compare_files:
519    Banner("target files")
520    target_files.PrintCerts()
521  target_files.CheckSharedUids()
522  target_files.CheckExternalSignatures()
523  if compare_files:
524    if OPTIONS.text:
525      Banner("comparison files")
526      compare_files.PrintCerts()
527    target_files.CompareWith(compare_files)
528
529  if PROBLEMS:
530    print("%d problem(s) found:\n" % (len(PROBLEMS),))
531    for p in PROBLEMS:
532      print(p)
533    return 1
534
535  return 0
536
537
538if __name__ == '__main__':
539  try:
540    r = main(sys.argv[1:])
541    sys.exit(r)
542  finally:
543    common.Cleanup()
544