# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Command to upload benchmark test results to a cloud datastore.

This uploader script is typically run periodically as a cron job. It locates,
in a specified data directory, files that contain benchmark test results. The
results are written by the "run_and_gather_logs.py" script using the JSON-format
serialization of the "TestResults" protobuf message (core/util/test_log.proto).

For each file, the uploader reads the "TestResults" data, transforms it into
the schema used in the datastore (see below), and uploads it to the datastore.
After processing a file, the uploader moves it to a specified archive directory
for safe-keeping.

The uploader uses file-level exclusive locking (non-blocking flock) which allows
multiple instances of this script to run concurrently if desired, splitting the
task among them, each one processing and archiving different files.

The "TestResults" object contains test metadata and multiple benchmark entries.
The datastore schema splits this information into two Kinds (like tables), one
holding the test metadata in a single "Test" Entity (like rows), and one holding
each related benchmark entry in a separate "Entry" Entity. Datastore creates a
unique ID (retrieval key) for each Entity, and this ID is always returned along
with the data when an Entity is fetched.

* Test:
  - test: unique name of this test (string)
  - start: start time of this test run (datetime)
  - info: JSON-encoded test metadata (string, not indexed)

* Entry:
  - test: unique name of this test (string)
  - entry: unique name of this benchmark entry within this test (string)
  - start: start time of this test run (datetime)
  - timing: average time (usec) per iteration of this test/entry run (float)
  - info: JSON-encoded entry metadata (string, not indexed)

A few composite indexes are created (upload_test_benchmarks_index.yaml) for fast
retrieval of benchmark data and reduced I/O to the client without adding a lot
of indexing and storage burden:

* Test: (test, start) is indexed to fetch recent start times for a given test.

* Entry: (test, entry, start, timing) is indexed to use projection and only
fetch the recent (start, timing) data for a given test/entry benchmark.

Example retrieval GQL statements:

* Get the recent start times for a given test:
  SELECT start FROM Test WHERE test = <test-name> AND
      start >= <recent-datetime> LIMIT <count>

* Get the recent timings for a given benchmark:
  SELECT start, timing FROM Entry WHERE test = <test-name> AND
      entry = <entry-name> AND start >= <recent-datetime> LIMIT <count>

* Get all test names uniquified (e.g. display a list of available tests):
  SELECT DISTINCT ON (test) test FROM Test

* For a given test (from the list above), get all its entry names. The list of
  entry names can be extracted from the test "info" metadata for a given test
  name and start time (e.g. pick the latest start time for that test).
