#!/usr/bin/env python3
"""
Script to synchronize (local>remote and viceversa) test data files from/to GCS.

//test/data files are not checked in the codebase because they are large binary
file and change frequently. Instead we check-in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.

File in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .

Usage:
./test_data status    # Prints the status of new & modified files.
./test_data download  # To sync remote>local (used by install-build-deps).
./test_data upload    # To upload newly created and modified files.
"""

import argparse
import logging
import os
import sys
import hashlib
import subprocess

from multiprocessing.pool import ThreadPool
from collections import namedtuple, defaultdict

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'

# Possible sync states of a data file relative to its .sha256 tracker.
FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])

# Parsed command-line arguments. Set once by main(); read by all commands.
args = None


def relpath(path):
  """Returns |path| relative to the repo root (for compact user output)."""
  return os.path.relpath(path, ROOT_DIR)


def download(url, out_file):
  """Downloads |url| into |out_file| using curl. Raises on curl failure."""
  subprocess.check_call(['curl', '-L', '-s', '-o', out_file, url])


def list_files(path, scan_new_files=False):
  """Lists files recursively in |path|.

  If scan_new_files=False, returns only files with a matching xxx.sha256
  tracker. If scan_new_files=True returns all files including untracked ones.
  Each data file is yielded at most once (the tracker and the file itself
  both map to the same path).
  """
  seen = set()
  for root, _, files in os.walk(path):
    for fname in files:
      if fname.endswith('.swp'):
        continue  # Temporary files left around if CTRL-C-ing while downloading.
      if fname == "OWNERS":
        continue  # OWNERS file should not be uploaded.
      fpath = os.path.join(root, fname)
      if not os.path.isfile(fpath) or fname.startswith('.'):
        continue
      if fpath.endswith(SUFFIX):
        # A tracker file stands in for its (possibly absent) data file.
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath not in seen:
        seen.add(fpath)
        yield fpath


def hash_file(fpath):
  """Returns the hex SHA-256 digest of |fpath|, read in 32 KB chunks."""
  hasher = hashlib.sha256()
  with open(fpath, 'rb') as f:
    for chunk in iter(lambda: f.read(32768), b''):
      hasher.update(chunk)
  return hasher.hexdigest()


def map_concurrently(fn, files):
  """Applies |fn| to every FileStat in |files| on a thread pool.

  |fn| must return the FileStat it processed. Prints a one-line progress
  indicator unless --quiet was passed.
  """
  done = 0
  for fs in ThreadPool(args.jobs).imap_unordered(fn, files):
    assert (isinstance(fs, FileStat))
    done += 1
    if not args.quiet:
      print(
          '[%d/%d] %-60s' % (done, len(files), relpath(fs.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')


def get_file_status(fpath):
  """Computes the sync status of |fpath| against its .sha256 tracker.

  Returns a FileStat whose status is FS_MATCH, FS_NEW_FILE, FS_MODIFIED or
  FS_MISSING. Raises if neither the data file nor the tracker exists.
  """
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None
  expected_digest = None
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    raise Exception(fpath)
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)


def cmd_upload(dir):
  """Uploads new/modified files in |dir| to GCS and rewrites their trackers.

  Returns a process exit code (0 on success / nothing to do).
  """
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  if args.dry_run:
    return 0
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    # Objects are content-addressed as bucket/name-digest; gsutil's -n flag
    # turns the copy into a no-op when the object already exists.
    assert (fs.actual_digest is not None)
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(
        fs.path), fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-n', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Update the tracker atomically: write a .swp temp file, then rename.
    with open(fs.path + SUFFIX + '.swp', 'w') as f:
      f.write(fs.actual_digest)
    os.replace(fs.path + SUFFIX + '.swp', fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0


def cmd_download(dir, overwrite_locally_modified=False):
  """Downloads files in |dir| whose tracker has no matching local file.

  Locally modified files are skipped (with a warning and a confirmation
  prompt) unless overwrite_locally_modified=True. Returns an exit code.
  """
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    # BUGFIX: files_to_download holds FileStat tuples, not strings; joining
    # them directly raised a TypeError. Print repo-relative paths instead,
    # and return an int exit code like every other path.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    # Fetch over plain HTTPS (no gsutil required), verify the digest, then
    # atomically move the temp file into place.
    assert (fs.expected_digest is not None)
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(
        fs.path), fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.replace(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0


def cmd_status(dir):
  """Prints files that are out of sync.

  Returns 0 if everything matches, 1 otherwise (new files are ignored when
  --ignore-new is passed).
  """
  files = list_files(dir, scan_new_files=True)
  file_by_status = defaultdict(list)
  num_files = 0
  num_out_of_sync = 0
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, files):
    file_by_status[fs.status].append(relpath(fs.path))
    num_files += 1
  for status, rpaths in sorted(file_by_status.items()):
    if status == FS_NEW_FILE and args.ignore_new:
      continue
    if status != FS_MATCH:
      for rpath in rpaths:
        num_out_of_sync += 1
        if not args.quiet:
          print('%-15s: %s' % (status, rpath))
  if num_out_of_sync == 0:
    if not args.quiet:
      print('Scanned %d files in //%s, everything in sync.' %
            (num_files, relpath(dir)))
    return 0
  return 1


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('--ignore-new', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  if args.cmd == 'status':
    return cmd_status(args.dir)
  if args.cmd == 'download':
    return cmd_download(args.dir, overwrite_locally_modified=args.overwrite)
  if args.cmd == 'upload':
    return cmd_upload(args.dir)
  print('Unknown command: %s' % args.cmd)
  # Unreachable in practice (argparse choices= guards cmd), but return a
  # failure code rather than None for safety.
  return 1


if __name__ == '__main__':
  sys.exit(main())