#!/usr/bin/env python3
#
#  Copyright (C) 2022 The Android Open Source Project
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

17"""The script to upload generated artifacts from build server to CAS."""
18
19import concurrent.futures
20import dataclasses
21import glob
22import json
23import logging
24import os
25import subprocess
26import tempfile
27import uuid
28import time
29from typing import Tuple
30
31import cas_metrics_pb2  # type: ignore
32from google.protobuf import json_format
33
@dataclasses.dataclass
class ArtifactConfig:
    """Configuration of an artifact to be uploaded to CAS.

    Attributes:
        source_path: path to the artifact, relative to the root of the source code.
        unzip: true if the artifact should be unzipped and uploaded as a directory.
        standard: true if the artifact should be uploaded as-is.
        chunk: true if the artifact should be uploaded with chunking as a single file.
        chunk_dir: true if the artifact should be uploaded with chunking as a directory.
        exclude_filters: a list of regular expressions for files that are excluded from uploading.
    """
    source_path: str
    unzip: bool
    standard: bool = True
    chunk: bool = False
    chunk_dir: bool = False
    exclude_filters: list[str] = dataclasses.field(default_factory=list)
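
# For illustration, a hypothetical config for a zipped test artifact uploaded
# both as-is (standard) and with chunking as a directory, skipping log files:
#
#   ArtifactConfig(
#       source_path='device-tests.zip',
#       unzip=True,
#       chunk_dir=True,
#       exclude_filters=[r'.*\.log'],
#   )
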
@dataclasses.dataclass
class CasInfo:
    """Basic information about the CAS server and client.

    Attributes:
        cas_instance: the instance name of the CAS service.
        cas_service: the address of the CAS service.
        client_path: path to the CAS uploader client.
        client_version: version of the CAS uploader client, in tuple format.
    """
    cas_instance: str
    cas_service: str
    client_path: str
    client_version: tuple
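
# client_version is a tuple so that feature gates can use lexicographic
# comparison, as done in Uploader._upload_artifact below: e.g. (1, 3) >= (1, 0)
# is True, so a v1.3 client enables -dump-file-details.
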
@dataclasses.dataclass
class UploadResult:
    """Result of uploading a single artifact with the CAS client.

    Attributes:
        digest: root digest of the artifact.
        content_details: detailed information on all files inside the uploaded artifact.
        log_file: path to the log file of the upload.
    """
    digest: str
    content_details: list[dict[str, Any]]
    log_file: str


@dataclasses.dataclass
class UploadTask:
    """Task of uploading a single artifact with the CAS client."""
    artifact: ArtifactConfig
    path: str
    working_dir: str
    metrics_file: str


UPLOADER_TIMEOUT_SECS = 600  # 10 minutes
AVG_CHUNK_SIZE_IN_KB = 128
DIGESTS_PATH = 'cas_digests.json'
CONTENT_DETAILS_PATH = 'logs/cas_content_details.json'
CHUNKED_ARTIFACT_NAME_PREFIX = "_chunked_"
CHUNKED_DIR_ARTIFACT_NAME_PREFIX = "_chunked_dir_"
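
# These prefixes distinguish upload variations of the same artifact in
# cas_digests.json: e.g. a chunked upload of "foo.zip" is recorded under
# "_chunked_foo.zip" (see Uploader._artifact_path below).

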
class Uploader:
    """Uploader for uploading artifacts to CAS remote."""
    def __init__(self, cas_info: CasInfo):
        """Initialize the Uploader with CAS info."""
        self._cas_info = cas_info

    @staticmethod
    def setup_task_logger(working_dir: str) -> Tuple[logging.Logger, str]:
        """Creates a logger for an individual uploader task."""
        task_id = uuid.uuid4()
        logger = logging.getLogger(f"Uploader-{task_id}")
        logger.setLevel(logging.DEBUG)
        logger.propagate = False

        log_file = os.path.join(working_dir, f"_uploader_{task_id}.log")
        file_handler = logging.FileHandler(log_file)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        return logger, log_file

    @staticmethod
    def read_file(file_path: str) -> str:
        """Returns the contents of a file, or an error message on failure."""
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return file.read()
        except FileNotFoundError:
            return f"Error: File '{file_path}' not found."
        except Exception as e:
            return f"Error: {e}"

    @staticmethod
    def _run_uploader_command(cmd: list[str], working_dir: str) -> str:
        """Runs the uploader command, logging its output under working_dir; returns the log contents."""
        log_file = os.path.join(working_dir, f'_casuploader_{uuid.uuid4()}.log')
        with open(log_file, 'w', encoding='utf8') as outfile:
            subprocess.run(
                cmd,
                check=True,
                text=True,
                stdout=outfile,
                stderr=subprocess.STDOUT,
                encoding='utf-8',
                timeout=UPLOADER_TIMEOUT_SECS
            )
        return Uploader.read_file(log_file)

    def _upload_artifact(self,
            artifact: ArtifactConfig,
            working_dir: str,
            metrics_file: str,
    ) -> Optional[UploadResult]:
        """Uploads the artifact to CAS using the casuploader binary.

        Args:
            artifact: the artifact to be uploaded to CAS.
            working_dir: the directory for intermediate files.
            metrics_file: the metrics file for the artifact.

        Returns:
            An UploadResult whose digest is formatted as "<hash>/<size>", or
            None if the artifact upload fails.
        """
        logger, log_file = Uploader.setup_task_logger(working_dir)

        # `-dump-file-details` is only supported by CAS uploader v1.0 or later.
        dump_file_details = self._cas_info.client_version >= (1, 0)
        if not dump_file_details:
            logger.warning('-dump-file-details is not enabled')

        # `-dump-metrics` is only supported by CAS uploader v1.3 or later.
        dump_metrics = self._cas_info.client_version >= (1, 3)
        if not dump_metrics:
            logger.warning('-dump-metrics is not enabled')

        with tempfile.NamedTemporaryFile(mode='w+') as digest_file, \
                tempfile.NamedTemporaryFile(mode='w+') as content_details_file:
            logger.info(
                'Uploading %s to CAS instance %s', artifact.source_path, self._cas_info.cas_instance
            )

            cmd = [
                self._cas_info.client_path,
                '-cas-instance',
                self._cas_info.cas_instance,
                '-cas-addr',
                self._cas_info.cas_service,
                '-dump-digest',
                digest_file.name,
                '-use-adc',
            ]

            cmd = cmd + Uploader._path_flag_for_artifact(artifact)

            if artifact.chunk or artifact.chunk_dir:
                cmd = cmd + ['-chunk', '-avg-chunk-size', str(AVG_CHUNK_SIZE_IN_KB)]

            for exclude_filter in artifact.exclude_filters:
                cmd = cmd + ['-exclude-filters', exclude_filter]

            if dump_file_details:
                cmd = cmd + ['-dump-file-details', content_details_file.name]

            if dump_metrics:
                cmd = cmd + ['-dump-metrics', metrics_file]

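            # For illustration only (hypothetical paths): for a chunk_dir
            # variation of "device-tests.zip", the assembled command resembles:
            #   /path/to/casuploader -cas-instance <instance> -cas-addr <addr> \
            #     -dump-digest /tmp/digest -use-adc -zip-path device-tests.zip \
            #     -chunk -avg-chunk-size 128
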
            try:
                logger.info('Running command: %s', cmd)
                output = Uploader._run_uploader_command(cmd, working_dir)
                logger.info('Command output:\n %s', output)
            except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                logger.warning(
                    'Failed to upload %s to CAS instance %s. Skipping.\nError message: %s\nLog: %s',
                    artifact.source_path, self._cas_info.cas_instance, e, e.stdout,
                )
                return None
            except subprocess.SubprocessError as e:
                logger.warning('Failed to upload %s to CAS instance %s. Skipping.\nError: %s',
                    artifact.source_path, self._cas_info.cas_instance, e)
                return None

            # Read the digest of the root directory or file from the dumped digest file.
            digest = digest_file.read()
            if digest:
                logger.info('Uploaded %s to CAS. Digest: %s', artifact.source_path, digest)
            else:
                logger.warning(
                    'No digest was dumped for file %s; the upload may have failed.',
                    artifact.source_path,
                )
                return None

            content_details = None
            if dump_file_details:
                try:
                    content_details = json.loads(content_details_file.read())
                except json.JSONDecodeError as e:
                    logger.warning('Failed to parse uploaded content details: %s', e)

            return UploadResult(digest, content_details, log_file)

    @staticmethod
    def _path_flag_for_artifact(artifact: ArtifactConfig) -> list[str]:
        """Returns the path flag for the artifact."""
        if artifact.standard:
            return ['-zip-path' if artifact.unzip else '-file-path', artifact.source_path]
        if artifact.chunk:
            return ['-file-path', artifact.source_path]
        if artifact.chunk_dir:
            return ['-zip-path', artifact.source_path]
        # Should never reach here.
        return ['-file-path', artifact.source_path]

    def _output_results(
            self,
            output_dir: str,
            digests: dict[str, str],
            content_details: list[dict[str, Any]],
    ):
        """Outputs digests and content details to files under output_dir."""
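        # The dumped cas_digests.json resembles (hypothetical values):
        #   {
        #     "cas_instance": "projects/example-project/instances/default_instance",
        #     "cas_service": "example-cas.googleapis.com:443",
        #     "client_version": "1.3",
        #     "files": {"device-tests.zip": "<hash>/<size>"}
        #   }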
        digests_output = {
            'cas_instance': self._cas_info.cas_instance,
            'cas_service': self._cas_info.cas_service,
            'client_version': '.'.join(map(str, self._cas_info.client_version)),
            'files': digests,
        }
        output_path = os.path.join(output_dir, DIGESTS_PATH)
        with open(output_path, 'w', encoding='utf8') as writer:
            writer.write(json.dumps(digests_output, sort_keys=True, indent=2))
        logging.info('Output digests to %s', output_path)

        output_path = os.path.join(output_dir, CONTENT_DETAILS_PATH)
        with open(output_path, 'w', encoding='utf8') as writer:
            writer.write(json.dumps(content_details, sort_keys=True, indent=2))
        logging.info('Output uploaded content details to %s', output_path)

    def _upload_wrapper(self, task: UploadTask) -> Tuple[UploadResult, UploadTask]:
        """Wraps _upload_artifact, pairing its result with the originating task."""
        return self._upload_artifact(
            task.artifact,
            task.working_dir,
            task.metrics_file,
        ), task

    @staticmethod
    def _glob_wrapper(args):
        """Globs files for one artifact config; a picklable wrapper for multiprocessing."""
        dist_dir, artifact = args

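        # A leading "./" anchors the pattern at the top of dist_dir; any other
        # pattern is searched recursively at any depth under dist_dir. E.g. a
        # hypothetical './foo/*.zip' matches only <dist_dir>/foo/*.zip, while
        # '*.zip' matches zip files anywhere under <dist_dir>.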
        files = []
        if artifact.source_path.startswith("./"):
            files = glob.glob(dist_dir + artifact.source_path[1:])
        else:
            files = glob.glob(dist_dir + '/**/' + artifact.source_path, recursive=True)
        return (files, artifact)

    def create_upload_tasks(
            self, artifacts: list[ArtifactConfig], working_dir: str, dist_dir: str
    ) -> list[UploadTask]:
        """Creates upload tasks for the artifacts."""
        start = time.time()

        tasks = []
        seen_paths = set()
        # Glob in parallel. Note that ThreadPoolExecutor doesn't help, likely due to the GIL.
        with concurrent.futures.ProcessPoolExecutor() as executor:
            results = executor.map(
                    Uploader._glob_wrapper,
                    [(dist_dir, artifact) for artifact in artifacts],
            )
            for files, artifact in results:
                for file in files:
                    if os.path.isdir(file):
                        logging.warning('Ignoring artifact match (dir): %s', file)
                        continue
                    rel_path = Uploader._get_relative_path(dist_dir, file)
                    for task_artifact in Uploader._artifact_variations(rel_path, artifact):
                        path = Uploader._artifact_path(rel_path, task_artifact)
                        if path in seen_paths:
                            continue
                        seen_paths.add(path)
                        task_artifact.source_path = file
                        _, task_metrics_file = tempfile.mkstemp(dir=working_dir)
                        task = UploadTask(task_artifact, path, working_dir, task_metrics_file)
                        tasks.append(task)

        logging.info(
                'Time of file globbing for all artifact configs: %d seconds',
                time.time() - start,
        )
        return tasks

    @staticmethod
    def _print_tasks(tasks: list[UploadTask]):
        """Outputs info for upload tasks."""
        for task in tasks:
            unzip = '+' if task.artifact.unzip else '-'
            print(f"{task.path:<40} {unzip} {task.artifact.source_path}")
        print(f"Total: {len(tasks)} files.")

    def upload(self, artifacts: list[ArtifactConfig], dist_dir: str,
               max_workers: int, dryrun: bool = False) -> cas_metrics_pb2.CasMetrics:
        """Uploads artifacts to the CAS remote."""
        file_digests = {}
        content_details = []

        cas_metrics = cas_metrics_pb2.CasMetrics()
        with tempfile.TemporaryDirectory() as working_dir:
            logging.info('The working dir is %s', working_dir)

            tasks = self.create_upload_tasks(artifacts, working_dir, dist_dir)
            logging.info('Uploading %d files, max workers = %d', len(tasks), max_workers)
            if dryrun:
                Uploader._print_tasks(tasks)
                return cas_metrics

            # Upload artifacts in parallel.
            logging.info('==== Start uploading %d artifact(s) in parallel ====\n', len(tasks))
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [executor.submit(self._upload_wrapper, task) for task in tasks]

                index = 1
                for future in concurrent.futures.as_completed(futures):
                    result, task = future.result()
                    if result:
                        output = Uploader.read_file(result.log_file)
                        logging.info('---- %s: %s ----\n\n%s', index, task.path, output)
                    else:
                        logging.info('---- %s: %s ----\n\n', index, task.path)
                    index += 1
                    if result and result.digest:
                        file_digests[task.path] = result.digest
                    else:
                        logging.warning(
                            'Not saving a digest for file %s; the upload may have failed.',
                            task.path,
                        )
                    if result and result.content_details:
                        content_details.append({"artifact": task.path,
                                                "details": result.content_details})
                    else:
                        logging.warning('Not saving content details for file %s', task.path)

                    if os.path.exists(task.metrics_file):
                        Uploader._add_artifact_metrics(task.metrics_file, cas_metrics)
                        os.remove(task.metrics_file)
            logging.info('==== Uploading of artifacts completed ====')

        self._output_results(
            dist_dir,
            file_digests,
            content_details,
        )
        return cas_metrics

    @staticmethod
    def _add_artifact_metrics(metrics_file: str, cas_metrics: cas_metrics_pb2.CasMetrics):
        """Adds artifact metrics from metrics_file to cas_metrics."""
        try:
            with open(metrics_file, "r", encoding='utf8') as file:
                json_str = file.read()
                if json_str:
                    json_metrics = json.loads(json_str)
                    cas_metrics.artifacts.append(
                        json_format.ParseDict(json_metrics, cas_metrics_pb2.ArtifactMetrics())
                    )
                else:
                    logging.warning("Empty file: %s", metrics_file)
        except FileNotFoundError:
            logging.exception("File not found: %s", metrics_file)
        except json.JSONDecodeError as e:
            logging.exception("JSON decode error: %s for JSON contents:\n%s", e, json_str)
        except json_format.ParseError as e:
            logging.exception("Error converting JSON to protobuf: %s", e)

    @staticmethod
    def _get_relative_path(base_dir: str, path: str) -> str:
        """Returns the path relative to base_dir, falling back to the basename on error."""
        try:
            return os.path.relpath(path, base_dir)
        except ValueError as e:
            logging.exception("Error calculating relative path: %s", e)
            return os.path.basename(path)

    @staticmethod
    def _artifact_path(path: str, artifact: ArtifactConfig) -> str:
        """Returns a unique artifact path for saving in cas_digests.json."""
        if artifact.chunk:
            return CHUNKED_ARTIFACT_NAME_PREFIX + path
        if artifact.chunk_dir:
            return CHUNKED_DIR_ARTIFACT_NAME_PREFIX + path
        return path

    @staticmethod
    def _artifact_variations(path: str, artifact: ArtifactConfig) -> list[ArtifactConfig]:
        """Returns variations of the artifact for upload based on artifact attributes."""
        variations = []
        if artifact.standard:
            variations.append(ArtifactConfig(path, artifact.unzip, standard=True,
                                             chunk=False, chunk_dir=False,
                                             exclude_filters=artifact.exclude_filters))
        if artifact.chunk:
            variations.append(ArtifactConfig(path, unzip=False, standard=False,
                                             chunk=True, chunk_dir=False,
                                             exclude_filters=artifact.exclude_filters))
        if artifact.chunk_dir:
            variations.append(ArtifactConfig(path, unzip=True, standard=False,
                                             chunk=False, chunk_dir=True,
                                             exclude_filters=artifact.exclude_filters))
        return variations
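
# A minimal usage sketch with hypothetical values (the real caller wires these
# in from build-server arguments; it is not part of this file):
#
#   cas_info = CasInfo(
#       cas_instance='projects/example-project/instances/default_instance',
#       cas_service='example-cas.googleapis.com:443',
#       client_path='/path/to/casuploader',
#       client_version=(1, 3),
#   )
#   artifacts = [ArtifactConfig('*.img', unzip=False)]
#   metrics = Uploader(cas_info).upload(
#       artifacts, dist_dir='/path/to/dist', max_workers=8)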