# -*- coding: utf-8 -*-
# Copyright 2012 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains the perfdiag gsutil command."""
from __future__ import absolute_import
import calendar
from collections import defaultdict
from collections import namedtuple
import contextlib
import cStringIO
import datetime
import httplib
import json
import logging
import math
import multiprocessing
import os
import random
import re
import socket
import string
import subprocess
import tempfile
import time
import boto
import boto.gs.connection
import gslib
from gslib.cloud_api import NotFoundException
from gslib.cloud_api import ServiceException
from gslib.cloud_api_helper import GetDownloadSerializationData
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.commands import config
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.file_part import FilePart
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.util import CheckFreeSpace
from gslib.util import DivideAndCeil
from gslib.util import GetCloudApiInstance
from gslib.util import GetFileSize
from gslib.util import GetMaxRetryDelay
from gslib.util import HumanReadableToBytes
from gslib.util import IS_LINUX
from gslib.util import MakeBitsHumanReadable
from gslib.util import MakeHumanReadable
from gslib.util import Percentile
from gslib.util import ResumableThreshold
_SYNOPSIS = """
gsutil perfdiag [-i in.json]
gsutil perfdiag [-o out.json] [-n objects] [-c processes]
[-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
[-t tests] url...
"""
_DETAILED_HELP_TEXT = ("""
SYNOPSIS
""" + _SYNOPSIS + """
DESCRIPTION
The perfdiag command runs a suite of diagnostic tests for a given Google
Storage bucket.
The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
the user has write permission. Several test files will be uploaded to and
downloaded from this bucket. All test files will be deleted at the completion
of the diagnostic if it finishes successfully.
gsutil performance can be impacted by many factors at the client, server,
and in-between, such as: CPU speed; available memory; the access path to the
local disk; network bandwidth; contention and error rates along the path
between gsutil and Google; operating system buffering configuration; and
firewalls and other network elements. The perfdiag command is provided so
that customers can run a known measurement suite when troubleshooting
performance problems.
PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM
If the Google Cloud Storage Team asks you to run a performance diagnostic
please use the following command, and email the output file (output.json)
to gs-team@google.com:
gsutil perfdiag -o output.json gs://your-bucket
OPTIONS
-n Sets the number of objects to use when downloading and uploading
files during tests. Defaults to 5.
-c Sets the number of processes to use while running throughput
experiments. The default value is 1.
-k Sets the number of threads per process to use while running
throughput experiments. Each process will receive an equal number
of threads. The default value is 1.
Note: All specified threads and processes will be created, but may
not by saturated with work if too few objects (specified with -n)
and too few components (specified with -y) are specified.
-p Sets the type of parallelism to be used (only applicable when
threads or processes are specified and threads * processes > 1).
The default is to use fan. Must be one of the following:
fan
Use one thread per object. This is akin to using gsutil -m cp,
with sliced object download / parallel composite upload
disabled.
slice
Use Y (specified with -y) threads for each object, transferring
one object at a time. This is akin to using parallel object
download / parallel composite upload, without -m. Sliced
uploads not supported for s3.
both
Use Y (specified with -y) threads for each object, transferring
multiple objects at a time. This is akin to simultaneously
using sliced object download / parallel composite upload and
gsutil -m cp. Sliced uploads not supported for s3.
-y Sets the number of slices to divide each file/object into while
transferring data. Only applicable with the slice (or both)
parallelism type. The default is 4 slices.
-s Sets the size (in bytes) for each of the N (set with -n) objects
used in the read and write throughput tests. The default is 1 MiB.
This can also be specified using byte suffixes such as 500K or 1M.
Note: these values are interpreted as multiples of 1024 (K=1024,
M=1024*1024, etc.)
Note: If rthru_file or wthru_file are performed, N (set with -n)
times as much disk space as specified will be required for the
operation.
-d Sets the directory to store temporary local files in. If not
specified, a default temporary directory will be used.
-t Sets the list of diagnostic tests to perform. The default is to
run the lat, rthru, and wthru diagnostic tests. Must be a
comma-separated list containing one or more of the following:
lat
For N (set with -n) objects, write the object, retrieve its
metadata, read the object, and finally delete the object.
Record the latency of each operation.
list
Write N (set with -n) objects to the bucket, record how long
it takes for the eventually consistent listing call to return
the N objects in its result, delete the N objects, then record
how long it takes listing to stop returning the N objects.
This test is off by default.
rthru
Runs N (set with -n) read operations, with at most C
(set with -c) reads outstanding at any given time.
rthru_file
The same as rthru, but simultaneously writes data to the disk,
to gauge the performance impact of the local disk on downloads.
wthru
Runs N (set with -n) write operations, with at most C
(set with -c) writes outstanding at any given time.
wthru_file
The same as wthru, but simultaneously reads data from the disk,
to gauge the performance impact of the local disk on uploads.
-m Adds metadata to the result JSON file. Multiple -m values can be
specified. Example:
gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
Each metadata key will be added to the top-level "metadata"
dictionary in the output JSON file.
-o Writes the results of the diagnostic to an output file. The output
is a JSON file containing system information and performance
diagnostic results. The file can be read and reported later using
the -i option.
-i Reads the JSON output file created using the -o command and prints
a formatted description of the results.
MEASURING AVAILABILITY
The perfdiag command ignores the boto num_retries configuration parameter.
Instead, it always retries on HTTP errors in the 500 range and keeps track of
how many 500 errors were encountered during the test. The availability
measurement is reported at the end of the test.
Note that HTTP responses are only recorded when the request was made in a
single process. When using multiple processes or threads, read and write
throughput measurements are performed in an external process, so the
availability numbers reported won't include the throughput measurements.
NOTE
The perfdiag command collects system information. It collects your IP address,
executes DNS queries to Google servers and collects the results, and collects
network statistics information from the output of netstat -s. It will also
attempt to connect to your proxy server if you have one configured. None of
this information will be sent to Google unless you choose to send it.
""")
FileDataTuple = namedtuple(
'FileDataTuple',
'size md5 data')
# Describes one object in a fanned download. If need_to_slice is specified as
# True, the object should be downloaded with the slice strategy. Other field
# names are the same as documented in PerfDiagCommand.Download.
FanDownloadTuple = namedtuple(
'FanDownloadTuple',
'need_to_slice object_name file_name serialization_data')
# Describes one slice in a sliced download.
# Field names are the same as documented in PerfDiagCommand.Download.
SliceDownloadTuple = namedtuple(
'SliceDownloadTuple',
'object_name file_name serialization_data start_byte end_byte')
# Describes one file in a fanned upload. If need_to_slice is specified as
# True, the file should be uploaded with the slice strategy. Other field
# names are the same as documented in PerfDiagCommand.Upload.
FanUploadTuple = namedtuple(
'FanUploadTuple',
'need_to_slice file_name object_name use_file')
# Describes one slice in a sliced upload.
# Field names are the same as documented in PerfDiagCommand.Upload.
SliceUploadTuple = namedtuple(
'SliceUploadTuple',
'file_name object_name use_file file_start file_size')
# Dict storing file_path:FileDataTuple for each temporary file used by
# perfdiag. This data should be kept outside of the PerfDiagCommand class
# since calls to Apply will make copies of all member data.
temp_file_dict = {}
class Error(Exception):
"""Base exception class for this module."""
pass
class InvalidArgument(Error):
"""Raised on invalid arguments to functions."""
pass
def _DownloadObject(cls, args, thread_state=None):
"""Function argument to apply for performing fanned parallel downloads.
Args:
cls: The calling PerfDiagCommand class instance.
args: A FanDownloadTuple object describing this download.
thread_state: gsutil Cloud API instance to use for the operation.
"""
cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
if args.need_to_slice:
cls.PerformSlicedDownload(args.object_name, args.file_name,
args.serialization_data)
else:
cls.Download(args.object_name, args.file_name, args.serialization_data)
def _DownloadSlice(cls, args, thread_state=None):
"""Function argument to apply for performing sliced downloads.
Args:
cls: The calling PerfDiagCommand class instance.
args: A SliceDownloadTuple object describing this download.
thread_state: gsutil Cloud API instance to use for the operation.
"""
cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
cls.Download(args.object_name, args.file_name, args.serialization_data,
args.start_byte, args.end_byte)
def _UploadObject(cls, args, thread_state=None):
"""Function argument to apply for performing fanned parallel uploads.
Args:
cls: The calling PerfDiagCommand class instance.
args: A FanUploadTuple object describing this upload.
thread_state: gsutil Cloud API instance to use for the operation.
"""
cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
if args.need_to_slice:
cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
else:
cls.Upload(args.file_name, args.object_name, args.use_file)
def _UploadSlice(cls, args, thread_state=None):
"""Function argument to apply for performing sliced parallel uploads.
Args:
cls: The calling PerfDiagCommand class instance.
args: A SliceUploadTuple object describing this upload.
thread_state: gsutil Cloud API instance to use for the operation.
"""
cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
cls.Upload(args.file_name, args.object_name, args.use_file,
args.file_start, args.file_size)
def _DeleteWrapper(cls, object_name, thread_state=None):
"""Function argument to apply for performing parallel object deletions.
Args:
cls: The calling PerfDiagCommand class instance.
object_name: The object name to delete from the test bucket.
thread_state: gsutil Cloud API instance to use for the operation.
"""
cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
cls.Delete(object_name)
def _PerfdiagExceptionHandler(cls, e):
"""Simple exception handler to allow post-completion status."""
cls.logger.error(str(e))
def _DummyTrackerCallback(_):
pass
class DummyFile(object):
"""A dummy, file-like object that throws away everything written to it."""
def write(self, *args, **kwargs): # pylint: disable=invalid-name
pass
def close(self): # pylint: disable=invalid-name
pass
# Many functions in perfdiag re-define a temporary function based on a
# variable from a loop, resulting in a false positive from the linter.
# pylint: disable=cell-var-from-loop
class PerfDiagCommand(Command):
"""Implementation of gsutil perfdiag command."""
# Command specification. See base class for documentation.
command_spec = Command.CreateCommandSpec(
'perfdiag',
command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
usage_synopsis=_SYNOPSIS,
min_args=0,
max_args=1,
supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
file_url_ok=False,
provider_url_ok=False,
urls_start_arg=0,
gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
gs_default_api=ApiSelector.JSON,
argparse_arguments=[
CommandArgument.MakeNCloudBucketURLsArgument(1)
]
)
# Help specification. See help_provider.py for documentation.
help_spec = Command.HelpSpec(
help_name='perfdiag',
help_name_aliases=[],
help_type='command_help',
help_one_line_summary='Run performance diagnostic',
help_text=_DETAILED_HELP_TEXT,
subcommand_help_text={},
)
# Byte sizes to use for latency testing files.
# TODO: Consider letting the user specify these sizes with a configuration
# parameter.
test_lat_file_sizes = (
0, # 0 bytes
1024, # 1 KiB
102400, # 100 KiB
1048576, # 1 MiB
)
# Test names.
RTHRU = 'rthru'
RTHRU_FILE = 'rthru_file'
WTHRU = 'wthru'
WTHRU_FILE = 'wthru_file'
LAT = 'lat'
LIST = 'list'
# Parallelism strategies.
FAN = 'fan'
SLICE = 'slice'
BOTH = 'both'
# List of all diagnostic tests.
ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
# List of diagnostic tests to run by default.
DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
# List of parallelism strategies.
PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
# Google Cloud Storage XML API endpoint host.
XML_API_HOST = boto.config.get(
'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
# Google Cloud Storage XML API endpoint port.
XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)
# Maximum number of times to retry requests on 5xx errors.
MAX_SERVER_ERROR_RETRIES = 5
# Maximum number of times to retry requests on more serious errors like
# the socket breaking.
MAX_TOTAL_RETRIES = 10
# The default buffer size in boto's Key object is set to 8 KiB. This becomes a
# bottleneck at high throughput rates, so we increase it.
KEY_BUFFER_SIZE = 16384
# The maximum number of bytes to generate pseudo-randomly before beginning
# to repeat bytes. This number was chosen as the next prime larger than 5 MiB.
MAX_UNIQUE_RANDOM_BYTES = 5242883
# Maximum amount of time, in seconds, we will wait for object listings to
# reflect what we expect in the listing tests.
MAX_LISTING_WAIT_TIME = 60.0
def _Exec(self, cmd, raise_on_error=True, return_output=False,
mute_stderr=False):
"""Executes a command in a subprocess.
Args:
cmd: List containing the command to execute.
raise_on_error: Whether or not to raise an exception when a process exits
with a non-zero return code.
return_output: If set to True, the return value of the function is the
stdout of the process.
mute_stderr: If set to True, the stderr of the process is not printed to
the console.
Returns:
The return code of the process or the stdout if return_output is set.
Raises:
Exception: If raise_on_error is set to True and any process exits with a
non-zero return code.
"""
self.logger.debug('Running command: %s', cmd)
stderr = subprocess.PIPE if mute_stderr else None
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
(stdoutdata, _) = p.communicate()
if raise_on_error and p.returncode:
raise CommandException("Received non-zero return code (%d) from "
"subprocess '%s'." % (p.returncode, ' '.join(cmd)))
return stdoutdata if return_output else p.returncode
def _WarnIfLargeData(self):
"""Outputs a warning message if a large amount of data is being used."""
if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
self.logger.info('This is a large operation, and could take a while.')
def _MakeTempFile(self, file_size=0, mem_metadata=False,
mem_data=False, prefix='gsutil_test_file'):
"""Creates a temporary file of the given size and returns its path.
Args:
file_size: The size of the temporary file to create.
mem_metadata: If true, store md5 and file size in memory at
temp_file_dict[fpath].md5, tempfile_data[fpath].file_size.
mem_data: If true, store the file data in memory at
temp_file_dict[fpath].data
prefix: The prefix to use for the temporary file. Defaults to
gsutil_test_file.
Returns:
The file path of the created temporary file.
"""
fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
dir=self.directory, text=False)
with os.fdopen(fd, 'wb') as fp:
random_bytes = os.urandom(min(file_size,
self.MAX_UNIQUE_RANDOM_BYTES))
total_bytes_written = 0
while total_bytes_written < file_size:
num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
file_size - total_bytes_written)
fp.write(random_bytes[:num_bytes])
total_bytes_written += num_bytes
if mem_metadata or mem_data:
with open(fpath, 'rb') as fp:
file_size = GetFileSize(fp) if mem_metadata else None
md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
data = fp.read() if mem_data else None
temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
self.temporary_files.add(fpath)
return fpath
def _SetUp(self):
"""Performs setup operations needed before diagnostics can be run."""
# Stores test result data.
self.results = {}
# Set of file paths for local temporary files.
self.temporary_files = set()
# Set of names for test objects that exist in the test bucket.
self.temporary_objects = set()
# Total number of HTTP requests made.
self.total_requests = 0
# Total number of HTTP 5xx errors.
self.request_errors = 0
# Number of responses, keyed by response code.
self.error_responses_by_code = defaultdict(int)
# Total number of socket errors.
self.connection_breaks = 0
# Boolean to prevent doing cleanup twice.
self.teardown_completed = False
# Create files for latency test.
if self.LAT in self.diag_tests:
self.latency_files = []
for file_size in self.test_lat_file_sizes:
fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
self.latency_files.append(fpath)
# Create files for throughput tests.
if self.diag_tests.intersection(
(self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
# Create a file for warming up the TCP connection.
self.tcp_warmup_file = self._MakeTempFile(
5 * 1024 * 1024, mem_metadata=True, mem_data=True)
# For in memory tests, throughput tests transfer the same object N times
# instead of creating N objects, in order to avoid excessive memory usage.
if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
self.mem_thru_file_name = self._MakeTempFile(
self.thru_filesize, mem_metadata=True, mem_data=True)
self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
# For tests that use disk I/O, it is necessary to create N objects in
# in order to properly measure the performance impact of seeks.
if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
# List of file names and corresponding object names to use for file
# throughput tests.
self.thru_file_names = []
self.thru_object_names = []
free_disk_space = CheckFreeSpace(self.directory)
if free_disk_space >= self.thru_filesize * self.num_objects:
self.logger.info('\nCreating %d local files each of size %s.'
% (self.num_objects,
MakeHumanReadable(self.thru_filesize)))
self._WarnIfLargeData()
for _ in range(self.num_objects):
file_name = self._MakeTempFile(self.thru_filesize,
mem_metadata=True)
self.thru_file_names.append(file_name)
self.thru_object_names.append(os.path.basename(file_name))
else:
raise CommandException(
'Not enough free disk space for throughput files: '
'%s of disk space required, but only %s available.'
% (MakeHumanReadable(self.thru_filesize * self.num_objects),
MakeHumanReadable(free_disk_space)))
# Dummy file buffer to use for downloading that goes nowhere.
self.discard_sink = DummyFile()
# Filter out misleading progress callback output and the incorrect
# suggestion to use gsutil -m perfdiag.
self.logger.addFilter(self._PerfdiagFilter())
def _TearDown(self):
"""Performs operations to clean things up after performing diagnostics."""
if not self.teardown_completed:
temp_file_dict.clear()
try:
for fpath in self.temporary_files:
os.remove(fpath)
if self.delete_directory:
os.rmdir(self.directory)
except OSError:
pass
if self.threads > 1 or self.processes > 1:
args = [obj for obj in self.temporary_objects]
self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
arg_checker=DummyArgChecker,
parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
else:
for object_name in self.temporary_objects:
self.Delete(object_name)
self.teardown_completed = True
@contextlib.contextmanager
def _Time(self, key, bucket):
"""A context manager that measures time.
A context manager that prints a status message before and after executing
the inner command and times how long the inner command takes. Keeps track of
the timing, aggregated by the given key.
Args:
key: The key to insert the timing value into a dictionary bucket.
bucket: A dictionary to place the timing value in.
Yields:
For the context manager.
"""
self.logger.info('%s starting...', key)
t0 = time.time()
yield
t1 = time.time()
bucket[key].append(t1 - t0)
self.logger.info('%s done.', key)
def _RunOperation(self, func):
"""Runs an operation with retry logic.
Args:
func: The function to run.
Returns:
True if the operation succeeds, False if aborted.
"""
# We retry on httplib exceptions that can happen if the socket was closed
# by the remote party or the connection broke because of network issues.
# Only the BotoServerError is counted as a 5xx error towards the retry
# limit.
success = False
server_error_retried = 0
total_retried = 0
i = 0
return_val = None
while not success:
next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
try:
return_val = func()
self.total_requests += 1
success = True
except tuple(self.exceptions) as e:
total_retried += 1
if total_retried > self.MAX_TOTAL_RETRIES:
self.logger.info('Reached maximum total retries. Not retrying.')
break
if isinstance(e, ServiceException):
if e.status >= 500:
self.error_responses_by_code[e.status] += 1
self.total_requests += 1
self.request_errors += 1
server_error_retried += 1
time.sleep(next_sleep)
else:
raise
if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
self.logger.info(
'Reached maximum server error retries. Not retrying.')
break
else:
self.connection_breaks += 1
return return_val
def _RunLatencyTests(self):
"""Runs latency tests."""
# Stores timing information for each category of operation.
self.results['latency'] = defaultdict(list)
for i in range(self.num_objects):
self.logger.info('\nRunning latency iteration %d...', i+1)
for fpath in self.latency_files:
file_data = temp_file_dict[fpath]
url = self.bucket_url.Clone()
url.object_name = os.path.basename(fpath)
file_size = file_data.size
readable_file_size = MakeHumanReadable(file_size)
self.logger.info(
"\nFile of size %s located on disk at '%s' being diagnosed in the "
"cloud at '%s'.", readable_file_size, fpath, url)
upload_target = StorageUrlToUploadObjectMetadata(url)
def _Upload():
io_fp = cStringIO.StringIO(file_data.data)
with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
self.gsutil_api.UploadObject(
io_fp, upload_target, size=file_size, provider=self.provider,
fields=['name'])
self._RunOperation(_Upload)
def _Metadata():
with self._Time('METADATA_%d' % file_size, self.results['latency']):
return self.gsutil_api.GetObjectMetadata(
url.bucket_name, url.object_name,
provider=self.provider, fields=['name', 'contentType',
'mediaLink', 'size'])
# Download will get the metadata first if we don't pass it in.
download_metadata = self._RunOperation(_Metadata)
serialization_data = GetDownloadSerializationData(download_metadata)
def _Download():
with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
self.gsutil_api.GetObjectMedia(
url.bucket_name, url.object_name, self.discard_sink,
provider=self.provider, serialization_data=serialization_data)
self._RunOperation(_Download)
def _Delete():
with self._Time('DELETE_%d' % file_size, self.results['latency']):
self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
provider=self.provider)
self._RunOperation(_Delete)
class _PerfdiagFilter(logging.Filter):
def filter(self, record):
# Used to prevent unnecessary output when using multiprocessing.
msg = record.getMessage()
return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
def _PerfdiagExceptionHandler(self, e):
"""Simple exception handler to allow post-completion status."""
self.logger.error(str(e))
def PerformFannedDownload(self, need_to_slice, object_names, file_names,
serialization_data):
"""Performs a parallel download of multiple objects using the fan strategy.
Args:
need_to_slice: If True, additionally apply the slice strategy to each
object in object_names.
object_names: A list of object names to be downloaded. Each object must
already exist in the test bucket.
file_names: A list, corresponding by index to object_names, of file names
for downloaded data. If None, discard downloaded data.
serialization_data: A list, corresponding by index to object_names,
of serialization data for each object.
"""
args = []
for i in range(len(object_names)):
file_name = file_names[i] if file_names else None
args.append(FanDownloadTuple(
need_to_slice, object_names[i], file_name,
serialization_data[i]))
self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
('total_requests', 'request_errors'),
arg_checker=DummyArgChecker, parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
def PerformSlicedDownload(self, object_name, file_name, serialization_data):
"""Performs a download of an object using the slice strategy.
Args:
object_name: The name of the object to download.
file_name: The name of the file to download data to, or None if data
should be discarded.
serialization_data: The serialization data for the object.
"""
if file_name:
with open(file_name, 'ab') as fp:
fp.truncate(self.thru_filesize)
component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
args = []
for i in range(self.num_slices):
start_byte = i * component_size
end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1)
args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
start_byte, end_byte))
self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
('total_requests', 'request_errors'),
arg_checker=DummyArgChecker, parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
def PerformFannedUpload(self, need_to_slice, file_names, object_names,
use_file):
"""Performs a parallel upload of multiple files using the fan strategy.
The metadata for file_name should be present in temp_file_dict prior
to calling. Also, the data for file_name should be present in temp_file_dict
if use_file is specified as False.
Args:
need_to_slice: If True, additionally apply the slice strategy to each
file in file_names.
file_names: A list of file names to be uploaded.
object_names: A list, corresponding by by index to file_names, of object
names to upload data to.
use_file: If true, use disk I/O, otherwise read upload data from memory.
"""
args = []
for i in range(len(file_names)):
args.append(FanUploadTuple(
need_to_slice, file_names[i], object_names[i], use_file))
self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
('total_requests', 'request_errors'),
arg_checker=DummyArgChecker, parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
def PerformSlicedUpload(self, file_name, object_name, use_file):
"""Performs a parallel upload of a file using the slice strategy.
The metadata for file_name should be present in temp_file_dict prior
to calling. Also, the data from for file_name should be present in
temp_file_dict if use_file is specified as False.
Args:
file_name: The name of the file to upload.
object_name: The name of the object to upload to.
use_file: If true, use disk I/O, otherwise read upload data from memory.
"""
# Divide the file into components.
component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
component_object_names = (
[object_name + str(i) for i in range(self.num_slices)])
args = []
for i in range(self.num_slices):
component_start = i * component_size
component_size = min(component_size,
temp_file_dict[file_name].size - component_start)
args.append(SliceUploadTuple(file_name, component_object_names[i],
use_file, component_start, component_size))
# Upload the components in parallel.
try:
self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
('total_requests', 'request_errors'),
arg_checker=DummyArgChecker, parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
# Compose the components into an object.
request_components = []
for i in range(self.num_slices):
src_obj_metadata = (
apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
name=component_object_names[i]))
request_components.append(src_obj_metadata)
dst_obj_metadata = apitools_messages.Object()
dst_obj_metadata.name = object_name
dst_obj_metadata.bucket = self.bucket_url.bucket_name
def _Compose():
self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
provider=self.provider)
self._RunOperation(_Compose)
finally:
# Delete the temporary components.
self.Apply(_DeleteWrapper, component_object_names,
_PerfdiagExceptionHandler,
('total_requests', 'request_errors'),
arg_checker=DummyArgChecker, parallel_operations_override=True,
process_count=self.processes, thread_count=self.threads)
def _RunReadThruTests(self, use_file=False):
"""Runs read throughput tests."""
test_name = 'read_throughput_file' if use_file else 'read_throughput'
file_io_string = 'with file I/O' if use_file else ''
self.logger.info(
'\nRunning read throughput tests %s (%s objects of size %s)' %
(file_io_string, self.num_objects,
MakeHumanReadable(self.thru_filesize)))
self._WarnIfLargeData()
self.results[test_name] = {'file_size': self.thru_filesize,
'processes': self.processes,
'threads': self.threads,
'parallelism': self.parallel_strategy
}
# Copy the file(s) to the test bucket, and also get the serialization data
# so that we can pass it to download.
if use_file:
# For test with file I/O use N files on disk to preserve seek performance.
file_names = self.thru_file_names
object_names = self.thru_object_names
serialization_data = []
for i in range(self.num_objects):
self.temporary_objects.add(self.thru_object_names[i])
if self.WTHRU_FILE in self.diag_tests:
# If we ran the WTHRU_FILE test, then the objects already exist.
obj_metadata = self.gsutil_api.GetObjectMetadata(
self.bucket_url.bucket_name, self.thru_object_names[i],
fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
else:
obj_metadata = self.Upload(self.thru_file_names[i],
self.thru_object_names[i], use_file)
# File overwrite causes performance issues with sliced downloads.
# Delete the file and reopen it for download. This matches what a real
# download would look like.
os.unlink(self.thru_file_names[i])
open(self.thru_file_names[i], 'ab').close()
serialization_data.append(GetDownloadSerializationData(obj_metadata))
else:
# For in-memory test only use one file but copy it num_objects times, to
# allow scalability in num_objects.
self.temporary_objects.add(self.mem_thru_object_name)
obj_metadata = self.Upload(self.mem_thru_file_name,
self.mem_thru_object_name, use_file)
file_names = None
object_names = [self.mem_thru_object_name] * self.num_objects
serialization_data = (
[GetDownloadSerializationData(obj_metadata)] * self.num_objects)
# Warmup the TCP connection.
warmup_obj_name = os.path.basename(self.tcp_warmup_file)
self.temporary_objects.add(warmup_obj_name)
self.Upload(self.tcp_warmup_file, warmup_obj_name)
self.Download(warmup_obj_name)
t0 = time.time()
if self.processes == 1 and self.threads == 1:
for i in range(self.num_objects):
file_name = file_names[i] if use_file else None
self.Download(object_names[i], file_name, serialization_data[i])
else:
if self.parallel_strategy in (self.FAN, self.BOTH):
need_to_slice = (self.parallel_strategy == self.BOTH)
self.PerformFannedDownload(need_to_slice, object_names, file_names,
serialization_data)
elif self.parallel_strategy == self.SLICE:
for i in range(self.num_objects):
file_name = file_names[i] if use_file else None
self.PerformSlicedDownload(
object_names[i], file_name, serialization_data[i])
t1 = time.time()
time_took = t1 - t0
total_bytes_copied = self.thru_filesize * self.num_objects
bytes_per_second = total_bytes_copied / time_took
self.results[test_name]['time_took'] = time_took
self.results[test_name]['total_bytes_copied'] = total_bytes_copied
self.results[test_name]['bytes_per_second'] = bytes_per_second
def _RunWriteThruTests(self, use_file=False):
"""Runs write throughput tests."""
test_name = 'write_throughput_file' if use_file else 'write_throughput'
file_io_string = 'with file I/O' if use_file else ''
self.logger.info(
'\nRunning write throughput tests %s (%s objects of size %s)' %
(file_io_string, self.num_objects,
MakeHumanReadable(self.thru_filesize)))
self._WarnIfLargeData()
self.results[test_name] = {'file_size': self.thru_filesize,
'processes': self.processes,
'threads': self.threads,
'parallelism': self.parallel_strategy}
# Warmup the TCP connection.
warmup_obj_name = os.path.basename(self.tcp_warmup_file)
self.temporary_objects.add(warmup_obj_name)
self.Upload(self.tcp_warmup_file, warmup_obj_name)
if use_file:
# For test with file I/O use N files on disk to preserve seek performance.
file_names = self.thru_file_names
object_names = self.thru_object_names
else:
# For in-memory test only use one file but copy it num_objects times, to
# allow for scalability in num_objects.
file_names = [self.mem_thru_file_name] * self.num_objects
object_names = (
[self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
for object_name in object_names:
self.temporary_objects.add(object_name)
t0 = time.time()
if self.processes == 1 and self.threads == 1:
for i in range(self.num_objects):
self.Upload(file_names[i], object_names[i], use_file)
else:
if self.parallel_strategy in (self.FAN, self.BOTH):
need_to_slice = (self.parallel_strategy == self.BOTH)
self.PerformFannedUpload(need_to_slice, file_names, object_names,
use_file)
elif self.parallel_strategy == self.SLICE:
for i in range(self.num_objects):
self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
t1 = time.time()
time_took = t1 - t0
total_bytes_copied = self.thru_filesize * self.num_objects
bytes_per_second = total_bytes_copied / time_took
self.results[test_name]['time_took'] = time_took
self.results[test_name]['total_bytes_copied'] = total_bytes_copied
self.results[test_name]['bytes_per_second'] = bytes_per_second
def _RunListTests(self):
"""Runs eventual consistency listing latency tests."""
self.results['listing'] = {'num_files': self.num_objects}
# Generate N random objects to put into the bucket.
list_prefix = 'gsutil-perfdiag-list-'
list_fpaths = []
list_objects = []
args = []
for _ in xrange(self.num_objects):
fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
prefix=list_prefix)
list_fpaths.append(fpath)
object_name = os.path.basename(fpath)
list_objects.append(object_name)
args.append(FanUploadTuple(False, fpath, object_name, False))
self.temporary_objects.add(object_name)
# Add the objects to the bucket.
self.logger.info(
'\nWriting %s objects for listing test...', self.num_objects)
self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
arg_checker=DummyArgChecker)
list_latencies = []
files_seen = []
total_start_time = time.time()
expected_objects = set(list_objects)
found_objects = set()
def _List():
"""Lists and returns objects in the bucket. Also records latency."""
t0 = time.time()
objects = list(self.gsutil_api.ListObjects(
self.bucket_url.bucket_name, delimiter='/',
provider=self.provider, fields=['items/name']))
t1 = time.time()
list_latencies.append(t1 - t0)
return set([obj.data.name for obj in objects])
self.logger.info(
'Listing bucket %s waiting for %s objects to appear...',
self.bucket_url.bucket_name, self.num_objects)
while expected_objects - found_objects:
def _ListAfterUpload():
names = _List()
found_objects.update(names & expected_objects)
files_seen.append(len(found_objects))
self._RunOperation(_ListAfterUpload)
if expected_objects - found_objects:
if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
self.logger.warning('Maximum time reached waiting for listing.')
break
total_end_time = time.time()
self.results['listing']['insert'] = {
'num_listing_calls': len(list_latencies),
'list_latencies': list_latencies,
'files_seen_after_listing': files_seen,
'time_took': total_end_time - total_start_time,
}
args = [object_name for object_name in list_objects]
self.logger.info(
'Deleting %s objects for listing test...', self.num_objects)
self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
arg_checker=DummyArgChecker)
self.logger.info(
'Listing bucket %s waiting for %s objects to disappear...',
self.bucket_url.bucket_name, self.num_objects)
list_latencies = []
files_seen = []
total_start_time = time.time()
found_objects = set(list_objects)
while found_objects:
def _ListAfterDelete():
names = _List()
found_objects.intersection_update(names)
files_seen.append(len(found_objects))
self._RunOperation(_ListAfterDelete)
if found_objects:
if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
self.logger.warning('Maximum time reached waiting for listing.')
break
total_end_time = time.time()
self.results['listing']['delete'] = {
'num_listing_calls': len(list_latencies),
'list_latencies': list_latencies,
'files_seen_after_listing': files_seen,
'time_took': total_end_time - total_start_time,
}
def Upload(self, file_name, object_name, use_file=False, file_start=0,
file_size=None):
"""Performs an upload to the test bucket.
The file is uploaded to the bucket referred to by self.bucket_url, and has
name object_name.
Args:
file_name: The path to the local file, and the key to its entry in
temp_file_dict.
object_name: The name of the remote object.
use_file: If true, use disk I/O, otherwise read everything from memory.
file_start: The first byte in the file to upload to the object.
(only should be specified for sliced uploads)
file_size: The size of the file to upload.
(only should be specified for sliced uploads)
Returns:
Uploaded Object Metadata.
"""
fp = None
if file_size is None:
file_size = temp_file_dict[file_name].size
upload_url = self.bucket_url.Clone()
upload_url.object_name = object_name
upload_target = StorageUrlToUploadObjectMetadata(upload_url)
try:
if use_file:
fp = FilePart(file_name, file_start, file_size)
else:
data = temp_file_dict[file_name].data[file_start:file_start+file_size]
fp = cStringIO.StringIO(data)
def _InnerUpload():
if file_size < ResumableThreshold():
return self.gsutil_api.UploadObject(
fp, upload_target, provider=self.provider, size=file_size,
fields=['name', 'mediaLink', 'size'])
else:
return self.gsutil_api.UploadObjectResumable(
fp, upload_target, provider=self.provider, size=file_size,
fields=['name', 'mediaLink', 'size'],
tracker_callback=_DummyTrackerCallback)
return self._RunOperation(_InnerUpload)
finally:
if fp:
fp.close()
def Download(self, object_name, file_name=None, serialization_data=None,
start_byte=0, end_byte=None):
"""Downloads an object from the test bucket.
Args:
object_name: The name of the object (in the test bucket) to download.
file_name: Optional file name to write downloaded data to. If None,
downloaded data is discarded immediately.
serialization_data: Optional serialization data, used so that we don't
have to get the metadata before downloading.
start_byte: The first byte in the object to download.
(only should be specified for sliced downloads)
end_byte: The last byte in the object to download.
(only should be specified for sliced downloads)
"""
fp = None
try:
if file_name is not None:
fp = open(file_name, 'r+b')
fp.seek(start_byte)
else:
fp = self.discard_sink
def _InnerDownload():
self.gsutil_api.GetObjectMedia(
self.bucket_url.bucket_name, object_name, fp,
provider=self.provider, start_byte=start_byte, end_byte=end_byte,
serialization_data=serialization_data)
self._RunOperation(_InnerDownload)
finally:
if fp:
fp.close()
def Delete(self, object_name):
"""Deletes an object from the test bucket.
Args:
object_name: The name of the object to delete.
"""
try:
def _InnerDelete():
self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
provider=self.provider)
self._RunOperation(_InnerDelete)
except NotFoundException:
pass
def _GetDiskCounters(self):
"""Retrieves disk I/O statistics for all disks.
Adapted from the psutil module's psutil._pslinux.disk_io_counters:
http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py
Originally distributed under under a BSD license.
Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.
Returns:
A dictionary containing disk names mapped to the disk counters from
/disk/diskstats.
"""
# iostat documentation states that sectors are equivalent with blocks and
# have a size of 512 bytes since 2.4 kernels. This value is needed to
# calculate the amount of disk I/O in bytes.
sector_size = 512
partitions = []
with open('/proc/partitions', 'r') as f:
lines = f.readlines()[2:]
for line in lines:
_, _, _, name = line.split()
if name[-1].isdigit():
partitions.append(name)
retdict = {}
with open('/proc/diskstats', 'r') as f:
for line in f:
values = line.split()[:11]
_, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values
if name in partitions:
rbytes = int(rbytes) * sector_size
wbytes = int(wbytes) * sector_size
reads = int(reads)
writes = int(writes)
rtime = int(rtime)
wtime = int(wtime)
retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime)
return retdict
def _GetTcpStats(self):
"""Tries to parse out TCP packet information from netstat output.
Returns:
A dictionary containing TCP information, or None if netstat is not
available.
"""
# netstat return code is non-zero for -s on Linux, so don't raise on error.
try:
netstat_output = self._Exec(['netstat', '-s'], return_output=True,
raise_on_error=False)
except OSError:
self.logger.warning('netstat not found on your system; some measurement '
'data will be missing')
return None
netstat_output = netstat_output.strip().lower()
found_tcp = False
tcp_retransmit = None
tcp_received = None
tcp_sent = None
for line in netstat_output.split('\n'):
# Header for TCP section is "Tcp:" in Linux/Mac and
# "TCP Statistics for" in Windows.
if 'tcp:' in line or 'tcp statistics' in line:
found_tcp = True
# Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
# Windows == "segments retransmitted".
if (found_tcp and tcp_retransmit is None and
('segments retransmited' in line or 'retransmit timeouts' in line or
'segments retransmitted' in line)):
tcp_retransmit = ''.join(c for c in line if c in string.digits)
# Linux+Windows == "segments received", Mac == "packets received".
if (found_tcp and tcp_received is None and
('segments received' in line or 'packets received' in line)):
tcp_received = ''.join(c for c in line if c in string.digits)
# Linux == "segments send out" (sic), Mac+Windows == "packets sent".
if (found_tcp and tcp_sent is None and
('segments send out' in line or 'packets sent' in line or
'segments sent' in line)):
tcp_sent = ''.join(c for c in line if c in string.digits)
result = {}
try:
result['tcp_retransmit'] = int(tcp_retransmit)
result['tcp_received'] = int(tcp_received)
result['tcp_sent'] = int(tcp_sent)
except (ValueError, TypeError):
result['tcp_retransmit'] = None
result['tcp_received'] = None
result['tcp_sent'] = None
return result
def _CollectSysInfo(self):
"""Collects system information."""
sysinfo = {}
# All exceptions that might be raised from socket module calls.
socket_errors = (
socket.error, socket.herror, socket.gaierror, socket.timeout)
# Find out whether HTTPS is enabled in Boto.
sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)
# Look up proxy info.
proxy_host = boto.config.get('Boto', 'proxy', None)
proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
sysinfo['using_proxy'] = bool(proxy_host)
if boto.config.get('Boto', 'proxy_rdns', False):
self.logger.info('DNS lookups are disallowed in this environment, so '
'some information is not included in this perfdiag run.')
# Get the local IP address from socket lib.
try:
sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
except socket_errors:
sysinfo['ip_address'] = ''
# Record the temporary directory used since it can affect performance, e.g.
# when on a networked filesystem.
sysinfo['tempdir'] = self.directory
# Produces an RFC 2822 compliant GMT timestamp.
sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
time.gmtime())
# Execute a CNAME lookup on Google DNS to find what Google server
# it's routing to.
cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
try:
nslookup_cname_output = self._Exec(cmd, return_output=True)
m = re.search(r' = (?P[^.]+)\.', nslookup_cname_output)
sysinfo['googserv_route'] = m.group('googserv') if m else None
except (CommandException, OSError):
sysinfo['googserv_route'] = ''
# Try to determine the latency of a DNS lookup for the Google hostname
# endpoint. Note: we don't piggyback on gethostbyname_ex below because
# the _ex version requires an extra RTT.
try:
t0 = time.time()
socket.gethostbyname(self.XML_API_HOST)
t1 = time.time()
sysinfo['google_host_dns_latency'] = t1 - t0
except socket_errors:
pass
# Look up IP addresses for Google Server.
try:
(hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
sysinfo['googserv_ips'] = ipaddrlist
except socket_errors:
ipaddrlist = []
sysinfo['googserv_ips'] = []
# Reverse lookup the hostnames for the Google Server IPs.
sysinfo['googserv_hostnames'] = []
for googserv_ip in ipaddrlist:
try:
(hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
sysinfo['googserv_hostnames'].append(hostname)
except socket_errors:
pass
# Query o-o to find out what the Google DNS thinks is the user's IP.
try:
cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
nslookup_txt_output = self._Exec(cmd, return_output=True)
m = re.search(r'text\s+=\s+"(?P[\.\d]+)"', nslookup_txt_output)
sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
except (CommandException, OSError):
sysinfo['dns_o-o_ip'] = ''
# Try to determine the latency of connecting to the Google hostname
# endpoint.
sysinfo['google_host_connect_latencies'] = {}
for googserv_ip in ipaddrlist:
try:
sock = socket.socket()
t0 = time.time()
sock.connect((googserv_ip, self.XML_API_PORT))
t1 = time.time()
sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
except socket_errors:
pass
# If using a proxy, try to determine the latency of a DNS lookup to resolve
# the proxy hostname and the latency of connecting to the proxy.
if proxy_host:
proxy_ip = None
try:
t0 = time.time()
proxy_ip = socket.gethostbyname(proxy_host)
t1 = time.time()
sysinfo['proxy_dns_latency'] = t1 - t0
except socket_errors:
pass
try:
sock = socket.socket()
t0 = time.time()
sock.connect((proxy_ip or proxy_host, proxy_port))
t1 = time.time()
sysinfo['proxy_host_connect_latency'] = t1 - t0
except socket_errors:
pass
# Try and find the number of CPUs in the system if available.
try:
sysinfo['cpu_count'] = multiprocessing.cpu_count()
except NotImplementedError:
sysinfo['cpu_count'] = None
# For *nix platforms, obtain the CPU load.
try:
sysinfo['load_avg'] = list(os.getloadavg())
except (AttributeError, OSError):
sysinfo['load_avg'] = None
# Try and collect memory information from /proc/meminfo if possible.
mem_total = None
mem_free = None
mem_buffers = None
mem_cached = None
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if line.startswith('MemTotal'):
mem_total = (int(''.join(c for c in line if c in string.digits))
* 1000)
elif line.startswith('MemFree'):
mem_free = (int(''.join(c for c in line if c in string.digits))
* 1000)
elif line.startswith('Buffers'):
mem_buffers = (int(''.join(c for c in line if c in string.digits))
* 1000)
elif line.startswith('Cached'):
mem_cached = (int(''.join(c for c in line if c in string.digits))
* 1000)
except (IOError, ValueError):
pass
sysinfo['meminfo'] = {'mem_total': mem_total,
'mem_free': mem_free,
'mem_buffers': mem_buffers,
'mem_cached': mem_cached}
# Get configuration attributes from config module.
sysinfo['gsutil_config'] = {}
for attr in dir(config):
attr_value = getattr(config, attr)
# Filter out multiline strings that are not useful.
if attr.isupper() and not (isinstance(attr_value, basestring) and
'\n' in attr_value):
sysinfo['gsutil_config'][attr] = attr_value
sysinfo['tcp_proc_values'] = {}
stats_to_check = [
'/proc/sys/net/core/rmem_default',
'/proc/sys/net/core/rmem_max',
'/proc/sys/net/core/wmem_default',
'/proc/sys/net/core/wmem_max',
'/proc/sys/net/ipv4/tcp_timestamps',
'/proc/sys/net/ipv4/tcp_sack',
'/proc/sys/net/ipv4/tcp_window_scaling',
]
for fname in stats_to_check:
try:
with open(fname, 'r') as f:
value = f.read()
sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()
except IOError:
pass
self.results['sysinfo'] = sysinfo
def _DisplayStats(self, trials):
"""Prints out mean, standard deviation, median, and 90th percentile."""
n = len(trials)
mean = float(sum(trials)) / n
stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)
print str(n).rjust(6), '',
print ('%.1f' % (mean * 1000)).rjust(9), '',
print ('%.1f' % (stdev * 1000)).rjust(12), '',
print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''
def _DisplayResults(self):
"""Displays results collected from diagnostic run."""
print
print '=' * 78
print 'DIAGNOSTIC RESULTS'.center(78)
print '=' * 78
if 'latency' in self.results:
print
print '-' * 78
print 'Latency'.center(78)
print '-' * 78
print ('Operation Size Trials Mean (ms) Std Dev (ms) '
'Median (ms) 90th % (ms)')
print ('========= ========= ====== ========= ============ '
'=========== ===========')
for key in sorted(self.results['latency']):
trials = sorted(self.results['latency'][key])
op, numbytes = key.split('_')
numbytes = int(numbytes)
if op == 'METADATA':
print 'Metadata'.rjust(9), '',
print MakeHumanReadable(numbytes).rjust(9), '',
self._DisplayStats(trials)
if op == 'DOWNLOAD':
print 'Download'.rjust(9), '',
print MakeHumanReadable(numbytes).rjust(9), '',
self._DisplayStats(trials)
if op == 'UPLOAD':
print 'Upload'.rjust(9), '',
print MakeHumanReadable(numbytes).rjust(9), '',
self._DisplayStats(trials)
if op == 'DELETE':
print 'Delete'.rjust(9), '',
print MakeHumanReadable(numbytes).rjust(9), '',
self._DisplayStats(trials)
if 'write_throughput' in self.results:
print
print '-' * 78
print 'Write Throughput'.center(78)
print '-' * 78
write_thru = self.results['write_throughput']
print 'Copied %s %s file(s) for a total transfer size of %s.' % (
self.num_objects,
MakeHumanReadable(write_thru['file_size']),
MakeHumanReadable(write_thru['total_bytes_copied']))
print 'Write throughput: %s/s.' % (
MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
print 'Parallelism strategy: %s' % write_thru['parallelism']
if 'write_throughput_file' in self.results:
print
print '-' * 78
print 'Write Throughput With File I/O'.center(78)
print '-' * 78
write_thru_file = self.results['write_throughput_file']
print 'Copied %s %s file(s) for a total transfer size of %s.' % (
self.num_objects,
MakeHumanReadable(write_thru_file['file_size']),
MakeHumanReadable(write_thru_file['total_bytes_copied']))
print 'Write throughput: %s/s.' % (
MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
print 'Parallelism strategy: %s' % write_thru_file['parallelism']
if 'read_throughput' in self.results:
print
print '-' * 78
print 'Read Throughput'.center(78)
print '-' * 78
read_thru = self.results['read_throughput']
print 'Copied %s %s file(s) for a total transfer size of %s.' % (
self.num_objects,
MakeHumanReadable(read_thru['file_size']),
MakeHumanReadable(read_thru['total_bytes_copied']))
print 'Read throughput: %s/s.' % (
MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
print 'Parallelism strategy: %s' % read_thru['parallelism']
if 'read_throughput_file' in self.results:
print
print '-' * 78
print 'Read Throughput With File I/O'.center(78)
print '-' * 78
read_thru_file = self.results['read_throughput_file']
print 'Copied %s %s file(s) for a total transfer size of %s.' % (
self.num_objects,
MakeHumanReadable(read_thru_file['file_size']),
MakeHumanReadable(read_thru_file['total_bytes_copied']))
print 'Read throughput: %s/s.' % (
MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
print 'Parallelism strategy: %s' % read_thru_file['parallelism']
if 'listing' in self.results:
print
print '-' * 78
print 'Listing'.center(78)
print '-' * 78
listing = self.results['listing']
insert = listing['insert']
delete = listing['delete']
print 'After inserting %s objects:' % listing['num_files']
print (' Total time for objects to appear: %.2g seconds' %
insert['time_took'])
print ' Number of listing calls made: %s' % insert['num_listing_calls']
print (' Individual listing call latencies: [%s]' %
', '.join('%.2gs' % lat for lat in insert['list_latencies']))
print (' Files reflected after each call: [%s]' %
', '.join(map(str, insert['files_seen_after_listing'])))
print 'After deleting %s objects:' % listing['num_files']
print (' Total time for objects to appear: %.2g seconds' %
delete['time_took'])
print ' Number of listing calls made: %s' % delete['num_listing_calls']
print (' Individual listing call latencies: [%s]' %
', '.join('%.2gs' % lat for lat in delete['list_latencies']))
print (' Files reflected after each call: [%s]' %
', '.join(map(str, delete['files_seen_after_listing'])))
if 'sysinfo' in self.results:
print
print '-' * 78
print 'System Information'.center(78)
print '-' * 78
info = self.results['sysinfo']
print 'IP Address: \n %s' % info['ip_address']
print 'Temporary Directory: \n %s' % info['tempdir']
print 'Bucket URI: \n %s' % self.results['bucket_uri']
print 'gsutil Version: \n %s' % self.results.get('gsutil_version',
'Unknown')
print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown')
if 'gmt_timestamp' in info:
ts_string = info['gmt_timestamp']
timetuple = None
try:
# Convert RFC 2822 string to Linux timestamp.
timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
except ValueError:
pass
if timetuple:
# Converts the GMT time tuple to local Linux timestamp.
localtime = calendar.timegm(timetuple)
localdt = datetime.datetime.fromtimestamp(localtime)
print 'Measurement time: \n %s' % localdt.strftime(
'%Y-%m-%d %I:%M:%S %p %Z')
print 'Google Server: \n %s' % info['googserv_route']
print ('Google Server IP Addresses: \n %s' %
('\n '.join(info['googserv_ips'])))
print ('Google Server Hostnames: \n %s' %
('\n '.join(info['googserv_hostnames'])))
print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip']
print 'CPU Count: \n %s' % info['cpu_count']
print 'CPU Load Average: \n %s' % info['load_avg']
try:
print ('Total Memory: \n %s' %
MakeHumanReadable(info['meminfo']['mem_total']))
# Free memory is really MemFree + Buffers + Cached.
print 'Free Memory: \n %s' % MakeHumanReadable(
info['meminfo']['mem_free'] +
info['meminfo']['mem_buffers'] +
info['meminfo']['mem_cached'])
except TypeError:
pass
if 'netstat_end' in info and 'netstat_start' in info:
netstat_after = info['netstat_end']
netstat_before = info['netstat_start']
for tcp_type in ('sent', 'received', 'retransmit'):
try:
delta = (netstat_after['tcp_%s' % tcp_type] -
netstat_before['tcp_%s' % tcp_type])
print 'TCP segments %s during test:\n %d' % (tcp_type, delta)
except TypeError:
pass
else:
print ('TCP segment counts not available because "netstat" was not '
'found during test runs')
if 'disk_counters_end' in info and 'disk_counters_start' in info:
print 'Disk Counter Deltas:\n',
disk_after = info['disk_counters_end']
disk_before = info['disk_counters_start']
print '', 'disk'.rjust(6),
for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
'wtime']:
print colname.rjust(8),
print
for diskname in sorted(disk_after):
before = disk_before[diskname]
after = disk_after[diskname]
(reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
(reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
print '', diskname.rjust(6),
deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
for delta in deltas:
print str(delta).rjust(8),
print
if 'tcp_proc_values' in info:
print 'TCP /proc values:\n',
for item in info['tcp_proc_values'].iteritems():
print ' %s = %s' % item
if 'boto_https_enabled' in info:
print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled']
if 'using_proxy' in info:
print 'Requests routed through proxy: \n %s' % info['using_proxy']
if 'google_host_dns_latency' in info:
print ('Latency of the DNS lookup for Google Storage server (ms): '
'\n %.1f' % (info['google_host_dns_latency'] * 1000.0))
if 'google_host_connect_latencies' in info:
print 'Latencies connecting to Google Storage server IPs (ms):'
for ip, latency in info['google_host_connect_latencies'].iteritems():
print ' %s = %.1f' % (ip, latency * 1000.0)
if 'proxy_dns_latency' in info:
print ('Latency of the DNS lookup for the configured proxy (ms): '
'\n %.1f' % (info['proxy_dns_latency'] * 1000.0))
if 'proxy_host_connect_latency' in info:
print ('Latency connecting to the configured proxy (ms): \n %.1f' %
(info['proxy_host_connect_latency'] * 1000.0))
if 'request_errors' in self.results and 'total_requests' in self.results:
print
print '-' * 78
print 'In-Process HTTP Statistics'.center(78)
print '-' * 78
total = int(self.results['total_requests'])
numerrors = int(self.results['request_errors'])
numbreaks = int(self.results['connection_breaks'])
availability = (((total - numerrors) / float(total)) * 100
if total > 0 else 100)
print 'Total HTTP requests made: %d' % total
print 'HTTP 5xx errors: %d' % numerrors
print 'HTTP connections broken: %d' % numbreaks
print 'Availability: %.7g%%' % availability
if 'error_responses_by_code' in self.results:
sorted_codes = sorted(
self.results['error_responses_by_code'].iteritems())
if sorted_codes:
print 'Error responses by code:'
print '\n'.join(' %s: %s' % c for c in sorted_codes)
if self.output_file:
with open(self.output_file, 'w') as f:
json.dump(self.results, f, indent=2)
print
print "Output file written to '%s'." % self.output_file
print
def _ParsePositiveInteger(self, val, msg):
"""Tries to convert val argument to a positive integer.
Args:
val: The value (as a string) to convert to a positive integer.
msg: The error message to place in the CommandException on an error.
Returns:
A valid positive integer.
Raises:
CommandException: If the supplied value is not a valid positive integer.
"""
try:
val = int(val)
if val < 1:
raise CommandException(msg)
return val
except ValueError:
raise CommandException(msg)
def _ParseArgs(self):
"""Parses arguments for perfdiag command."""
# From -n.
self.num_objects = 5
# From -c.
self.processes = 1
# From -k.
self.threads = 1
# From -p
self.parallel_strategy = None
# From -y
self.num_slices = 4
# From -s.
self.thru_filesize = 1048576
# From -d.
self.directory = tempfile.gettempdir()
# Keep track of whether or not to delete the directory upon completion.
self.delete_directory = False
# From -t.
self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
# From -o.
self.output_file = None
# From -i.
self.input_file = None
# From -m.
self.metadata_keys = {}
if self.sub_opts:
for o, a in self.sub_opts:
if o == '-n':
self.num_objects = self._ParsePositiveInteger(
a, 'The -n parameter must be a positive integer.')
if o == '-c':
self.processes = self._ParsePositiveInteger(
a, 'The -c parameter must be a positive integer.')
if o == '-k':
self.threads = self._ParsePositiveInteger(
a, 'The -k parameter must be a positive integer.')
if o == '-p':
if a.lower() in self.PARALLEL_STRATEGIES:
self.parallel_strategy = a.lower()
else:
raise CommandException(
"'%s' is not a valid parallelism strategy." % a)
if o == '-y':
self.num_slices = self._ParsePositiveInteger(
a, 'The -y parameter must be a positive integer.')
if o == '-s':
try:
self.thru_filesize = HumanReadableToBytes(a)
except ValueError:
raise CommandException('Invalid -s parameter.')
if o == '-d':
self.directory = a
if not os.path.exists(self.directory):
self.delete_directory = True
os.makedirs(self.directory)
if o == '-t':
self.diag_tests = set()
for test_name in a.strip().split(','):
if test_name.lower() not in self.ALL_DIAG_TESTS:
raise CommandException("List of test names (-t) contains invalid "
"test name '%s'." % test_name)
self.diag_tests.add(test_name)
if o == '-m':
pieces = a.split(':')
if len(pieces) != 2:
raise CommandException(
"Invalid metadata key-value combination '%s'." % a)
key, value = pieces
self.metadata_keys[key] = value
if o == '-o':
self.output_file = os.path.abspath(a)
if o == '-i':
self.input_file = os.path.abspath(a)
if not os.path.isfile(self.input_file):
raise CommandException("Invalid input file (-i): '%s'." % a)
try:
with open(self.input_file, 'r') as f:
self.results = json.load(f)
self.logger.info("Read input file: '%s'.", self.input_file)
except ValueError:
raise CommandException("Could not decode input file (-i): '%s'." %
a)
return
# If parallelism is specified, default parallelism strategy to fan.
if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
self.parallel_strategy = self.FAN
elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
raise CommandException(
'Cannot specify parallelism strategy (-p) without also specifying '
'multiple threads and/or processes (-c and/or -k).')
if not self.args:
self.RaiseWrongNumberOfArgumentsException()
self.bucket_url = StorageUrlFromString(self.args[0])
self.provider = self.bucket_url.scheme
if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket():
raise CommandException('The perfdiag command requires a URL that '
'specifies a bucket.\n"%s" is not '
'valid.' % self.args[0])
if (self.thru_filesize > HumanReadableToBytes('2GiB') and
(self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
raise CommandException(
'For in-memory tests maximum file size is 2GiB. For larger file '
'sizes, specify rthru_file and/or wthru_file with the -t option.')
perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
slice_not_available = (
self.provider == 's3' and self.diag_tests.intersection(self.WTHRU,
self.WTHRU_FILE))
if perform_slice and slice_not_available:
raise CommandException('Sliced uploads are not available for s3. '
'Use -p fan or sequential uploads for s3.')
# Ensure the bucket exists.
self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
provider=self.bucket_url.scheme,
fields=['id'])
self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
socket.timeout, httplib.BadStatusLine,
ServiceException]
# Command entry point.
def RunCommand(self):
"""Called by gsutil when the command is being invoked."""
self._ParseArgs()
if self.input_file:
self._DisplayResults()
return 0
# We turn off retries in the underlying boto library because the
# _RunOperation function handles errors manually so it can count them.
boto.config.set('Boto', 'num_retries', '0')
self.logger.info(
'Number of iterations to run: %d\n'
'Base bucket URI: %s\n'
'Number of processes: %d\n'
'Number of threads: %d\n'
'Parallelism strategy: %s\n'
'Throughput file size: %s\n'
'Diagnostics to run: %s',
self.num_objects,
self.bucket_url,
self.processes,
self.threads,
self.parallel_strategy,
MakeHumanReadable(self.thru_filesize),
(', '.join(self.diag_tests)))
try:
self._SetUp()
# Collect generic system info.
self._CollectSysInfo()
# Collect netstat info and disk counters before tests (and again later).
netstat_output = self._GetTcpStats()
if netstat_output:
self.results['sysinfo']['netstat_start'] = netstat_output
if IS_LINUX:
self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
# Record bucket URL.
self.results['bucket_uri'] = str(self.bucket_url)
self.results['json_format'] = 'perfdiag'
self.results['metadata'] = self.metadata_keys
if self.LAT in self.diag_tests:
self._RunLatencyTests()
if self.RTHRU in self.diag_tests:
self._RunReadThruTests()
# Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
# will be used in RTHRU_FILE to save time and bandwidth.
if self.WTHRU_FILE in self.diag_tests:
self._RunWriteThruTests(use_file=True)
if self.RTHRU_FILE in self.diag_tests:
self._RunReadThruTests(use_file=True)
if self.WTHRU in self.diag_tests:
self._RunWriteThruTests()
if self.LIST in self.diag_tests:
self._RunListTests()
# Collect netstat info and disk counters after tests.
netstat_output = self._GetTcpStats()
if netstat_output:
self.results['sysinfo']['netstat_end'] = netstat_output
if IS_LINUX:
self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()
self.results['total_requests'] = self.total_requests
self.results['request_errors'] = self.request_errors
self.results['error_responses_by_code'] = self.error_responses_by_code
self.results['connection_breaks'] = self.connection_breaks
self.results['gsutil_version'] = gslib.VERSION
self.results['boto_version'] = boto.__version__
self._TearDown()
self._DisplayResults()
finally:
# TODO: Install signal handlers so this is performed in response to a
# terminating signal; consider multi-threaded object deletes during
# cleanup so it happens quickly.
self._TearDown()
return 0
def StorageUrlToUploadObjectMetadata(storage_url):
if storage_url.IsCloudUrl() and storage_url.IsObject():
upload_target = apitools_messages.Object()
upload_target.name = storage_url.object_name
upload_target.bucket = storage_url.bucket_name
return upload_target
else:
raise CommandException('Non-cloud URL upload target %s was created in '
'perfdiag implemenation.' % storage_url)