# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Command to upload benchmark test results to a cloud datastore.

This uploader script is typically run periodically as a cron job.  It locates,
in a specified data directory, files that contain benchmark test results.  The
results are written by the "run_and_gather_logs.py" script using the JSON-format
serialization of the "TestResults" protobuf message (core/util/test_log.proto).

For each file, the uploader reads the "TestResults" data, transforms it into
the schema used in the datastore (see below), and uploads it to the datastore.
After processing a file, the uploader moves it to a specified archive directory
for safe-keeping.

The uploader uses file-level exclusive locking (non-blocking flock), which
allows multiple instances of this script to run concurrently if desired,
splitting the task among them, each one processing and archiving different
files.
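
A typical invocation, with illustrative paths, might look like:

  python <path/to/this/script> -d /data/incoming -a /data/archive

The GOOGLE_APPLICATION_CREDENTIALS environment variable must point at
credentials with access to the target datastore (see main() below).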

The "TestResults" object contains test metadata and multiple benchmark entries.
The datastore schema splits this information into two Kinds (like tables): one
holds the test metadata in a single "Test" Entity (like a row), and the other
holds each benchmark entry in a separate "Entry" Entity.  Datastore creates a
unique ID (retrieval key) for each Entity, and this ID is always returned along
with the data when an Entity is fetched.

* Test:
  - test:   unique name of this test (string)
  - start:  start time of this test run (datetime)
  - info:   JSON-encoded test metadata (string, not indexed)

* Entry:
  - test:   unique name of this test (string)
  - entry:  unique name of this benchmark entry within this test (string)
  - start:  start time of this test run (datetime)
  - timing: average time (usec) per iteration of this test/entry run (float)
  - info:   JSON-encoded entry metadata (string, not indexed)

A few composite indexes are created (upload_test_benchmarks_index.yaml) for fast
retrieval of benchmark data and reduced I/O to the client without adding a lot
of indexing and storage burden:

* Test: (test, start) is indexed to fetch recent start times for a given test.

* Entry: (test, entry, start, timing) is indexed to use projection and only
  fetch the recent (start, timing) data for a given test/entry benchmark.

Example retrieval GQL statements:

* Get the recent start times for a given test:
  SELECT start FROM Test WHERE test = <test-name> AND
    start >= <recent-datetime> LIMIT <count>

* Get the recent timings for a given benchmark:
  SELECT start, timing FROM Entry WHERE test = <test-name> AND
    entry = <entry-name> AND start >= <recent-datetime> LIMIT <count>

* Get all distinct test names (e.g. to display a list of available tests):
  SELECT DISTINCT ON (test) test FROM Test

* For a given test (from the list above), get all its entry names.  The list of
  entry names can be extracted from the test "info" metadata for a given test
  name and start time (e.g. pick the latest start time for that test):
  SELECT * FROM Test WHERE test = <test-name> AND start = <latest-datetime>
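
* The same data can be fetched with the google.cloud.datastore Python client
  instead of GQL.  As a rough sketch (the test/entry names, time window, and
  limit below are illustrative), the "recent timings" query above corresponds
  to a projection query like:

    import datetime

    from google.cloud import datastore

    client = datastore.Client()
    query = client.query(kind="Entry")
    query.add_filter("test", "=", "some/test:name")      # illustrative
    query.add_filter("entry", "=", "BM_SomeBenchmark")   # illustrative
    query.add_filter(
        "start", ">=",
        datetime.datetime.utcnow() - datetime.timedelta(days=7))
    query.projection = ["start", "timing"]
    recent_timings = list(query.fetch(limit=100))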
77"""
78
79from __future__ import absolute_import
80from __future__ import division
81from __future__ import print_function
82
83import argparse
84import datetime
85import fcntl
86import json
87import os
88import shutil
89
90from six import text_type
91from google.cloud import datastore
92
93
94def is_real_file(dirpath, fname):
95  fpath = os.path.join(dirpath, fname)
96  return os.path.isfile(fpath) and not os.path.islink(fpath)
97
98
99def get_mtime(dirpath, fname):
100  fpath = os.path.join(dirpath, fname)
101  return os.stat(fpath).st_mtime
102
103
104def list_files_by_mtime(dirpath):
105  """Return a list of files in the directory, sorted in increasing "mtime".
106
107  Return a list of files in the given directory, sorted from older to newer file
108  according to their modification times.  Only return actual files, skipping
109  directories, symbolic links, pipes, etc.
110
111  Args:
112    dirpath: directory pathname
113
114  Returns:
115    A list of file names relative to the given directory path.
116  """
117  files = [f for f in os.listdir(dirpath) if is_real_file(dirpath, f)]
118  return sorted(files, key=lambda f: get_mtime(dirpath, f))
119
120
121# Note: The file locking code uses flock() instead of lockf() because benchmark
122# files are only opened for reading (not writing) and we still want exclusive
123# locks on them.  This imposes the limitation that the data directory must be
124# local, not NFS-mounted.
125def lock(fd):
126  fcntl.flock(fd, fcntl.LOCK_EX)
127
128
129def unlock(fd):
130  fcntl.flock(fd, fcntl.LOCK_UN)
131
132
133def trylock(fd):
134  try:
135    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
136    return True
137  except Exception:  # pylint: disable=broad-except
138    return False
139
140
141def upload_benchmark_data(client, data):
142  """Parse benchmark data and use the client to upload it to the datastore.
143
144  Parse the given benchmark data from the serialized JSON-format used to write
145  the test results file.  Create the different datastore Entities from that data
146  and upload them to the datastore in a batch using the client connection.
147
148  Args:
149    client: datastore client connection
150    data: JSON-encoded benchmark data
151  """
  test_result = json.loads(data)

  test_name = text_type(test_result["name"])
  start_time = datetime.datetime.utcfromtimestamp(
      float(test_result["startTime"]))
  batch = []

  # Create the Test Entity containing all the test information as a
  # non-indexed JSON blob.
  t_key = client.key("Test")
  t_val = datastore.Entity(t_key, exclude_from_indexes=["info"])
  t_val.update({
      "test": test_name,
      "start": start_time,
      "info": text_type(data)
  })
  batch.append(t_val)

  # Create one Entry Entity for each benchmark entry.  The wall-clock timing is
  # the attribute to be fetched and displayed.  The full entry information is
  # also stored as a non-indexed JSON blob.
  for ent in test_result["entries"].get("entry", []):
    ent_name = text_type(ent["name"])
    e_key = client.key("Entry")
    e_val = datastore.Entity(e_key, exclude_from_indexes=["info"])
    e_val.update({
        "test": test_name,
        "start": start_time,
        "entry": ent_name,
        "timing": ent["wallTime"],
        "info": text_type(json.dumps(ent))
    })
    batch.append(e_val)

  # Put the whole batch of Entities in the datastore.
  client.put_multi(batch)


def upload_benchmark_files(opts):
  """Find benchmark files, process them, and upload their data to the datastore.

  Locate benchmark files in the data directory, process them, and upload their
  data to the datastore.  After processing each file, move it to the archive
  directory for safe-keeping.  Each file is locked for processing, which allows
  multiple uploader instances to run concurrently if needed, each one handling
  different benchmark files, skipping those already locked by another.

  Args:
    opts: command line options object

  Note: To use locking, the file is first opened, and then its descriptor is
  used to lock and read it.  The lock is released when the file is closed.  Do
  not open that same file a second time while the lock is held, because closing
  the second file descriptor would release the lock prematurely.
  """
  client = datastore.Client()

  for fname in list_files_by_mtime(opts.datadir):
    fpath = os.path.join(opts.datadir, fname)
    try:
      with open(fpath, "r") as fd:
        if trylock(fd):
          upload_benchmark_data(client, fd.read())
          shutil.move(fpath, os.path.join(opts.archivedir, fname))
          # unlock(fd) -- When "with open()" closes fd, the lock is released.
    except Exception as e:  # pylint: disable=broad-except
      print("Cannot process '%s', skipping. Error: %s" % (fpath, e))


def parse_cmd_line():
  """Parse command line options.

  Returns:
    The parsed arguments object.
  """
  desc = "Upload benchmark results to datastore."
  opts = [
      ("-a", "--archivedir", str, None, True,
       "Directory where benchmark files are archived."),
      ("-d", "--datadir", str, None, True,
       "Directory of benchmark files to upload."),
  ]

  parser = argparse.ArgumentParser(description=desc)
  for opt in opts:
    parser.add_argument(opt[0], opt[1], type=opt[2], default=opt[3],
                        required=opt[4], help=opt[5])
  return parser.parse_args()


def main():
  options = parse_cmd_line()

  # Check that credentials are specified to access the datastore.
  if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
    raise ValueError("GOOGLE_APPLICATION_CREDENTIALS env. var. is not set.")

  upload_benchmark_files(options)


if __name__ == "__main__":
  main()