#!/usr/bin/env python3
"""
Script to synchronize (local>remote and vice versa) test data files from/to GCS.

//test/data files are not checked in the codebase because they are large binary
files and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.

Files in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .

Usage:
./test_data status    # Prints the status of new & modified files.
./test_data download  # To sync remote>local (used by install-build-deps).
./test_data upload    # To upload newly created and modified files.
"""

import argparse
import hashlib
import logging
import os
import subprocess
import sys

from collections import defaultdict, namedtuple
from multiprocessing.pool import ThreadPool

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'

# Possible states returned by get_file_status() (status field of FileStat).
FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])

# Parsed command-line arguments; set by main() before any command runs.
args = None


def relpath(path):
  """Returns |path| relative to the project root directory."""
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
  """Fetches |url| into |out_file| using curl. Raises on non-zero exit."""
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in path.

  If scan_new_files=False, returns only files with a matching xxx.sha256
  tracker. If scan_new_files=True returns all files including untracked ones.
  Yields each (tracker-stripped) path at most once.
  """
  seen = set()
  for root, _, files in os.walk(path):
    for fname in files:
      if fname.endswith('.swp'):
        continue  # Temporary files left around if CTRL-C-ing while downloading.
      fpath = os.path.join(root, fname)
      if not os.path.isfile(fpath) or fname.startswith('.'):
        continue
      if fpath.endswith(SUFFIX):
        # Map the xxx.sha256 tracker onto the path of the actual binary file.
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath not in seen:
        seen.add(fpath)
        yield fpath


def hash_file(fpath):
  """Returns the hex SHA-256 digest of |fpath|, reading in 32 KB chunks."""
  hasher = hashlib.sha256()
  with open(fpath, 'rb') as f:
    for chunk in iter(lambda: f.read(32768), b''):
      hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
  """Applies |fn| to every entry of |files| on a thread pool, with progress.

  |fn| must return the FileStat it processed; progress is printed on a single
  rewritten line unless --quiet was passed.
  """
  done = 0
  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert isinstance(fs, FileStat)
    done += 1
    if not args.quiet:
      print(
          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')


def get_file_status(fpath):
  """Compares |fpath| against its .sha256 tracker and returns a FileStat.

  Possible outcomes: FS_MATCH, FS_NEW_FILE (file without tracker), FS_MODIFIED
  (digests differ) or FS_MISSING (tracker without file).
  """
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    # list_files() only yields paths for which the file or its tracker exists;
    # reaching here means the caller passed a bogus path.
    raise Exception('Neither %s nor its %s tracker exist' % (fpath, SUFFIX))
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
  """Uploads new and modified files under |dir| to the GCS bucket.

  Rewrites the .sha256 trackers after each successful upload. Returns an
  integer exit code (0 on success).
  """
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    # Report what would be uploaded without touching GCS or the trackers.
    if not args.quiet:
      print('Would upload %d files:' % len(files_to_upload))
      print('\n'.join(relpath(f.path) for f in files_to_upload))
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    """Copies one file to its content-indexed GCS name and updates the tracker."""
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Write the tracker into a temp file and rename it, so a CTRL-C cannot
    # leave behind a truncated .sha256.
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
  """Downloads files tracked under |dir| that are missing locally.

  Locally modified files are skipped (after a warning prompt) unless
  |overwrite_locally_modified| is True. Returns an integer exit code.
  """
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(dir)))
  if args.dry_run:
    # Bugfix: the entries are FileStat tuples, not strings; joining them
    # directly raised a TypeError.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    """Downloads one content-indexed file and verifies its SHA-256 digest."""
    assert fs.expected_digest is not None
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    # Download into a .swp temp file and rename only after the digest matches.
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0


def cmd_status(dir):
  """Prints the sync status of files under |dir|.

  Returns 0 if everything is in sync, 1 if any file needs upload/download.
  """
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
    file_by_status[fs.status].append(relpath(fs.path))
    num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  # Unreachable with argparse choices=..., kept as a defensive fallback.
  print('Unknown command: %s' % args.cmd)
  return 1


if __name__ == '__main__':
  sys.exit(main())