# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import hashlib
import json
import os
import requests

# ==================== Document digests

def calculate_digest(doc):
    """
    Calculates a digest of the given document.

    @param doc: document's content

    @returns calculated digest as a string of hexadecimals

    """
    if ( doc[0:64].find(b'\x1B%-12345X@PJL') >= 0
            or doc[0:64].find('%!PS-Adobe') >= 0 ):
        # PJL or Postscript or PJL with encapsulated Postscript
        # Split by newline character and filter out problematic lines
        lines = doc.split('\n')
        for i, line in enumerate(lines):
            if ( line.startswith('@PJL SET ')
                    or line.startswith('@PJL COMMENT')
                    or line.startswith('@PJL JOB NAME')
                    or line.startswith('trailer << ')
                    or line.startswith('%%Title:')
                    or line.startswith('%%For:') ):
                lines[i] = ''
        doc = '\n'.join(lines)
    elif doc[0:8] == b'\x24\x01\x00\x00\x07\x00\x00\x00':
        # LIDIL
        LIDIL_JOBID_1_OFF = 2348  # first job id, offset from the beginning
        LIDIL_JOBID_2_OFF = 2339  # second job id, offset from the end
        nd = len(doc)
        if nd > LIDIL_JOBID_1_OFF + LIDIL_JOBID_2_OFF + 2:
            # remove the second JOB ID (two bytes at the end)
            doc = doc[:(nd-LIDIL_JOBID_2_OFF)] + doc[(nd-LIDIL_JOBID_2_OFF+2):]
            # remove the first JOB ID (two bytes at the beginning)
            doc = doc[:LIDIL_JOBID_1_OFF] + doc[(LIDIL_JOBID_1_OFF+2):]
    # calculates the hash
    return hashlib.md5(doc).hexdigest()


def parse_digests_file(path_digests, blacklist):
    """
    Parses digests from a file.

    @param path_digests: a path to a file with digests
    @param blacklist: list of keys to omit

    @returns a dictionary with digests indexed by ppd filenames or an empty
            dictionary if the given file does not exist

    """
    digests = dict()
    blacklist = set(blacklist)
    if os.path.isfile(path_digests):
        with open(path_digests, 'rb') as file_digests:
            lines = file_digests.read().splitlines()
            for line in lines:
                cols = line.split()
                if len(cols) >= 2 and cols[0] not in blacklist:
                    digests[cols[0]] = cols[1]
    return digests


def save_digests_file(path_digests, digests, blacklist):
    """
    Saves digests to a file as tab-separated (name, digest) pairs.

    @param path_digests: a path to the output file
    @param digests: dictionary with digests (keys are names)
    @param blacklist: list of keys to ignore

    """
    digests_content = ''
    names = sorted(set(digests.keys()).difference(blacklist))
    for name in names:
        digest = digests[name]
        assert name.find('\t') < 0 and name.find('\n') < 0
        assert digest.find('\t') < 0 and digest.find('\n') < 0
        digests_content += name + '\t' + digest + '\n'

    with open(path_digests, 'wb') as file_digests:
        file_digests.write(digests_content)
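

# The three helpers above work together: calculate_digest() produces the
# value that save_digests_file() stores and parse_digests_file() reads
# back. Below is a minimal sketch of that round trip; it is an
# illustration only (the paths and the PPD name are hypothetical examples,
# and nothing in the tests calls this function).

def _example_digest_round_trip():
    path_digests = '/tmp/digests.txt'  # hypothetical example path
    path_output = '/tmp/printer_output.bin'  # hypothetical example path
    ppd_name = 'example.ppd'  # hypothetical PPD filename
    # computes the digest of a raw printer output
    with open(path_output, 'rb') as file_output:
        digest = calculate_digest(file_output.read())
    # saves the digest and reads it back (empty blacklist in this sketch)
    save_digests_file(path_digests, {ppd_name: digest}, [])
    digests = parse_digests_file(path_digests, [])
    assert digests[ppd_name] == digest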


def load_blacklist(path_blacklist):
    """
    Loads the blacklist of outputs to omit.

    Raw outputs generated by some PPD files cannot be verified by digests,
    because they contain variable data like date/time, job id or other
    non-static parameters. This routine returns a list of blacklisted PPDs.

    @param path_blacklist: a path to the file with the list of blacklisted
            PPD files

    @returns a list of ppds to ignore during verification of digests

    """
    with open(path_blacklist) as file_blacklist:
        lines = file_blacklist.readlines()

    blacklist = []
    for entry in lines:
        entry = entry.strip()
        if entry != '':
            blacklist.append(entry)

    return blacklist


# ===================== PPD files on the SCS server

def get_filenames_from_PPD_index(task_id):
    """
    Downloads an index file from the SCS server and extracts the names of
    PPD files from it.

    @param task_id: an order number of the index file to process; this is
            an integer from the interval [0..20)

    @returns a list of PPD filenames (may contain duplicates)

    """
    # calculates a URL of the index file
    url_metadata = 'https://www.gstatic.com/chromeos_printing/metadata_v2/'
    url_ppd_index = url_metadata + ('index-%02d.json' % task_id)
    # downloads and parses the index file
    request = requests.get(url_ppd_index)
    entries = json.loads(request.content)
    # extracts PPD filenames (the second element in each index entry)
    output = []
    for entry in entries:
        output.append(entry[1])
    # returns a list of extracted filenames
    return output


def download_PPD_file(ppd_file):
    """
    Downloads a PPD file from the SCS server.

    @param ppd_file: a filename of the PPD file (neither a path nor a URL)

    @returns content of the PPD file

    """
    url_ppds = 'https://www.gstatic.com/chromeos_printing/ppds/'
    request = requests.get(url_ppds + ppd_file)
    return request.content


# ==================== Local filesystem

def list_entries_from_directory(
        path,
        with_suffixes=None, nonempty_results=False,
        include_files=True, include_directories=True ):
    """
    Returns all entries from the given directory. Results may be filtered
    by filename suffixes or entry types.

    @param path: a path to the directory to list entries from
    @param with_suffixes: if set, only entries with the given suffixes are
            returned; it must be a tuple
    @param nonempty_results: if True, an Exception is raised when there
            are no results
    @param include_files: if False, regular files and links are omitted
    @param include_directories: if False, directories are omitted

    @returns a list of entries meeting the given criteria

    @raises Exception if no matching entries were found and
            nonempty_results is set to True

    """
    # lists all entries from the directory and filters them by the given
    # criteria
    list_of_files = []
    for filename in os.listdir(path):
        path_entry = os.path.join(path, filename)
        # check type
        if os.path.isfile(path_entry):
            if not include_files:
                continue
        elif os.path.isdir(path_entry):
            if not include_directories:
                continue
        else:
            continue
        # check suffix
        if with_suffixes is not None:
            if not filename.endswith(with_suffixes):
                continue
        list_of_files.append(filename)
    # raises an exception if required and no entries were found
    if nonempty_results and len(list_of_files) == 0:
        message = 'Directory %s does not contain any ' % path
        message += 'entries meeting the criteria'
        raise Exception(message)
    return list_of_files
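

# A minimal sketch of how the SCS helpers above can be combined to mirror
# all PPD files into a local directory. It assumes network access to the
# SCS server; the default target directory is a hypothetical example and
# nothing in the tests calls this function.

def _example_mirror_all_ppds(target_dir='/tmp/ppds'):
    if not os.path.isdir(target_dir):
        os.makedirs(target_dir)
    # the server exposes 20 index files, numbered from 00 to 19
    ppd_files = set()
    for task_id in range(20):
        ppd_files.update(get_filenames_from_PPD_index(task_id))
    # downloads every PPD file and saves it under its own name
    for ppd_file in sorted(ppd_files):
        content = download_PPD_file(ppd_file)
        with open(os.path.join(target_dir, ppd_file), 'wb') as file_ppd:
            file_ppd.write(content)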