76 SELECT * FROM Test WHERE test = <test-name> AND start = <latest-datetime> 77""" 78 79from __future__ import absolute_import 80from __future__ import division 81from __future__ import print_function 82 83import argparse 84import datetime 85import fcntl 86import json 87import os 88import shutil 89 90from six import text_type 91from google.cloud import datastore 92 93 94def is_real_file(dirpath, fname): 95 fpath = os.path.join(dirpath, fname) 96 return os.path.isfile(fpath) and not os.path.islink(fpath) 97 98 99def get_mtime(dirpath, fname): 100 fpath = os.path.join(dirpath, fname) 101 return os.stat(fpath).st_mtime 102 103 104def list_files_by_mtime(dirpath): 105 """Return a list of files in the directory, sorted in increasing "mtime". 106 107 Return a list of files in the given directory, sorted from older to newer file 108 according to their modification times. Only return actual files, skipping 109 directories, symbolic links, pipes, etc. 110 111 Args: 112 dirpath: directory pathname 113 114 Returns: 115 A list of file names relative to the given directory path. 116 """ 117 files = [f for f in os.listdir(dirpath) if is_real_file(dirpath, f)] 118 return sorted(files, key=lambda f: get_mtime(dirpath, f)) 119 120 121# Note: The file locking code uses flock() instead of lockf() because benchmark 122# files are only opened for reading (not writing) and we still want exclusive 123# locks on them. This imposes the limitation that the data directory must be 124# local, not NFS-mounted. 125def lock(fd): 126 fcntl.flock(fd, fcntl.LOCK_EX) 127 128 129def unlock(fd): 130 fcntl.flock(fd, fcntl.LOCK_UN) 131 132 133def trylock(fd): 134 try: 135 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 136 return True 137 except Exception: # pylint: disable=broad-except 138 return False 139 140 141def upload_benchmark_data(client, data): 142 """Parse benchmark data and use the client to upload it to the datastore. 
143 144 Parse the given benchmark data from the serialized JSON-format used to write 145 the test results file. Create the different datastore Entities from that data 146 and upload them to the datastore in a batch using the client connection. 147 148 Args: 149 client: datastore client connection 150 data: JSON-encoded benchmark data 151 """ 152 test_result = json.loads(data) 153 154 test_name = text_type(test_result["name"]) 155 start_time = datetime.datetime.utcfromtimestamp( 156 float(test_result["startTime"])) 157 batch = [] 158 159 # Create the Test Entity containing all the test information as a 160 # non-indexed JSON blob. 161 t_key = client.key("Test") 162 t_val = datastore.Entity(t_key, exclude_from_indexes=["info"]) 163 t_val.update({ 164 "test": test_name, 165 "start": start_time, 166 "info": text_type(data) 167 }) 168 batch.append(t_val) 169 170 # Create one Entry Entity for each benchmark entry. The wall-clock timing is 171 # the attribute to be fetched and displayed. The full entry information is 172 # also stored as a non-indexed JSON blob. 173 for ent in test_result["entries"].get("entry", []): 174 ent_name = text_type(ent["name"]) 175 e_key = client.key("Entry") 176 e_val = datastore.Entity(e_key, exclude_from_indexes=["info"]) 177 e_val.update({ 178 "test": test_name, 179 "start": start_time, 180 "entry": ent_name, 181 "timing": ent["wallTime"], 182 "info": text_type(json.dumps(ent)) 183 }) 184 batch.append(e_val) 185 186 # Put the whole batch of Entities in the datastore. 187 client.put_multi(batch) 188 189 190def upload_benchmark_files(opts): 191 """Find benchmark files, process them, and upload their data to the datastore. 192 193 Locate benchmark files in the data directory, process them, and upload their 194 data to the datastore. After processing each file, move it to the archive 195 directory for safe-keeping. 
Each file is locked for processing, which allows 196 multiple uploader instances to run concurrently if needed, each one handling 197 different benchmark files, skipping those already locked by another. 198 199 Args: 200 opts: command line options object 201 202 Note: To use locking, the file is first opened, then its descriptor is used to 203 lock and read it. The lock is released when the file is closed. Do not open 204 that same file a 2nd time while the lock is already held, because when that 205 2nd file descriptor is closed, the lock will be released prematurely. 206 """ 207 client = datastore.Client() 208 209 for fname in list_files_by_mtime(opts.datadir): 210 fpath = os.path.join(opts.datadir, fname) 211 try: 212 with open(fpath, "r") as fd: 213 if trylock(fd): 214 upload_benchmark_data(client, fd.read()) 215 shutil.move(fpath, os.path.join(opts.archivedir, fname)) 216 # unlock(fd) -- When "with open()" closes fd, the lock is released. 217 except Exception as e: # pylint: disable=broad-except 218 print("Cannot process '%s', skipping. Error: %s" % (fpath, e)) 219 220 221def parse_cmd_line(): 222 """Parse command line options. 223 224 Returns: 225 The parsed arguments object. 226 """ 227 desc = "Upload benchmark results to datastore." 228 opts = [ 229 ("-a", "--archivedir", str, None, True, 230 "Directory where benchmark files are archived."), 231 ("-d", "--datadir", str, None, True, 232 "Directory of benchmark files to upload."), 233 ] 234 235 parser = argparse.ArgumentParser(description=desc) 236 for opt in opts: 237 parser.add_argument(opt[0], opt[1], type=opt[2], default=opt[3], 238 required=opt[4], help=opt[5]) 239 return parser.parse_args() 240 241 242def main(): 243 options = parse_cmd_line() 244 245 # Check that credentials are specified to access the datastore. 246 if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): 247 raise ValueError("GOOGLE_APPLICATION_CREDENTIALS env. var. 
is not set.") 248 249 upload_benchmark_files(options) 250 251 252if __name__ == "__main__": 253 main() 254