• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""deapexer is a tool that prints out content of an APEX.
17
18To print content of an APEX to stdout:
19  deapexer list foo.apex
20
21To extract content of an APEX to the given directory:
22  deapexer extract foo.apex dest
23"""
24from __future__ import print_function
25
26import argparse
27import apex_manifest
28import enum
29import os
30import shutil
31import sys
32import subprocess
33import tempfile
34import zipfile
35
36BLOCK_SIZE = 4096
37
38class ApexImageEntry(object):
39
40  def __init__(self, name, base_dir, permissions, size, ino, extents,
41               is_directory, is_symlink, security_context):
42    self._name = name
43    self._base_dir = base_dir
44    self._permissions = permissions
45    self._size = size
46    self._is_directory = is_directory
47    self._is_symlink = is_symlink
48    self._ino = ino
49    self._extents = extents
50    self._security_context = security_context
51
52  @property
53  def name(self):
54    return self._name
55
56  @property
57  def root(self):
58    return self._base_dir == './' and self._name == '.'
59
60  @property
61  def full_path(self):
62    if self.root:
63      return self._base_dir  # './'
64    path = os.path.join(self._base_dir, self._name)
65    if self.is_directory:
66      path += '/'
67    return path
68
69  @property
70  def is_directory(self):
71    return self._is_directory
72
73  @property
74  def is_symlink(self):
75    return self._is_symlink
76
77  @property
78  def is_regular_file(self):
79    return not self.is_directory and not self.is_symlink
80
81  @property
82  def permissions(self):
83    return self._permissions
84
85  @property
86  def size(self):
87    return self._size
88
89  @property
90  def ino(self):
91    return self._ino
92
93  @property
94  def extents(self):
95    return self._extents
96
97  @property
98  def security_context(self):
99    return self._security_context
100
101  def __str__(self):
102    ret = ''
103    if self._is_directory:
104      ret += 'd'
105    elif self._is_symlink:
106      ret += 'l'
107    else:
108      ret += '-'
109
110    def mask_as_string(m):
111      ret = 'r' if m & 4 == 4 else '-'
112      ret += 'w' if m & 2 == 2 else '-'
113      ret += 'x' if m & 1 == 1 else '-'
114      return ret
115
116    ret += mask_as_string(self._permissions >> 6)
117    ret += mask_as_string((self._permissions >> 3) & 7)
118    ret += mask_as_string(self._permissions & 7)
119
120    return ret + ' ' + self._size + ' ' + self._name
121
122
123class ApexImageDirectory(object):
124
125  def __init__(self, path, entries, apex):
126    self._path = path
127    self._entries = sorted(entries, key=lambda e: e.name)
128    self._apex = apex
129
130  def list(self, is_recursive=False):
131    for e in self._entries:
132      yield e
133      if e.is_directory and e.name != '.' and e.name != '..':
134        for ce in self.enter_subdir(e).list(is_recursive):
135          yield ce
136
137  def enter_subdir(self, entry):
138    return self._apex._list(self._path + entry.name + '/')
139
140  def extract(self, dest):
141    path = self._path
142    self._apex._extract(self._path, dest)
143
144
145class Apex(object):
146
147  def __init__(self, args):
148    self._debugfs = args.debugfs_path
149    self._fsckerofs = args.fsckerofs_path
150    self._blkid = args.blkid_path
151    self._apex = args.apex
152    self._tempdir = tempfile.mkdtemp()
153    # TODO(b/139125405): support flattened APEXes.
154    with zipfile.ZipFile(self._apex, 'r') as zip_ref:
155      self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir)
156    self._cache = {}
157
158  def __del__(self):
159    shutil.rmtree(self._tempdir)
160
161  def __enter__(self):
162    return self._list('./')
163
164  def __exit__(self, type, value, traceback):
165    pass
166
167  def _list(self, path):
168    if path in self._cache:
169      return self._cache[path]
170    process = subprocess.Popen([self._debugfs, '-R', 'ls -l -p %s' % path, self._payload],
171                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
172                               universal_newlines=True)
173    stdout, _ = process.communicate()
174    res = str(stdout)
175    entries = []
176    for line in res.split('\n'):
177      if not line:
178        continue
179      parts = line.split('/')
180      if len(parts) != 8:
181        continue
182      name = parts[5]
183      if not name:
184        continue
185      ino = parts[1]
186      bits = parts[2]
187      size = parts[6]
188      extents = []
189      is_symlink = bits[1]=='2'
190      is_directory=bits[1]=='4'
191
192      if not is_symlink and not is_directory:
193        process = subprocess.Popen([self._debugfs, '-R', 'dump_extents <%s>' % ino,
194                                    self._payload], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
195                                    universal_newlines=True)
196        stdout, _ = process.communicate()
197        # Output of dump_extents for an inode fragmented in 3 blocks (length and addresses represent
198        # block-sized sections):
199        # Level Entries       Logical      Physical Length Flags
200        # 0/ 0   1/  3     0 -     0    18 -    18      1
201        # 0/ 0   2/  3     1 -    15    20 -    34     15
202        # 0/ 0   3/  3    16 -  1863    37 -  1884   1848
203        res = str(stdout).splitlines()
204        res.pop(0) # the first line contains only columns names
205        left_length = int(size)
206        try: # dump_extents sometimes has an unexpected output
207          for line in res:
208            tokens = line.split()
209            offset = int(tokens[7]) * BLOCK_SIZE
210            length = min(int(tokens[-1]) * BLOCK_SIZE, left_length)
211            left_length -= length
212            extents.append((offset, length))
213          if (left_length != 0): # dump_extents sometimes fails to display "hole" blocks
214            raise ValueError
215        except:
216          extents = [] # [] means that we failed to retrieve the file location successfully
217
218      # get 'security.selinux' attribute
219      entry_path = os.path.join(path, name)
220      stdout = subprocess.check_output([
221        self._debugfs,
222        '-R',
223        f'ea_get -V {entry_path} security.selinux',
224        self._payload
225      ], text=True, stderr=subprocess.DEVNULL)
226      security_context = stdout.rstrip('\n\x00')
227
228      entries.append(ApexImageEntry(name,
229                                    base_dir=path,
230                                    permissions=int(bits[3:], 8),
231                                    size=size,
232                                    is_directory=is_directory,
233                                    is_symlink=is_symlink,
234                                    ino=ino,
235                                    extents=extents,
236                                    security_context=security_context))
237
238    return ApexImageDirectory(path, entries, self)
239
240  def _extract(self, path, dest):
241    # get filesystem type
242    process = subprocess.Popen([self._blkid, '-o', 'value', '-s', 'TYPE', self._payload],
243                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
244                               universal_newlines=True)
245    output, stderr = process.communicate()
246    if process.returncode != 0:
247      print(stderr, file=sys.stderr)
248
249    if output.rstrip() == 'erofs':
250      process = subprocess.Popen([self._fsckerofs, '--extract=%s' % (dest), '--overwrite', self._payload],
251                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
252                                 universal_newlines=True)
253    else:
254      process = subprocess.Popen([self._debugfs, '-R', 'rdump %s %s' % (path, dest), self._payload],
255                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
256                                 universal_newlines=True)
257
258    _, stderr = process.communicate()
259    if process.returncode != 0:
260      print(stderr, file=sys.stderr)
261
262
263def RunList(args):
264  if GetType(args.apex) == ApexType.COMPRESSED:
265    with tempfile.TemporaryDirectory() as temp:
266      decompressed_apex = os.path.join(temp, 'temp.apex')
267      decompress(args.apex, decompressed_apex)
268      args.apex = decompressed_apex
269
270      RunList(args)
271      return
272
273  with Apex(args) as apex:
274    for e in apex.list(is_recursive=True):
275      # dot(., ..) directories
276      if not e.root and e.name in ('.', '..'):
277          continue
278      res = ''
279      if args.size:
280        res += e.size + ' '
281      res += e.full_path
282      if args.extents:
283        res += ' [' + '-'.join(str(x) for x in e.extents) + ']'
284      if args.contexts:
285        res += ' ' + e.security_context
286      print(res)
287
288
289def RunExtract(args):
290  if GetType(args.apex) == ApexType.COMPRESSED:
291    with tempfile.TemporaryDirectory() as temp:
292      decompressed_apex = os.path.join(temp, "temp.apex")
293      decompress(args.apex, decompressed_apex)
294      args.apex = decompressed_apex
295
296      RunExtract(args)
297      return
298
299  with Apex(args) as apex:
300    if not os.path.exists(args.dest):
301      os.makedirs(args.dest, mode=0o755)
302    apex.extract(args.dest)
303    if os.path.isdir(os.path.join(args.dest, "lost+found")):
304      shutil.rmtree(os.path.join(args.dest, "lost+found"))
305
306class ApexType(enum.Enum):
307  INVALID = 0
308  UNCOMPRESSED = 1
309  COMPRESSED = 2
310
311
312def GetType(apex_path):
313  with zipfile.ZipFile(apex_path, 'r') as zip_file:
314    names = zip_file.namelist()
315    has_payload = 'apex_payload.img' in names
316    has_original_apex = 'original_apex' in names
317    if has_payload and has_original_apex:
318      return ApexType.INVALID
319    if has_payload:
320      return ApexType.UNCOMPRESSED
321    if has_original_apex:
322      return ApexType.COMPRESSED
323    return ApexType.INVALID
324
325
326def RunInfo(args):
327  if args.print_type:
328    res = GetType(args.apex)
329    if res == ApexType.INVALID:
330      print(args.apex + ' is not a valid apex')
331      sys.exit(1)
332    print(res.name)
333  else:
334    manifest = apex_manifest.fromApex(args.apex)
335    print(apex_manifest.toJsonString(manifest))
336
337
338def RunDecompress(args):
339  """RunDecompress takes path to compressed APEX and decompresses it to
340  produce the original uncompressed APEX at give output path
341
342  See apex_compression_tool.py#RunCompress for details on compressed APEX
343  structure.
344
345  Args:
346      args.input: file path to compressed APEX
347      args.output: file path to where decompressed APEX will be placed
348  """
349  compressed_apex_fp = args.input
350  decompressed_apex_fp = args.output
351  return decompress(compressed_apex_fp, decompressed_apex_fp)
352
353def decompress(compressed_apex_fp, decompressed_apex_fp):
354  if os.path.exists(decompressed_apex_fp):
355    print("Output path '" + decompressed_apex_fp + "' already exists")
356    sys.exit(1)
357
358  with zipfile.ZipFile(compressed_apex_fp, 'r') as zip_obj:
359    if 'original_apex' not in zip_obj.namelist():
360      print(compressed_apex_fp + ' is not a compressed APEX. Missing '
361                                 "'original_apex' file inside it.")
362      sys.exit(1)
363    # Rename original_apex file to what user provided as output filename
364    original_apex_info = zip_obj.getinfo('original_apex')
365    original_apex_info.filename = os.path.basename(decompressed_apex_fp)
366    # Extract the original_apex as desired name
367    zip_obj.extract(original_apex_info,
368                    path=os.path.dirname(decompressed_apex_fp))
369
370
371def main(argv):
372  parser = argparse.ArgumentParser()
373
374  debugfs_default = None
375  fsckerofs_default = None
376  blkid_default = None
377  if 'ANDROID_HOST_OUT' in os.environ:
378    debugfs_default = '%s/bin/debugfs_static' % os.environ['ANDROID_HOST_OUT']
379    fsckerofs_default = '%s/bin/fsck.erofs' % os.environ['ANDROID_HOST_OUT']
380    blkid_default = '%s/bin/blkid_static' % os.environ['ANDROID_HOST_OUT']
381  parser.add_argument('--debugfs_path', help='The path to debugfs binary', default=debugfs_default)
382  parser.add_argument('--fsckerofs_path', help='The path to fsck.erofs binary', default=fsckerofs_default)
383  parser.add_argument('--blkid_path', help='The path to blkid binary', default=blkid_default)
384
385  subparsers = parser.add_subparsers(required=True, dest='cmd')
386
387  parser_list = subparsers.add_parser('list', help='prints content of an APEX to stdout')
388  parser_list.add_argument('apex', type=str, help='APEX file')
389  parser_list.add_argument('--size', help='also show the size of the files', action="store_true")
390  parser_list.add_argument('--extents', help='also show the location of the files', action="store_true")
391  parser_list.add_argument('-Z', '--contexts',
392                           help='also show the security context of the files',
393                           action='store_true')
394  parser_list.set_defaults(func=RunList)
395
396  parser_extract = subparsers.add_parser('extract', help='extracts content of an APEX to the given '
397                                                         'directory')
398  parser_extract.add_argument('apex', type=str, help='APEX file')
399  parser_extract.add_argument('dest', type=str, help='Directory to extract content of APEX to')
400  parser_extract.set_defaults(func=RunExtract)
401
402  parser_info = subparsers.add_parser('info', help='prints APEX manifest')
403  parser_info.add_argument('apex', type=str, help='APEX file')
404  parser_info.add_argument('--print-type',
405                           help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)',
406                           action='store_true')
407  parser_info.set_defaults(func=RunInfo)
408
409  # Handle sub-command "decompress"
410  parser_decompress = subparsers.add_parser('decompress',
411                                            help='decompresses a compressed '
412                                                 'APEX')
413  parser_decompress.add_argument('--input', type=str, required=True,
414                                 help='path to compressed APEX file that '
415                                      'will be decompressed')
416  parser_decompress.add_argument('--output', type=str, required=True,
417                                 help='output directory path where '
418                                      'decompressed APEX will be extracted')
419  parser_decompress.set_defaults(func=RunDecompress)
420
421  args = parser.parse_args(argv)
422
423  debugfs_required_for_cmd = ['list', 'extract']
424  if args.cmd in debugfs_required_for_cmd and not args.debugfs_path:
425    print('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set',
426          file=sys.stderr)
427    sys.exit(1)
428
429  if args.cmd == 'extract':
430    if not args.blkid_path:
431      print('ANDROID_HOST_OUT environment variable is not defined, --blkid_path must be set',
432            file=sys.stderr)
433      sys.exit(1)
434
435    if not os.path.isfile(args.blkid_path):
436      print(f'Cannot find blkid specified at {args.blkid_path}',
437            file=sys.stderr)
438      sys.exit(1)
439
440    if not args.fsckerofs_path:
441      print('ANDROID_HOST_OUT environment variable is not defined, --fsckerofs_path must be set',
442            file=sys.stderr)
443      sys.exit(1)
444
445    if not os.path.isfile(args.fsckerofs_path):
446      print(f'Cannot find fsck.erofs specified at {args.fsckerofs_path}',
447            file=sys.stderr)
448      sys.exit(1)
449
450  args.func(args)
451
452
453if __name__ == '__main__':
454  main(sys.argv[1:])
455