#!/usr/bin/env python3
#
# Copyright (C) 2022 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The script to upload generated artifacts from the build server to CAS."""

import concurrent.futures
import dataclasses
import glob
import json
import logging
import os
import subprocess
import tempfile
import time
import uuid
from typing import Any, Optional, Tuple

import cas_metrics_pb2  # type: ignore
from google.protobuf import json_format


@dataclasses.dataclass
class ArtifactConfig:
    """Configuration of an artifact to be uploaded to CAS.

    Attributes:
        source_path: path to the artifact, relative to the root of the source code.
        unzip: true if the artifact should be unzipped and uploaded as a directory.
        standard: true if the artifact should be uploaded as-is, without chunking.
        chunk: true if the artifact should be uploaded with chunking as a single file.
        chunk_dir: true if the artifact should be uploaded with chunking as a directory.
        exclude_filters: a list of regular expressions for files that are excluded
            from uploading.
    """
    source_path: str
    unzip: bool
    standard: bool = True
    chunk: bool = False
    chunk_dir: bool = False
    exclude_filters: list[str] = dataclasses.field(default_factory=list)
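

# A minimal sketch of how an ArtifactConfig might be declared. The values are
# hypothetical and only illustrate the fields above; with both `standard` (the
# default) and `chunk` enabled, _artifact_variations() at the bottom of this
# file expands the single config into two upload tasks:
#
#   example_config = ArtifactConfig(
#       source_path='device-tests.zip',
#       unzip=True,
#       chunk=True,
#       exclude_filters=[r'.*\.log'],
#   )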


@dataclasses.dataclass
class CasInfo:
    """Basic information of the CAS server and client.

    Attributes:
        cas_instance: the instance name of the CAS service.
        cas_service: the address of the CAS service.
        client_path: path to the CAS uploader client.
        client_version: version of the CAS uploader client, in tuple format.
    """
    cas_instance: str
    cas_service: str
    client_path: str
    client_version: tuple


@dataclasses.dataclass
class UploadResult:
    """Result of uploading a single artifact with the CAS client.

    Attributes:
        digest: root digest of the artifact.
        content_details: detailed information of all uploaded files inside the
            uploaded artifact.
        log_file: path to the log file of the upload task.
    """
    digest: str
    content_details: list[dict[str, Any]]
    log_file: str


@dataclasses.dataclass
class UploadTask:
    """Task of uploading a single artifact with the CAS client."""
    artifact: ArtifactConfig
    path: str
    working_dir: str
    metrics_file: str


UPLOADER_TIMEOUT_SECS = 600  # 10 minutes
AVG_CHUNK_SIZE_IN_KB = 128
DIGESTS_PATH = 'cas_digests.json'
CONTENT_DETAILS_PATH = 'logs/cas_content_details.json'
CHUNKED_ARTIFACT_NAME_PREFIX = '_chunked_'
CHUNKED_DIR_ARTIFACT_NAME_PREFIX = '_chunked_dir_'


class Uploader:
    """Uploader for uploading artifacts to the CAS remote."""

    def __init__(self, cas_info: CasInfo):
        """Initializes the Uploader with CAS info."""
        self._cas_info = cas_info

    @staticmethod
    def setup_task_logger(working_dir: str) -> Tuple[logging.Logger, str]:
        """Creates a logger for an individual uploader task."""
        task_id = uuid.uuid4()
        logger = logging.getLogger(f"Uploader-{task_id}")
        logger.setLevel(logging.DEBUG)
        logger.propagate = False

        log_file = os.path.join(working_dir, f"_uploader_{task_id}.log")
        file_handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        return logger, log_file

    @staticmethod
    def read_file(file_path: str) -> str:
        """Returns the contents of a file, or an error message on failure."""
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return file.read()
        except FileNotFoundError:
            return f"Error: File '{file_path}' not found."
        except Exception as e:  # pylint: disable=broad-except
            return f"Error: {e}"

    @staticmethod
    def _run_uploader_command(cmd: list[str], working_dir: str) -> str:
        """Runs the uploader command using working_dir and returns the output."""
        log_file = os.path.join(working_dir, f'_casuploader_{uuid.uuid4()}.log')
        with open(log_file, 'w', encoding='utf8') as outfile:
            subprocess.run(
                cmd,
                check=True,
                text=True,
                stdout=outfile,
                stderr=subprocess.STDOUT,
                encoding='utf-8',
                timeout=UPLOADER_TIMEOUT_SECS,
            )
        return Uploader.read_file(log_file)

    def _upload_artifact(
            self,
            artifact: ArtifactConfig,
            working_dir: str,
            metrics_file: str,
    ) -> Optional[UploadResult]:
        """Uploads the artifact to CAS using the casuploader binary.

        Args:
            artifact: the artifact to be uploaded to CAS.
            working_dir: the directory for intermediate files.
            metrics_file: the metrics file for the artifact.

        Returns: the digest of the uploaded artifact, formatted as "<hash>/<size>",
            or None if the artifact upload fails.
        """
        logger, log_file = Uploader.setup_task_logger(working_dir)

        # `-dump-file-details` is only supported by CAS uploader V1.0 or later.
        dump_file_details = self._cas_info.client_version >= (1, 0)
        if not dump_file_details:
            logger.warning('-dump-file-details is not enabled')

        # `-dump-metrics` is only supported by CAS uploader V1.3 or later.
        dump_metrics = self._cas_info.client_version >= (1, 3)
        if not dump_metrics:
            logger.warning('-dump-metrics is not enabled')

        with tempfile.NamedTemporaryFile(mode='w+') as digest_file, tempfile.NamedTemporaryFile(
                mode='w+') as content_details_file:
            logger.info(
                'Uploading %s to CAS instance %s', artifact.source_path, self._cas_info.cas_instance
            )

            cmd = [
                self._cas_info.client_path,
                '-cas-instance',
                self._cas_info.cas_instance,
                '-cas-addr',
                self._cas_info.cas_service,
                '-dump-digest',
                digest_file.name,
                '-use-adc',
            ]

            cmd = cmd + Uploader._path_flag_for_artifact(artifact)

            if artifact.chunk or artifact.chunk_dir:
                cmd = cmd + ['-chunk', '-avg-chunk-size', str(AVG_CHUNK_SIZE_IN_KB)]

            for exclude_filter in artifact.exclude_filters:
                cmd = cmd + ['-exclude-filters', exclude_filter]

            if dump_file_details:
                cmd = cmd + ['-dump-file-details', content_details_file.name]

            if dump_metrics:
                cmd = cmd + ['-dump-metrics', metrics_file]

            try:
                logger.info('Running command: %s', cmd)
                output = Uploader._run_uploader_command(cmd, working_dir)
                logger.info('Command output:\n %s', output)
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                logger.warning(
                    'Failed to upload %s to CAS instance %s. Skip.\nError message: %s\nLog: %s',
                    artifact.source_path, self._cas_info.cas_instance, e, e.stdout,
                )
                return None
            except subprocess.SubprocessError as e:
                logger.warning('Failed to upload %s to CAS instance %s. Skip.\nError: %s',
                               artifact.source_path, self._cas_info.cas_instance, e)
                return None

            # Read the digest of the root directory or file from the dumped digest file.
            digest = digest_file.read()
            if digest:
                logger.info('Uploaded %s to CAS. Digest: %s', artifact.source_path, digest)
            else:
                logger.warning(
                    'No digest is dumped for file %s, the upload may have failed.',
                    artifact.source_path,
                )
                return None

            content_details = None
            if dump_file_details:
                try:
                    content_details = json.loads(content_details_file.read())
                except json.JSONDecodeError as e:
                    logger.warning('Failed to parse uploaded content details: %s', e)

            return UploadResult(digest, content_details, log_file)

    @staticmethod
    def _path_flag_for_artifact(artifact: ArtifactConfig) -> list[str]:
        """Returns the path flag for the artifact."""
        if artifact.standard:
            return ['-zip-path' if artifact.unzip else '-file-path', artifact.source_path]
        if artifact.chunk:
            return ['-file-path', artifact.source_path]
        if artifact.chunk_dir:
            return ['-zip-path', artifact.source_path]
        # Should never reach here.
        return ['-file-path', artifact.source_path]
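
    # A sketch of the flag selection above, with hypothetical artifact paths:
    #   standard, unzip=True   -> ['-zip-path', 'a.zip']   (upload as a directory)
    #   standard, unzip=False  -> ['-file-path', 'a.img']  (upload as a single file)
    #   chunk                  -> ['-file-path', 'a.img']  ('-chunk' is appended by the caller)
    #   chunk_dir              -> ['-zip-path', 'a.zip']   ('-chunk' is appended by the caller)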

    def _output_results(
            self,
            output_dir: str,
            digests: dict[str, str],
            content_details: list[dict[str, Any]],
    ):
        """Outputs digests and content details."""
        digests_output = {
            'cas_instance': self._cas_info.cas_instance,
            'cas_service': self._cas_info.cas_service,
            'client_version': '.'.join(map(str, self._cas_info.client_version)),
            'files': digests,
        }
        output_path = os.path.join(output_dir, DIGESTS_PATH)
        with open(output_path, 'w', encoding='utf8') as writer:
            writer.write(json.dumps(digests_output, sort_keys=True, indent=2))
        logging.info('Output digests to %s', output_path)

        output_path = os.path.join(output_dir, CONTENT_DETAILS_PATH)
        with open(output_path, 'w', encoding='utf8') as writer:
            writer.write(json.dumps(content_details, sort_keys=True, indent=2))
        logging.info('Output uploaded content details to %s', output_path)
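
    # The resulting cas_digests.json has roughly this shape (all values below
    # are hypothetical; each digest is formatted as "<hash>/<size>"):
    #
    #   {
    #     "cas_instance": "projects/example/instances/default_instance",
    #     "cas_service": "remotebuildexecution.googleapis.com:443",
    #     "client_version": "1.3",
    #     "files": {
    #       "android-cts.zip": "abcd1234/567890",
    #       "_chunked_android-cts.zip": "ef012345/678901"
    #     }
    #   }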

    def _upload_wrapper(self, task: UploadTask) -> Tuple[Optional[UploadResult], UploadTask]:
        """Wraps _upload_artifact and pairs the result with its task."""
        return self._upload_artifact(
            task.artifact,
            task.working_dir,
            task.metrics_file,
        ), task

    @staticmethod
    def _glob_wrapper(args):
        """Wrapper function for multiprocessing."""
        dist_dir, artifact = args

        files = []
        if artifact.source_path.startswith("./"):
            files = glob.glob(dist_dir + artifact.source_path[1:])
        else:
            files = glob.glob(dist_dir + '/**/' + artifact.source_path, recursive=True)
        return (files, artifact)

    def create_upload_tasks(
        self, artifacts: list[ArtifactConfig], working_dir: str, dist_dir: str
    ) -> list[UploadTask]:
        """Creates upload tasks for the artifacts."""
        start = time.time()

        tasks = []
        skip_files = []
        # Glob in parallel. Note that ThreadPoolExecutor doesn't help, likely due to the GIL.
        with concurrent.futures.ProcessPoolExecutor() as executor:
            results = executor.map(
                Uploader._glob_wrapper,
                [(dist_dir, artifact) for artifact in artifacts],
            )
            for files, artifact in results:
                for file in files:
                    if os.path.isdir(file):
                        logging.warning('Ignoring artifact match (dir): %s', file)
                        continue
                    rel_path = Uploader._get_relative_path(dist_dir, file)
                    for task_artifact in Uploader._artifact_variations(rel_path, artifact):
                        path = Uploader._artifact_path(rel_path, task_artifact)
                        if path in skip_files:
                            continue
                        skip_files.append(path)
                        task_artifact.source_path = file
                        _, task_metrics_file = tempfile.mkstemp(dir=working_dir)
                        task = UploadTask(task_artifact, path, working_dir, task_metrics_file)
                        tasks.append(task)

        logging.info(
            'Time of file globbing for all artifact configs: %d seconds',
            time.time() - start,
        )
        return tasks

    @staticmethod
    def _print_tasks(tasks: list[UploadTask]):
        """Outputs info for upload tasks."""
        for task in tasks:
            unzip = '+' if task.artifact.unzip else '-'
            print(f"{task.path:<40} {unzip} {task.artifact.source_path}")
        print(f"Total: {len(tasks)} files.")

    def upload(self, artifacts: list[ArtifactConfig], dist_dir: str,
               max_workers: int, dryrun: bool = False) -> cas_metrics_pb2.CasMetrics:
        """Uploads artifacts to the CAS remote."""
        file_digests = {}
        content_details = []

        cas_metrics = cas_metrics_pb2.CasMetrics()
        with tempfile.TemporaryDirectory() as working_dir:
            logging.info('The working dir is %s', working_dir)

            tasks = self.create_upload_tasks(artifacts, working_dir, dist_dir)
            logging.info('Uploading %d files, max workers = %d', len(tasks), max_workers)
            if dryrun:
                Uploader._print_tasks(tasks)
                return cas_metrics

            # Upload artifacts in parallel.
            logging.info('==== Start uploading %d artifact(s) in parallel ====\n', len(tasks))
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(self._upload_wrapper, task) for task in tasks]

                index = 1
                for future in concurrent.futures.as_completed(futures):
                    result, task = future.result()
                    if result:
                        output = Uploader.read_file(result.log_file)
                        logging.info('---- %d: %s ----\n\n%s', index, task.path, output)
                    else:
                        logging.info('---- %d: %s ----\n\n', index, task.path)
                    index += 1
                    if result and result.digest:
                        file_digests[task.path] = result.digest
                    else:
                        logging.warning(
                            'Skipped saving the digest of file %s; the upload may have failed.',
                            task.path,
                        )
                    if result and result.content_details:
                        content_details.append({"artifact": task.path,
                                                "details": result.content_details})
                    else:
                        logging.warning('Skipped saving the content details of file %s.', task.path)

                    if os.path.exists(task.metrics_file):
                        Uploader._add_artifact_metrics(task.metrics_file, cas_metrics)
                        os.remove(task.metrics_file)
            logging.info('==== Uploading of artifacts completed ====')

        self._output_results(
            dist_dir,
            file_digests,
            content_details,
        )
        return cas_metrics
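
    # A minimal usage sketch (all values are hypothetical; the real caller is
    # expected to supply the actual CAS client binary, instance, and service):
    #
    #   cas_info = CasInfo(
    #       cas_instance='projects/example/instances/default_instance',
    #       cas_service='remotebuildexecution.googleapis.com:443',
    #       client_path='/path/to/casuploader',
    #       client_version=(1, 3),
    #   )
    #   metrics = Uploader(cas_info).upload(
    #       [ArtifactConfig('*.img', unzip=False)],
    #       dist_dir='/path/to/dist',
    #       max_workers=8,
    #   )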

    @staticmethod
    def _add_artifact_metrics(metrics_file: str, cas_metrics: cas_metrics_pb2.CasMetrics):
        """Adds artifact metrics from metrics_file to cas_metrics."""
        try:
            with open(metrics_file, "r", encoding='utf8') as file:
                json_str = file.read()
            if json_str:
                json_metrics = json.loads(json_str)
                cas_metrics.artifacts.append(
                    json_format.ParseDict(json_metrics, cas_metrics_pb2.ArtifactMetrics())
                )
            else:
                logging.warning("Empty file: %s", metrics_file)
        except FileNotFoundError:
            logging.exception("File not found: %s", metrics_file)
        except json.JSONDecodeError as e:
            logging.exception("JSON decode error: %s for JSON contents:\n%s", e, json_str)
        except json_format.ParseError as e:  # Errors converting the JSON dict to the proto
            logging.exception("Error converting JSON to protobuf: %s", e)

    @staticmethod
    def _get_relative_path(directory: str, path: str) -> str:
        """Returns the path relative to directory, falling back to basename on error."""
        try:
            return os.path.relpath(path, directory)
        except ValueError as e:
            logging.exception("Error calculating relative path: %s", e)
            return os.path.basename(path)

    @staticmethod
    def _artifact_path(path: str, artifact: ArtifactConfig) -> str:
        """Returns a unique artifact path for saving in cas_digests.json."""
        if artifact.chunk:
            return CHUNKED_ARTIFACT_NAME_PREFIX + path
        if artifact.chunk_dir:
            return CHUNKED_DIR_ARTIFACT_NAME_PREFIX + path
        return path

    @staticmethod
    def _artifact_variations(path: str, artifact: ArtifactConfig) -> list[ArtifactConfig]:
        """Returns variations of the artifact for upload based on artifact attributes."""
        variations = []
        if artifact.standard:
            variations.append(ArtifactConfig(path, artifact.unzip, True, False, False,
                                             exclude_filters=artifact.exclude_filters))
        if artifact.chunk:
            variations.append(ArtifactConfig(path, False, False, True, False,
                                             exclude_filters=artifact.exclude_filters))
        if artifact.chunk_dir:
            variations.append(ArtifactConfig(path, True, False, False, True,
                                             exclude_filters=artifact.exclude_filters))
        return variations
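

# A sketch of how one ArtifactConfig fans out into upload tasks (hypothetical
# path). ArtifactConfig('cts.zip', unzip=True, chunk=True, chunk_dir=True)
# yields three variations, recorded under these paths in cas_digests.json:
#   'cts.zip'              - standard upload of the unzipped directory
#   '_chunked_cts.zip'     - chunked upload of the zip as a single file
#   '_chunked_dir_cts.zip' - chunked upload of the unzipped directory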