• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2"""
Script to synchronize (local>remote and vice versa) test data files from/to GCS.
4
//test/data files are not checked in the codebase because they are large binary
files and change frequently. Instead we check-in only xxx.sha256 files, which
7contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.
8
Files in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
10
11Usage:
12./test_data status     # Prints the status of new & modified files.
13./test_data download   # To sync remote>local (used by install-build-deps).
14./test_data upload     # To upload newly created and modified files.
15"""
16
17import argparse
18import logging
19import os
20import sys
21import hashlib
22import subprocess
23
24from multiprocessing.pool import ThreadPool
25from collections import namedtuple, defaultdict
26
# Repo root, i.e. the parent of the directory that contains this script.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# GCS bucket where the content-addressed test data files are stored.
BUCKET = 'gs://perfetto/test_data'
# Extension of the checked-in tracker files that hold each file's SHA-256.
SUFFIX = '.sha256'

# Possible file states returned by get_file_status().
FS_MATCH = 'matches'  # Local file's digest matches its .sha256 tracker.
FS_NEW_FILE = 'needs upload'  # Local file exists but has no tracker yet.
FS_MODIFIED = 'modified'  # Local file diverged from the tracker's digest.
FS_MISSING = 'needs download'  # Tracker exists but the local file is absent.

# path: absolute path of the data file (without the .sha256 suffix).
# status: one of the FS_* constants above.
# actual_digest: SHA-256 of the local file, or None if it doesn't exist.
# expected_digest: digest stored in the .sha256 tracker, or None if missing.
FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])
# Parsed command-line arguments; populated by main().
args = None
39
40
def relpath(path):
  """Returns |path| relative to the repo root, for pretty-printing."""
  return os.path.relpath(path, start=ROOT_DIR)
43
44
def download(url, out_file):
  """Fetches |url| into |out_file| using curl.

  '--fail' makes curl exit non-zero on HTTP errors (e.g. 404/403). Without
  it, curl exits 0 and silently writes the server's error page into
  out_file, and the corruption would only surface later (if at all) via a
  digest mismatch.
  """
  subprocess.check_call(['curl', '--fail', '-L', '-s', '-o', out_file, url])
47
48
def list_files(path, scan_new_files=False):
  """Recursively yields (de-duplicated) data file paths under |path|.

  If scan_new_files=False, yields only files with a matching xxx.sha256
  tracker. If scan_new_files=True, yields all files, untracked ones included.
  """
  emitted = set()
  for cur_dir, _, fnames in os.walk(path):
    for fname in fnames:
      # Skip .swp temp files (left around when CTRL-C-ing mid-download) and
      # the OWNERS file, which should not be uploaded.
      if fname.endswith('.swp') or fname == "OWNERS":
        continue
      fpath = os.path.join(cur_dir, fname)
      if fname.startswith('.') or not os.path.isfile(fpath):
        continue
      is_tracker = fpath.endswith(SUFFIX)
      if not is_tracker and not scan_new_files:
        continue
      # Trackers and data files both map to the data file's path, so a pair
      # is emitted only once.
      key = fpath[:-len(SUFFIX)] if is_tracker else fpath
      if key in emitted:
        continue
      emitted.add(key)
      yield key
72
73
def hash_file(fpath):
  """Returns the hex SHA-256 digest of the file at |fpath|.

  Reads in fixed-size chunks so arbitrarily large files don't need to fit
  in memory.
  """
  digest = hashlib.sha256()
  with open(fpath, 'rb') as f:
    while True:
      chunk = f.read(32768)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
80
81
def map_concurrently(fn, files):
  """Runs fn(file) over |files| on a thread pool, printing progress.

  |files| must be a sized sequence (len() is used for the [done/total]
  progress counter). |fn| must return the FileStat it processed.
  """
  done = 0
  # Fix: the original leaked the ThreadPool; 'with' closes and joins the
  # worker threads once the (fully consumed) imap completes.
  with ThreadPool(args.jobs) as pool:
    for fs in pool.imap_unordered(fn, files):
      assert isinstance(fs, FileStat)
      done += 1
      if not args.quiet:
        # '\r' keeps the progress on a single self-overwriting line.
        print(
            '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
            end='\r')
  if not args.quiet:
    print('')
