#!/usr/bin/env python3
"""
Script to synchronize (local > remote and vice versa) test data files
from/to GCS.

//test/data files are not checked into the codebase because they are large
binary files that change frequently. Instead we check in only xxx.sha256
files, which contain the SHA-256 of the actual binary file, and sync them
from a GCS bucket.

Files in the GCS bucket are content-indexed as
gs://bucket/file_name-a1b2c3f4 .

Usage:
  ./test_data status    # Prints the status of new & modified files.
  ./test_data download  # To sync remote > local (used by install-build-deps).
  ./test_data upload    # To upload newly created and modified files.
"""
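# Example of the resulting layout for one tracked file (the file name and
# digest below are illustrative, not real entries):
#   test/data/example.pftrace           <- actual binary, never checked in.
#   test/data/example.pftrace.sha256    <- checked in, holds the hex digest.
#   gs://perfetto/test_data/example.pftrace-a1b2c3f4...  <- GCS object.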

import argparse
import hashlib
import logging
import os
import subprocess
import sys

from collections import namedtuple, defaultdict
from multiprocessing.pool import ThreadPool

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'

FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])
args = None


def relpath(path):
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in path.

  If scan_new_files=False, returns only files that have a matching xxx.sha256
  tracker. If scan_new_files=True, returns all files, including untracked ones.
  """
  seen = set()
  for root, _, files in os.walk(path):
    for fname in files:
      if fname.endswith('.swp'):
        continue  # Temporary files left around if CTRL-C-ing while downloading.
      if fname == 'OWNERS':
        continue  # OWNERS files should not be uploaded.
      fpath = os.path.join(root, fname)
      if not os.path.isfile(fpath) or fname.startswith('.'):
        continue
      if fpath.endswith(SUFFIX):
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath not in seen:
        seen.add(fpath)
        yield fpath


def hash_file(fpath):
  hasher = hashlib.sha256()
  with open(fpath, 'rb') as f:
    for chunk in iter(lambda: f.read(32768), b''):
      hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
  done = 0
  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert isinstance(fs, FileStat)
    done += 1
    if not args.quiet:
      print(
          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')


def get_file_status(fpath):
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    raise Exception(fpath)
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
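    # Write the digest to a temporary .swp file and atomically rename it over
    # the tracker, so an interrupted run cannot leave behind a truncated
    # .sha256 file.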
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re-run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    assert fs.expected_digest is not None
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    # Fetch over plain HTTPS (objects are uploaded with public-read), so no
    # GCS credentials are needed to download.
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0


def cmd_status(dir):
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
    file_by_status[fs.status].append(relpath(fs.path))
    num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
  sys.exit(main())
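
# Typical workflow, with an illustrative file name; the output lines mirror
# the print formats used above:
#
#   $ ./test_data status
#   needs upload   : test/data/example.pftrace
#
#   $ ./test_data upload
#   About to upload 1 files:
#   test/data/example.pftrace
#   Press a key to continue or CTRL-C to abort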