93
94
def get_file_status(fpath):
  """Compares |fpath| against its .sha256 tracker and returns a FileStat.

  The returned status is one of:
    FS_MISSING: only the tracker exists; the file must be downloaded.
    FS_NEW_FILE: the data file exists but has no tracker yet.
    FS_MATCH: both exist and the digests agree.
    FS_MODIFIED: both exist but the digests differ.

  Raises:
    Exception: if neither the data file nor its tracker exists (list_files
      should never hand us such a path).
  """
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    # Fix: the original raised Exception(fpath) with no explanation of what
    # went wrong.
    raise Exception('Neither the file nor its %s tracker exists: %s' %
                    (SUFFIX, fpath))
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)
117
118
def cmd_upload(dir):
  """Uploads new and locally-modified files in |dir| to the GCS bucket.

  After each successful upload the xxx.sha256 tracker is (re)written so the
  new digest can be committed. Returns 0 on success; raises on gsutil
  failures.
  """
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  # 'with' closes/joins the pool threads (the original leaked the pool).
  with ThreadPool(args.jobs) as pool:
    for fs in pool.imap_unordered(get_file_status, all_files):
      if fs.status in (FS_NEW_FILE, FS_MODIFIED):
        files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    # Consistently with cmd_download, show what would be uploaded (the
    # original printed nothing at all in dry-run mode).
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    # Uploads one file (content-addressed as name-digest) and atomically
    # rewrites its .sha256 tracker.
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
        fs.path), fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Write the tracker via a temp file + os.replace() so a CTRL-C can't
    # leave a half-written tracker behind.
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0
151
152
def cmd_download(dir, overwrite_locally_modified=False):
  """Downloads files tracked by a .sha256 that are missing locally.

  Locally-modified files are skipped (after a warning + confirmation prompt)
  unless overwrite_locally_modified=True. Each download is verified against
  the tracker's digest before being moved into place. Returns 0 on success.
  """
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  # 'with' closes/joins the pool threads (the original leaked the pool).
  with ThreadPool(args.jobs) as pool:
    for fs in pool.imap_unordered(get_file_status, all_files):
      if fs.status == FS_MISSING:
        files_to_download.append(fs)
      elif fs.status == FS_MODIFIED:
        modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    # Fix: the original joined the FileStat namedtuples directly, which
    # raised "TypeError: sequence item 0: expected str instance" whenever
    # --dry-run had files to list. Map them to path strings first.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    # Fix: return 0 explicitly (the bare `return` yielded None).
    return 0

  def download_one_file(fs):
    # Downloads one file over HTTPS and verifies its digest before
    # atomically moving it into place.
    assert fs.expected_digest is not None
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(
        fs.path), fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0
204
205
def cmd_status(dir):
  """Prints the sync status of all (tracked + untracked) files in |dir|.

  Returns 0 if everything is in sync, 1 otherwise (so it can be used as a
  CI / presubmit check).
  """
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  # 'with' closes/joins the pool threads (the original leaked the pool).
  with ThreadPool(args.jobs) as pool:
    for fs in pool.imap_unordered(get_file_status, files):
      file_by_status[fs.status].append(relpath(fs.path))
      num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1
228
229
def main():
  """Parses the command line and dispatches to the cmd_* handlers.

  Returns the process exit code (0 on success).
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  # Unreachable given the argparse 'choices' constraint, but be defensive:
  # the original fell through returning None, which sys.exit() treats as
  # success (exit code 0) even though the command was not handled.
  print('Unknown command: %s' % args.cmd)
  return 1
254
255
256if __name__ == '__main__':
257  sys.exit(main())
258