# -*- coding: utf-8 -*- # Copyright 2012 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Contains the perfdiag gsutil command.""" from __future__ import absolute_import import calendar from collections import defaultdict from collections import namedtuple import contextlib import cStringIO import datetime import httplib import json import logging import math import multiprocessing import os import random import re import socket import string import subprocess import tempfile import time import boto import boto.gs.connection import gslib from gslib.cloud_api import NotFoundException from gslib.cloud_api import ServiceException from gslib.cloud_api_helper import GetDownloadSerializationData from gslib.command import Command from gslib.command import DummyArgChecker from gslib.command_argument import CommandArgument from gslib.commands import config from gslib.cs_api_map import ApiSelector from gslib.exception import CommandException from gslib.file_part import FilePart from gslib.hashing_helper import CalculateB64EncodedMd5FromContents from gslib.storage_url import StorageUrlFromString from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages from gslib.util import CheckFreeSpace from gslib.util import DivideAndCeil from gslib.util import GetCloudApiInstance from gslib.util import GetFileSize from gslib.util import GetMaxRetryDelay from gslib.util import HumanReadableToBytes from gslib.util import IS_LINUX from gslib.util import MakeBitsHumanReadable from gslib.util import MakeHumanReadable from gslib.util import Percentile from gslib.util import ResumableThreshold _SYNOPSIS = """ gsutil perfdiag [-i in.json] gsutil perfdiag [-o out.json] [-n objects] [-c processes] [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory] [-t tests] url... """ _DETAILED_HELP_TEXT = (""" SYNOPSIS """ + _SYNOPSIS + """ DESCRIPTION The perfdiag command runs a suite of diagnostic tests for a given Google Storage bucket. The 'url' parameter must name an existing bucket (e.g. gs://foo) to which the user has write permission. Several test files will be uploaded to and downloaded from this bucket. All test files will be deleted at the completion of the diagnostic if it finishes successfully. gsutil performance can be impacted by many factors at the client, server, and in-between, such as: CPU speed; available memory; the access path to the local disk; network bandwidth; contention and error rates along the path between gsutil and Google; operating system buffering configuration; and firewalls and other network elements. The perfdiag command is provided so that customers can run a known measurement suite when troubleshooting performance problems. PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM If the Google Cloud Storage Team asks you to run a performance diagnostic please use the following command, and email the output file (output.json) to gs-team@google.com: gsutil perfdiag -o output.json gs://your-bucket OPTIONS -n Sets the number of objects to use when downloading and uploading files during tests. Defaults to 5. -c Sets the number of processes to use while running throughput experiments. The default value is 1. -k Sets the number of threads per process to use while running throughput experiments. Each process will receive an equal number of threads. The default value is 1. Note: All specified threads and processes will be created, but may not by saturated with work if too few objects (specified with -n) and too few components (specified with -y) are specified. -p Sets the type of parallelism to be used (only applicable when threads or processes are specified and threads * processes > 1). The default is to use fan. Must be one of the following: fan Use one thread per object. This is akin to using gsutil -m cp, with sliced object download / parallel composite upload disabled. slice Use Y (specified with -y) threads for each object, transferring one object at a time. This is akin to using parallel object download / parallel composite upload, without -m. Sliced uploads not supported for s3. both Use Y (specified with -y) threads for each object, transferring multiple objects at a time. This is akin to simultaneously using sliced object download / parallel composite upload and gsutil -m cp. Sliced uploads not supported for s3. -y Sets the number of slices to divide each file/object into while transferring data. Only applicable with the slice (or both) parallelism type. The default is 4 slices. -s Sets the size (in bytes) for each of the N (set with -n) objects used in the read and write throughput tests. The default is 1 MiB. This can also be specified using byte suffixes such as 500K or 1M. Note: these values are interpreted as multiples of 1024 (K=1024, M=1024*1024, etc.) Note: If rthru_file or wthru_file are performed, N (set with -n) times as much disk space as specified will be required for the operation. -d Sets the directory to store temporary local files in. If not specified, a default temporary directory will be used. -t Sets the list of diagnostic tests to perform. The default is to run the lat, rthru, and wthru diagnostic tests. Must be a comma-separated list containing one or more of the following: lat For N (set with -n) objects, write the object, retrieve its metadata, read the object, and finally delete the object. Record the latency of each operation. list Write N (set with -n) objects to the bucket, record how long it takes for the eventually consistent listing call to return the N objects in its result, delete the N objects, then record how long it takes listing to stop returning the N objects. This test is off by default. rthru Runs N (set with -n) read operations, with at most C (set with -c) reads outstanding at any given time. rthru_file The same as rthru, but simultaneously writes data to the disk, to gauge the performance impact of the local disk on downloads. wthru Runs N (set with -n) write operations, with at most C (set with -c) writes outstanding at any given time. wthru_file The same as wthru, but simultaneously reads data from the disk, to gauge the performance impact of the local disk on uploads. -m Adds metadata to the result JSON file. Multiple -m values can be specified. Example: gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname Each metadata key will be added to the top-level "metadata" dictionary in the output JSON file. -o Writes the results of the diagnostic to an output file. The output is a JSON file containing system information and performance diagnostic results. The file can be read and reported later using the -i option. -i Reads the JSON output file created using the -o command and prints a formatted description of the results. MEASURING AVAILABILITY The perfdiag command ignores the boto num_retries configuration parameter. Instead, it always retries on HTTP errors in the 500 range and keeps track of how many 500 errors were encountered during the test. The availability measurement is reported at the end of the test. Note that HTTP responses are only recorded when the request was made in a single process. When using multiple processes or threads, read and write throughput measurements are performed in an external process, so the availability numbers reported won't include the throughput measurements. NOTE The perfdiag command collects system information. It collects your IP address, executes DNS queries to Google servers and collects the results, and collects network statistics information from the output of netstat -s. It will also attempt to connect to your proxy server if you have one configured. None of this information will be sent to Google unless you choose to send it. """) FileDataTuple = namedtuple( 'FileDataTuple', 'size md5 data') # Describes one object in a fanned download. If need_to_slice is specified as # True, the object should be downloaded with the slice strategy. Other field # names are the same as documented in PerfDiagCommand.Download. FanDownloadTuple = namedtuple( 'FanDownloadTuple', 'need_to_slice object_name file_name serialization_data') # Describes one slice in a sliced download. # Field names are the same as documented in PerfDiagCommand.Download. SliceDownloadTuple = namedtuple( 'SliceDownloadTuple', 'object_name file_name serialization_data start_byte end_byte') # Describes one file in a fanned upload. If need_to_slice is specified as # True, the file should be uploaded with the slice strategy. Other field # names are the same as documented in PerfDiagCommand.Upload. FanUploadTuple = namedtuple( 'FanUploadTuple', 'need_to_slice file_name object_name use_file') # Describes one slice in a sliced upload. # Field names are the same as documented in PerfDiagCommand.Upload. SliceUploadTuple = namedtuple( 'SliceUploadTuple', 'file_name object_name use_file file_start file_size') # Dict storing file_path:FileDataTuple for each temporary file used by # perfdiag. This data should be kept outside of the PerfDiagCommand class # since calls to Apply will make copies of all member data. temp_file_dict = {} class Error(Exception): """Base exception class for this module.""" pass class InvalidArgument(Error): """Raised on invalid arguments to functions.""" pass def _DownloadObject(cls, args, thread_state=None): """Function argument to apply for performing fanned parallel downloads. Args: cls: The calling PerfDiagCommand class instance. args: A FanDownloadTuple object describing this download. thread_state: gsutil Cloud API instance to use for the operation. """ cls.gsutil_api = GetCloudApiInstance(cls, thread_state) if args.need_to_slice: cls.PerformSlicedDownload(args.object_name, args.file_name, args.serialization_data) else: cls.Download(args.object_name, args.file_name, args.serialization_data) def _DownloadSlice(cls, args, thread_state=None): """Function argument to apply for performing sliced downloads. Args: cls: The calling PerfDiagCommand class instance. args: A SliceDownloadTuple object describing this download. thread_state: gsutil Cloud API instance to use for the operation. """ cls.gsutil_api = GetCloudApiInstance(cls, thread_state) cls.Download(args.object_name, args.file_name, args.serialization_data, args.start_byte, args.end_byte) def _UploadObject(cls, args, thread_state=None): """Function argument to apply for performing fanned parallel uploads. Args: cls: The calling PerfDiagCommand class instance. args: A FanUploadTuple object describing this upload. thread_state: gsutil Cloud API instance to use for the operation. """ cls.gsutil_api = GetCloudApiInstance(cls, thread_state) if args.need_to_slice: cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file) else: cls.Upload(args.file_name, args.object_name, args.use_file) def _UploadSlice(cls, args, thread_state=None): """Function argument to apply for performing sliced parallel uploads. Args: cls: The calling PerfDiagCommand class instance. args: A SliceUploadTuple object describing this upload. thread_state: gsutil Cloud API instance to use for the operation. """ cls.gsutil_api = GetCloudApiInstance(cls, thread_state) cls.Upload(args.file_name, args.object_name, args.use_file, args.file_start, args.file_size) def _DeleteWrapper(cls, object_name, thread_state=None): """Function argument to apply for performing parallel object deletions. Args: cls: The calling PerfDiagCommand class instance. object_name: The object name to delete from the test bucket. thread_state: gsutil Cloud API instance to use for the operation. """ cls.gsutil_api = GetCloudApiInstance(cls, thread_state) cls.Delete(object_name) def _PerfdiagExceptionHandler(cls, e): """Simple exception handler to allow post-completion status.""" cls.logger.error(str(e)) def _DummyTrackerCallback(_): pass class DummyFile(object): """A dummy, file-like object that throws away everything written to it.""" def write(self, *args, **kwargs): # pylint: disable=invalid-name pass def close(self): # pylint: disable=invalid-name pass # Many functions in perfdiag re-define a temporary function based on a # variable from a loop, resulting in a false positive from the linter. # pylint: disable=cell-var-from-loop class PerfDiagCommand(Command): """Implementation of gsutil perfdiag command.""" # Command specification. See base class for documentation. command_spec = Command.CreateCommandSpec( 'perfdiag', command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'], usage_synopsis=_SYNOPSIS, min_args=0, max_args=1, supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:', file_url_ok=False, provider_url_ok=False, urls_start_arg=0, gs_api_support=[ApiSelector.XML, ApiSelector.JSON], gs_default_api=ApiSelector.JSON, argparse_arguments=[ CommandArgument.MakeNCloudBucketURLsArgument(1) ] ) # Help specification. See help_provider.py for documentation. help_spec = Command.HelpSpec( help_name='perfdiag', help_name_aliases=[], help_type='command_help', help_one_line_summary='Run performance diagnostic', help_text=_DETAILED_HELP_TEXT, subcommand_help_text={}, ) # Byte sizes to use for latency testing files. # TODO: Consider letting the user specify these sizes with a configuration # parameter. test_lat_file_sizes = ( 0, # 0 bytes 1024, # 1 KiB 102400, # 100 KiB 1048576, # 1 MiB ) # Test names. RTHRU = 'rthru' RTHRU_FILE = 'rthru_file' WTHRU = 'wthru' WTHRU_FILE = 'wthru_file' LAT = 'lat' LIST = 'list' # Parallelism strategies. FAN = 'fan' SLICE = 'slice' BOTH = 'both' # List of all diagnostic tests. ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST) # List of diagnostic tests to run by default. DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT) # List of parallelism strategies. PARALLEL_STRATEGIES = (FAN, SLICE, BOTH) # Google Cloud Storage XML API endpoint host. XML_API_HOST = boto.config.get( 'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost) # Google Cloud Storage XML API endpoint port. XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80) # Maximum number of times to retry requests on 5xx errors. MAX_SERVER_ERROR_RETRIES = 5 # Maximum number of times to retry requests on more serious errors like # the socket breaking. MAX_TOTAL_RETRIES = 10 # The default buffer size in boto's Key object is set to 8 KiB. This becomes a # bottleneck at high throughput rates, so we increase it. KEY_BUFFER_SIZE = 16384 # The maximum number of bytes to generate pseudo-randomly before beginning # to repeat bytes. This number was chosen as the next prime larger than 5 MiB. MAX_UNIQUE_RANDOM_BYTES = 5242883 # Maximum amount of time, in seconds, we will wait for object listings to # reflect what we expect in the listing tests. MAX_LISTING_WAIT_TIME = 60.0 def _Exec(self, cmd, raise_on_error=True, return_output=False, mute_stderr=False): """Executes a command in a subprocess. Args: cmd: List containing the command to execute. raise_on_error: Whether or not to raise an exception when a process exits with a non-zero return code. return_output: If set to True, the return value of the function is the stdout of the process. mute_stderr: If set to True, the stderr of the process is not printed to the console. Returns: The return code of the process or the stdout if return_output is set. Raises: Exception: If raise_on_error is set to True and any process exits with a non-zero return code. """ self.logger.debug('Running command: %s', cmd) stderr = subprocess.PIPE if mute_stderr else None p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr) (stdoutdata, _) = p.communicate() if raise_on_error and p.returncode: raise CommandException("Received non-zero return code (%d) from " "subprocess '%s'." % (p.returncode, ' '.join(cmd))) return stdoutdata if return_output else p.returncode def _WarnIfLargeData(self): """Outputs a warning message if a large amount of data is being used.""" if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'): self.logger.info('This is a large operation, and could take a while.') def _MakeTempFile(self, file_size=0, mem_metadata=False, mem_data=False, prefix='gsutil_test_file'): """Creates a temporary file of the given size and returns its path. Args: file_size: The size of the temporary file to create. mem_metadata: If true, store md5 and file size in memory at temp_file_dict[fpath].md5, tempfile_data[fpath].file_size. mem_data: If true, store the file data in memory at temp_file_dict[fpath].data prefix: The prefix to use for the temporary file. Defaults to gsutil_test_file. Returns: The file path of the created temporary file. """ fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix, dir=self.directory, text=False) with os.fdopen(fd, 'wb') as fp: random_bytes = os.urandom(min(file_size, self.MAX_UNIQUE_RANDOM_BYTES)) total_bytes_written = 0 while total_bytes_written < file_size: num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES, file_size - total_bytes_written) fp.write(random_bytes[:num_bytes]) total_bytes_written += num_bytes if mem_metadata or mem_data: with open(fpath, 'rb') as fp: file_size = GetFileSize(fp) if mem_metadata else None md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None data = fp.read() if mem_data else None temp_file_dict[fpath] = FileDataTuple(file_size, md5, data) self.temporary_files.add(fpath) return fpath def _SetUp(self): """Performs setup operations needed before diagnostics can be run.""" # Stores test result data. self.results = {} # Set of file paths for local temporary files. self.temporary_files = set() # Set of names for test objects that exist in the test bucket. self.temporary_objects = set() # Total number of HTTP requests made. self.total_requests = 0 # Total number of HTTP 5xx errors. self.request_errors = 0 # Number of responses, keyed by response code. self.error_responses_by_code = defaultdict(int) # Total number of socket errors. self.connection_breaks = 0 # Boolean to prevent doing cleanup twice. self.teardown_completed = False # Create files for latency test. if self.LAT in self.diag_tests: self.latency_files = [] for file_size in self.test_lat_file_sizes: fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True) self.latency_files.append(fpath) # Create files for throughput tests. if self.diag_tests.intersection( (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)): # Create a file for warming up the TCP connection. self.tcp_warmup_file = self._MakeTempFile( 5 * 1024 * 1024, mem_metadata=True, mem_data=True) # For in memory tests, throughput tests transfer the same object N times # instead of creating N objects, in order to avoid excessive memory usage. if self.diag_tests.intersection((self.RTHRU, self.WTHRU)): self.mem_thru_file_name = self._MakeTempFile( self.thru_filesize, mem_metadata=True, mem_data=True) self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name) # For tests that use disk I/O, it is necessary to create N objects in # in order to properly measure the performance impact of seeks. if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)): # List of file names and corresponding object names to use for file # throughput tests. self.thru_file_names = [] self.thru_object_names = [] free_disk_space = CheckFreeSpace(self.directory) if free_disk_space >= self.thru_filesize * self.num_objects: self.logger.info('\nCreating %d local files each of size %s.' % (self.num_objects, MakeHumanReadable(self.thru_filesize))) self._WarnIfLargeData() for _ in range(self.num_objects): file_name = self._MakeTempFile(self.thru_filesize, mem_metadata=True) self.thru_file_names.append(file_name) self.thru_object_names.append(os.path.basename(file_name)) else: raise CommandException( 'Not enough free disk space for throughput files: ' '%s of disk space required, but only %s available.' % (MakeHumanReadable(self.thru_filesize * self.num_objects), MakeHumanReadable(free_disk_space))) # Dummy file buffer to use for downloading that goes nowhere. self.discard_sink = DummyFile() # Filter out misleading progress callback output and the incorrect # suggestion to use gsutil -m perfdiag. self.logger.addFilter(self._PerfdiagFilter()) def _TearDown(self): """Performs operations to clean things up after performing diagnostics.""" if not self.teardown_completed: temp_file_dict.clear() try: for fpath in self.temporary_files: os.remove(fpath) if self.delete_directory: os.rmdir(self.directory) except OSError: pass if self.threads > 1 or self.processes > 1: args = [obj for obj in self.temporary_objects] self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) else: for object_name in self.temporary_objects: self.Delete(object_name) self.teardown_completed = True @contextlib.contextmanager def _Time(self, key, bucket): """A context manager that measures time. A context manager that prints a status message before and after executing the inner command and times how long the inner command takes. Keeps track of the timing, aggregated by the given key. Args: key: The key to insert the timing value into a dictionary bucket. bucket: A dictionary to place the timing value in. Yields: For the context manager. """ self.logger.info('%s starting...', key) t0 = time.time() yield t1 = time.time() bucket[key].append(t1 - t0) self.logger.info('%s done.', key) def _RunOperation(self, func): """Runs an operation with retry logic. Args: func: The function to run. Returns: True if the operation succeeds, False if aborted. """ # We retry on httplib exceptions that can happen if the socket was closed # by the remote party or the connection broke because of network issues. # Only the BotoServerError is counted as a 5xx error towards the retry # limit. success = False server_error_retried = 0 total_retried = 0 i = 0 return_val = None while not success: next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay()) try: return_val = func() self.total_requests += 1 success = True except tuple(self.exceptions) as e: total_retried += 1 if total_retried > self.MAX_TOTAL_RETRIES: self.logger.info('Reached maximum total retries. Not retrying.') break if isinstance(e, ServiceException): if e.status >= 500: self.error_responses_by_code[e.status] += 1 self.total_requests += 1 self.request_errors += 1 server_error_retried += 1 time.sleep(next_sleep) else: raise if server_error_retried > self.MAX_SERVER_ERROR_RETRIES: self.logger.info( 'Reached maximum server error retries. Not retrying.') break else: self.connection_breaks += 1 return return_val def _RunLatencyTests(self): """Runs latency tests.""" # Stores timing information for each category of operation. self.results['latency'] = defaultdict(list) for i in range(self.num_objects): self.logger.info('\nRunning latency iteration %d...', i+1) for fpath in self.latency_files: file_data = temp_file_dict[fpath] url = self.bucket_url.Clone() url.object_name = os.path.basename(fpath) file_size = file_data.size readable_file_size = MakeHumanReadable(file_size) self.logger.info( "\nFile of size %s located on disk at '%s' being diagnosed in the " "cloud at '%s'.", readable_file_size, fpath, url) upload_target = StorageUrlToUploadObjectMetadata(url) def _Upload(): io_fp = cStringIO.StringIO(file_data.data) with self._Time('UPLOAD_%d' % file_size, self.results['latency']): self.gsutil_api.UploadObject( io_fp, upload_target, size=file_size, provider=self.provider, fields=['name']) self._RunOperation(_Upload) def _Metadata(): with self._Time('METADATA_%d' % file_size, self.results['latency']): return self.gsutil_api.GetObjectMetadata( url.bucket_name, url.object_name, provider=self.provider, fields=['name', 'contentType', 'mediaLink', 'size']) # Download will get the metadata first if we don't pass it in. download_metadata = self._RunOperation(_Metadata) serialization_data = GetDownloadSerializationData(download_metadata) def _Download(): with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']): self.gsutil_api.GetObjectMedia( url.bucket_name, url.object_name, self.discard_sink, provider=self.provider, serialization_data=serialization_data) self._RunOperation(_Download) def _Delete(): with self._Time('DELETE_%d' % file_size, self.results['latency']): self.gsutil_api.DeleteObject(url.bucket_name, url.object_name, provider=self.provider) self._RunOperation(_Delete) class _PerfdiagFilter(logging.Filter): def filter(self, record): # Used to prevent unnecessary output when using multiprocessing. msg = record.getMessage() return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg)) def _PerfdiagExceptionHandler(self, e): """Simple exception handler to allow post-completion status.""" self.logger.error(str(e)) def PerformFannedDownload(self, need_to_slice, object_names, file_names, serialization_data): """Performs a parallel download of multiple objects using the fan strategy. Args: need_to_slice: If True, additionally apply the slice strategy to each object in object_names. object_names: A list of object names to be downloaded. Each object must already exist in the test bucket. file_names: A list, corresponding by index to object_names, of file names for downloaded data. If None, discard downloaded data. serialization_data: A list, corresponding by index to object_names, of serialization data for each object. """ args = [] for i in range(len(object_names)): file_name = file_names[i] if file_names else None args.append(FanDownloadTuple( need_to_slice, object_names[i], file_name, serialization_data[i])) self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler, ('total_requests', 'request_errors'), arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) def PerformSlicedDownload(self, object_name, file_name, serialization_data): """Performs a download of an object using the slice strategy. Args: object_name: The name of the object to download. file_name: The name of the file to download data to, or None if data should be discarded. serialization_data: The serialization data for the object. """ if file_name: with open(file_name, 'ab') as fp: fp.truncate(self.thru_filesize) component_size = DivideAndCeil(self.thru_filesize, self.num_slices) args = [] for i in range(self.num_slices): start_byte = i * component_size end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1) args.append(SliceDownloadTuple(object_name, file_name, serialization_data, start_byte, end_byte)) self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler, ('total_requests', 'request_errors'), arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) def PerformFannedUpload(self, need_to_slice, file_names, object_names, use_file): """Performs a parallel upload of multiple files using the fan strategy. The metadata for file_name should be present in temp_file_dict prior to calling. Also, the data for file_name should be present in temp_file_dict if use_file is specified as False. Args: need_to_slice: If True, additionally apply the slice strategy to each file in file_names. file_names: A list of file names to be uploaded. object_names: A list, corresponding by by index to file_names, of object names to upload data to. use_file: If true, use disk I/O, otherwise read upload data from memory. """ args = [] for i in range(len(file_names)): args.append(FanUploadTuple( need_to_slice, file_names[i], object_names[i], use_file)) self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, ('total_requests', 'request_errors'), arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) def PerformSlicedUpload(self, file_name, object_name, use_file): """Performs a parallel upload of a file using the slice strategy. The metadata for file_name should be present in temp_file_dict prior to calling. Also, the data from for file_name should be present in temp_file_dict if use_file is specified as False. Args: file_name: The name of the file to upload. object_name: The name of the object to upload to. use_file: If true, use disk I/O, otherwise read upload data from memory. """ # Divide the file into components. component_size = DivideAndCeil(self.thru_filesize, self.num_slices) component_object_names = ( [object_name + str(i) for i in range(self.num_slices)]) args = [] for i in range(self.num_slices): component_start = i * component_size component_size = min(component_size, temp_file_dict[file_name].size - component_start) args.append(SliceUploadTuple(file_name, component_object_names[i], use_file, component_start, component_size)) # Upload the components in parallel. try: self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler, ('total_requests', 'request_errors'), arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) # Compose the components into an object. request_components = [] for i in range(self.num_slices): src_obj_metadata = ( apitools_messages.ComposeRequest.SourceObjectsValueListEntry( name=component_object_names[i])) request_components.append(src_obj_metadata) dst_obj_metadata = apitools_messages.Object() dst_obj_metadata.name = object_name dst_obj_metadata.bucket = self.bucket_url.bucket_name def _Compose(): self.gsutil_api.ComposeObject(request_components, dst_obj_metadata, provider=self.provider) self._RunOperation(_Compose) finally: # Delete the temporary components. self.Apply(_DeleteWrapper, component_object_names, _PerfdiagExceptionHandler, ('total_requests', 'request_errors'), arg_checker=DummyArgChecker, parallel_operations_override=True, process_count=self.processes, thread_count=self.threads) def _RunReadThruTests(self, use_file=False): """Runs read throughput tests.""" test_name = 'read_throughput_file' if use_file else 'read_throughput' file_io_string = 'with file I/O' if use_file else '' self.logger.info( '\nRunning read throughput tests %s (%s objects of size %s)' % (file_io_string, self.num_objects, MakeHumanReadable(self.thru_filesize))) self._WarnIfLargeData() self.results[test_name] = {'file_size': self.thru_filesize, 'processes': self.processes, 'threads': self.threads, 'parallelism': self.parallel_strategy } # Copy the file(s) to the test bucket, and also get the serialization data # so that we can pass it to download. if use_file: # For test with file I/O use N files on disk to preserve seek performance. file_names = self.thru_file_names object_names = self.thru_object_names serialization_data = [] for i in range(self.num_objects): self.temporary_objects.add(self.thru_object_names[i]) if self.WTHRU_FILE in self.diag_tests: # If we ran the WTHRU_FILE test, then the objects already exist. obj_metadata = self.gsutil_api.GetObjectMetadata( self.bucket_url.bucket_name, self.thru_object_names[i], fields=['size', 'mediaLink'], provider=self.bucket_url.scheme) else: obj_metadata = self.Upload(self.thru_file_names[i], self.thru_object_names[i], use_file) # File overwrite causes performance issues with sliced downloads. # Delete the file and reopen it for download. This matches what a real # download would look like. os.unlink(self.thru_file_names[i]) open(self.thru_file_names[i], 'ab').close() serialization_data.append(GetDownloadSerializationData(obj_metadata)) else: # For in-memory test only use one file but copy it num_objects times, to # allow scalability in num_objects. self.temporary_objects.add(self.mem_thru_object_name) obj_metadata = self.Upload(self.mem_thru_file_name, self.mem_thru_object_name, use_file) file_names = None object_names = [self.mem_thru_object_name] * self.num_objects serialization_data = ( [GetDownloadSerializationData(obj_metadata)] * self.num_objects) # Warmup the TCP connection. warmup_obj_name = os.path.basename(self.tcp_warmup_file) self.temporary_objects.add(warmup_obj_name) self.Upload(self.tcp_warmup_file, warmup_obj_name) self.Download(warmup_obj_name) t0 = time.time() if self.processes == 1 and self.threads == 1: for i in range(self.num_objects): file_name = file_names[i] if use_file else None self.Download(object_names[i], file_name, serialization_data[i]) else: if self.parallel_strategy in (self.FAN, self.BOTH): need_to_slice = (self.parallel_strategy == self.BOTH) self.PerformFannedDownload(need_to_slice, object_names, file_names, serialization_data) elif self.parallel_strategy == self.SLICE: for i in range(self.num_objects): file_name = file_names[i] if use_file else None self.PerformSlicedDownload( object_names[i], file_name, serialization_data[i]) t1 = time.time() time_took = t1 - t0 total_bytes_copied = self.thru_filesize * self.num_objects bytes_per_second = total_bytes_copied / time_took self.results[test_name]['time_took'] = time_took self.results[test_name]['total_bytes_copied'] = total_bytes_copied self.results[test_name]['bytes_per_second'] = bytes_per_second def _RunWriteThruTests(self, use_file=False): """Runs write throughput tests.""" test_name = 'write_throughput_file' if use_file else 'write_throughput' file_io_string = 'with file I/O' if use_file else '' self.logger.info( '\nRunning write throughput tests %s (%s objects of size %s)' % (file_io_string, self.num_objects, MakeHumanReadable(self.thru_filesize))) self._WarnIfLargeData() self.results[test_name] = {'file_size': self.thru_filesize, 'processes': self.processes, 'threads': self.threads, 'parallelism': self.parallel_strategy} # Warmup the TCP connection. warmup_obj_name = os.path.basename(self.tcp_warmup_file) self.temporary_objects.add(warmup_obj_name) self.Upload(self.tcp_warmup_file, warmup_obj_name) if use_file: # For test with file I/O use N files on disk to preserve seek performance. file_names = self.thru_file_names object_names = self.thru_object_names else: # For in-memory test only use one file but copy it num_objects times, to # allow for scalability in num_objects. file_names = [self.mem_thru_file_name] * self.num_objects object_names = ( [self.mem_thru_object_name + str(i) for i in range(self.num_objects)]) for object_name in object_names: self.temporary_objects.add(object_name) t0 = time.time() if self.processes == 1 and self.threads == 1: for i in range(self.num_objects): self.Upload(file_names[i], object_names[i], use_file) else: if self.parallel_strategy in (self.FAN, self.BOTH): need_to_slice = (self.parallel_strategy == self.BOTH) self.PerformFannedUpload(need_to_slice, file_names, object_names, use_file) elif self.parallel_strategy == self.SLICE: for i in range(self.num_objects): self.PerformSlicedUpload(file_names[i], object_names[i], use_file) t1 = time.time() time_took = t1 - t0 total_bytes_copied = self.thru_filesize * self.num_objects bytes_per_second = total_bytes_copied / time_took self.results[test_name]['time_took'] = time_took self.results[test_name]['total_bytes_copied'] = total_bytes_copied self.results[test_name]['bytes_per_second'] = bytes_per_second def _RunListTests(self): """Runs eventual consistency listing latency tests.""" self.results['listing'] = {'num_files': self.num_objects} # Generate N random objects to put into the bucket. list_prefix = 'gsutil-perfdiag-list-' list_fpaths = [] list_objects = [] args = [] for _ in xrange(self.num_objects): fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True, prefix=list_prefix) list_fpaths.append(fpath) object_name = os.path.basename(fpath) list_objects.append(object_name) args.append(FanUploadTuple(False, fpath, object_name, False)) self.temporary_objects.add(object_name) # Add the objects to the bucket. self.logger.info( '\nWriting %s objects for listing test...', self.num_objects) self.Apply(_UploadObject, args, _PerfdiagExceptionHandler, arg_checker=DummyArgChecker) list_latencies = [] files_seen = [] total_start_time = time.time() expected_objects = set(list_objects) found_objects = set() def _List(): """Lists and returns objects in the bucket. Also records latency.""" t0 = time.time() objects = list(self.gsutil_api.ListObjects( self.bucket_url.bucket_name, delimiter='/', provider=self.provider, fields=['items/name'])) t1 = time.time() list_latencies.append(t1 - t0) return set([obj.data.name for obj in objects]) self.logger.info( 'Listing bucket %s waiting for %s objects to appear...', self.bucket_url.bucket_name, self.num_objects) while expected_objects - found_objects: def _ListAfterUpload(): names = _List() found_objects.update(names & expected_objects) files_seen.append(len(found_objects)) self._RunOperation(_ListAfterUpload) if expected_objects - found_objects: if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: self.logger.warning('Maximum time reached waiting for listing.') break total_end_time = time.time() self.results['listing']['insert'] = { 'num_listing_calls': len(list_latencies), 'list_latencies': list_latencies, 'files_seen_after_listing': files_seen, 'time_took': total_end_time - total_start_time, } args = [object_name for object_name in list_objects] self.logger.info( 'Deleting %s objects for listing test...', self.num_objects) self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler, arg_checker=DummyArgChecker) self.logger.info( 'Listing bucket %s waiting for %s objects to disappear...', self.bucket_url.bucket_name, self.num_objects) list_latencies = [] files_seen = [] total_start_time = time.time() found_objects = set(list_objects) while found_objects: def _ListAfterDelete(): names = _List() found_objects.intersection_update(names) files_seen.append(len(found_objects)) self._RunOperation(_ListAfterDelete) if found_objects: if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME: self.logger.warning('Maximum time reached waiting for listing.') break total_end_time = time.time() self.results['listing']['delete'] = { 'num_listing_calls': len(list_latencies), 'list_latencies': list_latencies, 'files_seen_after_listing': files_seen, 'time_took': total_end_time - total_start_time, } def Upload(self, file_name, object_name, use_file=False, file_start=0, file_size=None): """Performs an upload to the test bucket. The file is uploaded to the bucket referred to by self.bucket_url, and has name object_name. Args: file_name: The path to the local file, and the key to its entry in temp_file_dict. object_name: The name of the remote object. use_file: If true, use disk I/O, otherwise read everything from memory. file_start: The first byte in the file to upload to the object. (only should be specified for sliced uploads) file_size: The size of the file to upload. (only should be specified for sliced uploads) Returns: Uploaded Object Metadata. """ fp = None if file_size is None: file_size = temp_file_dict[file_name].size upload_url = self.bucket_url.Clone() upload_url.object_name = object_name upload_target = StorageUrlToUploadObjectMetadata(upload_url) try: if use_file: fp = FilePart(file_name, file_start, file_size) else: data = temp_file_dict[file_name].data[file_start:file_start+file_size] fp = cStringIO.StringIO(data) def _InnerUpload(): if file_size < ResumableThreshold(): return self.gsutil_api.UploadObject( fp, upload_target, provider=self.provider, size=file_size, fields=['name', 'mediaLink', 'size']) else: return self.gsutil_api.UploadObjectResumable( fp, upload_target, provider=self.provider, size=file_size, fields=['name', 'mediaLink', 'size'], tracker_callback=_DummyTrackerCallback) return self._RunOperation(_InnerUpload) finally: if fp: fp.close() def Download(self, object_name, file_name=None, serialization_data=None, start_byte=0, end_byte=None): """Downloads an object from the test bucket. Args: object_name: The name of the object (in the test bucket) to download. file_name: Optional file name to write downloaded data to. If None, downloaded data is discarded immediately. serialization_data: Optional serialization data, used so that we don't have to get the metadata before downloading. start_byte: The first byte in the object to download. (only should be specified for sliced downloads) end_byte: The last byte in the object to download. (only should be specified for sliced downloads) """ fp = None try: if file_name is not None: fp = open(file_name, 'r+b') fp.seek(start_byte) else: fp = self.discard_sink def _InnerDownload(): self.gsutil_api.GetObjectMedia( self.bucket_url.bucket_name, object_name, fp, provider=self.provider, start_byte=start_byte, end_byte=end_byte, serialization_data=serialization_data) self._RunOperation(_InnerDownload) finally: if fp: fp.close() def Delete(self, object_name): """Deletes an object from the test bucket. Args: object_name: The name of the object to delete. """ try: def _InnerDelete(): self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name, provider=self.provider) self._RunOperation(_InnerDelete) except NotFoundException: pass def _GetDiskCounters(self): """Retrieves disk I/O statistics for all disks. Adapted from the psutil module's psutil._pslinux.disk_io_counters: http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py Originally distributed under under a BSD license. Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola. Returns: A dictionary containing disk names mapped to the disk counters from /disk/diskstats. """ # iostat documentation states that sectors are equivalent with blocks and # have a size of 512 bytes since 2.4 kernels. This value is needed to # calculate the amount of disk I/O in bytes. sector_size = 512 partitions = [] with open('/proc/partitions', 'r') as f: lines = f.readlines()[2:] for line in lines: _, _, _, name = line.split() if name[-1].isdigit(): partitions.append(name) retdict = {} with open('/proc/diskstats', 'r') as f: for line in f: values = line.split()[:11] _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values if name in partitions: rbytes = int(rbytes) * sector_size wbytes = int(wbytes) * sector_size reads = int(reads) writes = int(writes) rtime = int(rtime) wtime = int(wtime) retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime) return retdict def _GetTcpStats(self): """Tries to parse out TCP packet information from netstat output. Returns: A dictionary containing TCP information, or None if netstat is not available. """ # netstat return code is non-zero for -s on Linux, so don't raise on error. try: netstat_output = self._Exec(['netstat', '-s'], return_output=True, raise_on_error=False) except OSError: self.logger.warning('netstat not found on your system; some measurement ' 'data will be missing') return None netstat_output = netstat_output.strip().lower() found_tcp = False tcp_retransmit = None tcp_received = None tcp_sent = None for line in netstat_output.split('\n'): # Header for TCP section is "Tcp:" in Linux/Mac and # "TCP Statistics for" in Windows. if 'tcp:' in line or 'tcp statistics' in line: found_tcp = True # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts" # Windows == "segments retransmitted". if (found_tcp and tcp_retransmit is None and ('segments retransmited' in line or 'retransmit timeouts' in line or 'segments retransmitted' in line)): tcp_retransmit = ''.join(c for c in line if c in string.digits) # Linux+Windows == "segments received", Mac == "packets received". if (found_tcp and tcp_received is None and ('segments received' in line or 'packets received' in line)): tcp_received = ''.join(c for c in line if c in string.digits) # Linux == "segments send out" (sic), Mac+Windows == "packets sent". if (found_tcp and tcp_sent is None and ('segments send out' in line or 'packets sent' in line or 'segments sent' in line)): tcp_sent = ''.join(c for c in line if c in string.digits) result = {} try: result['tcp_retransmit'] = int(tcp_retransmit) result['tcp_received'] = int(tcp_received) result['tcp_sent'] = int(tcp_sent) except (ValueError, TypeError): result['tcp_retransmit'] = None result['tcp_received'] = None result['tcp_sent'] = None return result def _CollectSysInfo(self): """Collects system information.""" sysinfo = {} # All exceptions that might be raised from socket module calls. socket_errors = ( socket.error, socket.herror, socket.gaierror, socket.timeout) # Find out whether HTTPS is enabled in Boto. sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True) # Look up proxy info. proxy_host = boto.config.get('Boto', 'proxy', None) proxy_port = boto.config.getint('Boto', 'proxy_port', 0) sysinfo['using_proxy'] = bool(proxy_host) if boto.config.get('Boto', 'proxy_rdns', False): self.logger.info('DNS lookups are disallowed in this environment, so ' 'some information is not included in this perfdiag run.') # Get the local IP address from socket lib. try: sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname()) except socket_errors: sysinfo['ip_address'] = '' # Record the temporary directory used since it can affect performance, e.g. # when on a networked filesystem. sysinfo['tempdir'] = self.directory # Produces an RFC 2822 compliant GMT timestamp. sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000', time.gmtime()) # Execute a CNAME lookup on Google DNS to find what Google server # it's routing to. cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST] try: nslookup_cname_output = self._Exec(cmd, return_output=True) m = re.search(r' = (?P[^.]+)\.', nslookup_cname_output) sysinfo['googserv_route'] = m.group('googserv') if m else None except (CommandException, OSError): sysinfo['googserv_route'] = '' # Try to determine the latency of a DNS lookup for the Google hostname # endpoint. Note: we don't piggyback on gethostbyname_ex below because # the _ex version requires an extra RTT. try: t0 = time.time() socket.gethostbyname(self.XML_API_HOST) t1 = time.time() sysinfo['google_host_dns_latency'] = t1 - t0 except socket_errors: pass # Look up IP addresses for Google Server. try: (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST) sysinfo['googserv_ips'] = ipaddrlist except socket_errors: ipaddrlist = [] sysinfo['googserv_ips'] = [] # Reverse lookup the hostnames for the Google Server IPs. sysinfo['googserv_hostnames'] = [] for googserv_ip in ipaddrlist: try: (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip) sysinfo['googserv_hostnames'].append(hostname) except socket_errors: pass # Query o-o to find out what the Google DNS thinks is the user's IP. try: cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.'] nslookup_txt_output = self._Exec(cmd, return_output=True) m = re.search(r'text\s+=\s+"(?P[\.\d]+)"', nslookup_txt_output) sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None except (CommandException, OSError): sysinfo['dns_o-o_ip'] = '' # Try to determine the latency of connecting to the Google hostname # endpoint. sysinfo['google_host_connect_latencies'] = {} for googserv_ip in ipaddrlist: try: sock = socket.socket() t0 = time.time() sock.connect((googserv_ip, self.XML_API_PORT)) t1 = time.time() sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0 except socket_errors: pass # If using a proxy, try to determine the latency of a DNS lookup to resolve # the proxy hostname and the latency of connecting to the proxy. if proxy_host: proxy_ip = None try: t0 = time.time() proxy_ip = socket.gethostbyname(proxy_host) t1 = time.time() sysinfo['proxy_dns_latency'] = t1 - t0 except socket_errors: pass try: sock = socket.socket() t0 = time.time() sock.connect((proxy_ip or proxy_host, proxy_port)) t1 = time.time() sysinfo['proxy_host_connect_latency'] = t1 - t0 except socket_errors: pass # Try and find the number of CPUs in the system if available. try: sysinfo['cpu_count'] = multiprocessing.cpu_count() except NotImplementedError: sysinfo['cpu_count'] = None # For *nix platforms, obtain the CPU load. try: sysinfo['load_avg'] = list(os.getloadavg()) except (AttributeError, OSError): sysinfo['load_avg'] = None # Try and collect memory information from /proc/meminfo if possible. mem_total = None mem_free = None mem_buffers = None mem_cached = None try: with open('/proc/meminfo', 'r') as f: for line in f: if line.startswith('MemTotal'): mem_total = (int(''.join(c for c in line if c in string.digits)) * 1000) elif line.startswith('MemFree'): mem_free = (int(''.join(c for c in line if c in string.digits)) * 1000) elif line.startswith('Buffers'): mem_buffers = (int(''.join(c for c in line if c in string.digits)) * 1000) elif line.startswith('Cached'): mem_cached = (int(''.join(c for c in line if c in string.digits)) * 1000) except (IOError, ValueError): pass sysinfo['meminfo'] = {'mem_total': mem_total, 'mem_free': mem_free, 'mem_buffers': mem_buffers, 'mem_cached': mem_cached} # Get configuration attributes from config module. sysinfo['gsutil_config'] = {} for attr in dir(config): attr_value = getattr(config, attr) # Filter out multiline strings that are not useful. if attr.isupper() and not (isinstance(attr_value, basestring) and '\n' in attr_value): sysinfo['gsutil_config'][attr] = attr_value sysinfo['tcp_proc_values'] = {} stats_to_check = [ '/proc/sys/net/core/rmem_default', '/proc/sys/net/core/rmem_max', '/proc/sys/net/core/wmem_default', '/proc/sys/net/core/wmem_max', '/proc/sys/net/ipv4/tcp_timestamps', '/proc/sys/net/ipv4/tcp_sack', '/proc/sys/net/ipv4/tcp_window_scaling', ] for fname in stats_to_check: try: with open(fname, 'r') as f: value = f.read() sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip() except IOError: pass self.results['sysinfo'] = sysinfo def _DisplayStats(self, trials): """Prints out mean, standard deviation, median, and 90th percentile.""" n = len(trials) mean = float(sum(trials)) / n stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n) print str(n).rjust(6), '', print ('%.1f' % (mean * 1000)).rjust(9), '', print ('%.1f' % (stdev * 1000)).rjust(12), '', print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '', print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), '' def _DisplayResults(self): """Displays results collected from diagnostic run.""" print print '=' * 78 print 'DIAGNOSTIC RESULTS'.center(78) print '=' * 78 if 'latency' in self.results: print print '-' * 78 print 'Latency'.center(78) print '-' * 78 print ('Operation Size Trials Mean (ms) Std Dev (ms) ' 'Median (ms) 90th % (ms)') print ('========= ========= ====== ========= ============ ' '=========== ===========') for key in sorted(self.results['latency']): trials = sorted(self.results['latency'][key]) op, numbytes = key.split('_') numbytes = int(numbytes) if op == 'METADATA': print 'Metadata'.rjust(9), '', print MakeHumanReadable(numbytes).rjust(9), '', self._DisplayStats(trials) if op == 'DOWNLOAD': print 'Download'.rjust(9), '', print MakeHumanReadable(numbytes).rjust(9), '', self._DisplayStats(trials) if op == 'UPLOAD': print 'Upload'.rjust(9), '', print MakeHumanReadable(numbytes).rjust(9), '', self._DisplayStats(trials) if op == 'DELETE': print 'Delete'.rjust(9), '', print MakeHumanReadable(numbytes).rjust(9), '', self._DisplayStats(trials) if 'write_throughput' in self.results: print print '-' * 78 print 'Write Throughput'.center(78) print '-' * 78 write_thru = self.results['write_throughput'] print 'Copied %s %s file(s) for a total transfer size of %s.' % ( self.num_objects, MakeHumanReadable(write_thru['file_size']), MakeHumanReadable(write_thru['total_bytes_copied'])) print 'Write throughput: %s/s.' % ( MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8)) print 'Parallelism strategy: %s' % write_thru['parallelism'] if 'write_throughput_file' in self.results: print print '-' * 78 print 'Write Throughput With File I/O'.center(78) print '-' * 78 write_thru_file = self.results['write_throughput_file'] print 'Copied %s %s file(s) for a total transfer size of %s.' % ( self.num_objects, MakeHumanReadable(write_thru_file['file_size']), MakeHumanReadable(write_thru_file['total_bytes_copied'])) print 'Write throughput: %s/s.' % ( MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8)) print 'Parallelism strategy: %s' % write_thru_file['parallelism'] if 'read_throughput' in self.results: print print '-' * 78 print 'Read Throughput'.center(78) print '-' * 78 read_thru = self.results['read_throughput'] print 'Copied %s %s file(s) for a total transfer size of %s.' % ( self.num_objects, MakeHumanReadable(read_thru['file_size']), MakeHumanReadable(read_thru['total_bytes_copied'])) print 'Read throughput: %s/s.' % ( MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8)) print 'Parallelism strategy: %s' % read_thru['parallelism'] if 'read_throughput_file' in self.results: print print '-' * 78 print 'Read Throughput With File I/O'.center(78) print '-' * 78 read_thru_file = self.results['read_throughput_file'] print 'Copied %s %s file(s) for a total transfer size of %s.' % ( self.num_objects, MakeHumanReadable(read_thru_file['file_size']), MakeHumanReadable(read_thru_file['total_bytes_copied'])) print 'Read throughput: %s/s.' % ( MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8)) print 'Parallelism strategy: %s' % read_thru_file['parallelism'] if 'listing' in self.results: print print '-' * 78 print 'Listing'.center(78) print '-' * 78 listing = self.results['listing'] insert = listing['insert'] delete = listing['delete'] print 'After inserting %s objects:' % listing['num_files'] print (' Total time for objects to appear: %.2g seconds' % insert['time_took']) print ' Number of listing calls made: %s' % insert['num_listing_calls'] print (' Individual listing call latencies: [%s]' % ', '.join('%.2gs' % lat for lat in insert['list_latencies'])) print (' Files reflected after each call: [%s]' % ', '.join(map(str, insert['files_seen_after_listing']))) print 'After deleting %s objects:' % listing['num_files'] print (' Total time for objects to appear: %.2g seconds' % delete['time_took']) print ' Number of listing calls made: %s' % delete['num_listing_calls'] print (' Individual listing call latencies: [%s]' % ', '.join('%.2gs' % lat for lat in delete['list_latencies'])) print (' Files reflected after each call: [%s]' % ', '.join(map(str, delete['files_seen_after_listing']))) if 'sysinfo' in self.results: print print '-' * 78 print 'System Information'.center(78) print '-' * 78 info = self.results['sysinfo'] print 'IP Address: \n %s' % info['ip_address'] print 'Temporary Directory: \n %s' % info['tempdir'] print 'Bucket URI: \n %s' % self.results['bucket_uri'] print 'gsutil Version: \n %s' % self.results.get('gsutil_version', 'Unknown') print 'boto Version: \n %s' % self.results.get('boto_version', 'Unknown') if 'gmt_timestamp' in info: ts_string = info['gmt_timestamp'] timetuple = None try: # Convert RFC 2822 string to Linux timestamp. timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000') except ValueError: pass if timetuple: # Converts the GMT time tuple to local Linux timestamp. localtime = calendar.timegm(timetuple) localdt = datetime.datetime.fromtimestamp(localtime) print 'Measurement time: \n %s' % localdt.strftime( '%Y-%m-%d %I:%M:%S %p %Z') print 'Google Server: \n %s' % info['googserv_route'] print ('Google Server IP Addresses: \n %s' % ('\n '.join(info['googserv_ips']))) print ('Google Server Hostnames: \n %s' % ('\n '.join(info['googserv_hostnames']))) print 'Google DNS thinks your IP is: \n %s' % info['dns_o-o_ip'] print 'CPU Count: \n %s' % info['cpu_count'] print 'CPU Load Average: \n %s' % info['load_avg'] try: print ('Total Memory: \n %s' % MakeHumanReadable(info['meminfo']['mem_total'])) # Free memory is really MemFree + Buffers + Cached. print 'Free Memory: \n %s' % MakeHumanReadable( info['meminfo']['mem_free'] + info['meminfo']['mem_buffers'] + info['meminfo']['mem_cached']) except TypeError: pass if 'netstat_end' in info and 'netstat_start' in info: netstat_after = info['netstat_end'] netstat_before = info['netstat_start'] for tcp_type in ('sent', 'received', 'retransmit'): try: delta = (netstat_after['tcp_%s' % tcp_type] - netstat_before['tcp_%s' % tcp_type]) print 'TCP segments %s during test:\n %d' % (tcp_type, delta) except TypeError: pass else: print ('TCP segment counts not available because "netstat" was not ' 'found during test runs') if 'disk_counters_end' in info and 'disk_counters_start' in info: print 'Disk Counter Deltas:\n', disk_after = info['disk_counters_end'] disk_before = info['disk_counters_start'] print '', 'disk'.rjust(6), for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime', 'wtime']: print colname.rjust(8), print for diskname in sorted(disk_after): before = disk_before[diskname] after = disk_after[diskname] (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after print '', diskname.rjust(6), deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1, wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1] for delta in deltas: print str(delta).rjust(8), print if 'tcp_proc_values' in info: print 'TCP /proc values:\n', for item in info['tcp_proc_values'].iteritems(): print ' %s = %s' % item if 'boto_https_enabled' in info: print 'Boto HTTPS Enabled: \n %s' % info['boto_https_enabled'] if 'using_proxy' in info: print 'Requests routed through proxy: \n %s' % info['using_proxy'] if 'google_host_dns_latency' in info: print ('Latency of the DNS lookup for Google Storage server (ms): ' '\n %.1f' % (info['google_host_dns_latency'] * 1000.0)) if 'google_host_connect_latencies' in info: print 'Latencies connecting to Google Storage server IPs (ms):' for ip, latency in info['google_host_connect_latencies'].iteritems(): print ' %s = %.1f' % (ip, latency * 1000.0) if 'proxy_dns_latency' in info: print ('Latency of the DNS lookup for the configured proxy (ms): ' '\n %.1f' % (info['proxy_dns_latency'] * 1000.0)) if 'proxy_host_connect_latency' in info: print ('Latency connecting to the configured proxy (ms): \n %.1f' % (info['proxy_host_connect_latency'] * 1000.0)) if 'request_errors' in self.results and 'total_requests' in self.results: print print '-' * 78 print 'In-Process HTTP Statistics'.center(78) print '-' * 78 total = int(self.results['total_requests']) numerrors = int(self.results['request_errors']) numbreaks = int(self.results['connection_breaks']) availability = (((total - numerrors) / float(total)) * 100 if total > 0 else 100) print 'Total HTTP requests made: %d' % total print 'HTTP 5xx errors: %d' % numerrors print 'HTTP connections broken: %d' % numbreaks print 'Availability: %.7g%%' % availability if 'error_responses_by_code' in self.results: sorted_codes = sorted( self.results['error_responses_by_code'].iteritems()) if sorted_codes: print 'Error responses by code:' print '\n'.join(' %s: %s' % c for c in sorted_codes) if self.output_file: with open(self.output_file, 'w') as f: json.dump(self.results, f, indent=2) print print "Output file written to '%s'." % self.output_file print def _ParsePositiveInteger(self, val, msg): """Tries to convert val argument to a positive integer. Args: val: The value (as a string) to convert to a positive integer. msg: The error message to place in the CommandException on an error. Returns: A valid positive integer. Raises: CommandException: If the supplied value is not a valid positive integer. """ try: val = int(val) if val < 1: raise CommandException(msg) return val except ValueError: raise CommandException(msg) def _ParseArgs(self): """Parses arguments for perfdiag command.""" # From -n. self.num_objects = 5 # From -c. self.processes = 1 # From -k. self.threads = 1 # From -p self.parallel_strategy = None # From -y self.num_slices = 4 # From -s. self.thru_filesize = 1048576 # From -d. self.directory = tempfile.gettempdir() # Keep track of whether or not to delete the directory upon completion. self.delete_directory = False # From -t. self.diag_tests = set(self.DEFAULT_DIAG_TESTS) # From -o. self.output_file = None # From -i. self.input_file = None # From -m. self.metadata_keys = {} if self.sub_opts: for o, a in self.sub_opts: if o == '-n': self.num_objects = self._ParsePositiveInteger( a, 'The -n parameter must be a positive integer.') if o == '-c': self.processes = self._ParsePositiveInteger( a, 'The -c parameter must be a positive integer.') if o == '-k': self.threads = self._ParsePositiveInteger( a, 'The -k parameter must be a positive integer.') if o == '-p': if a.lower() in self.PARALLEL_STRATEGIES: self.parallel_strategy = a.lower() else: raise CommandException( "'%s' is not a valid parallelism strategy." % a) if o == '-y': self.num_slices = self._ParsePositiveInteger( a, 'The -y parameter must be a positive integer.') if o == '-s': try: self.thru_filesize = HumanReadableToBytes(a) except ValueError: raise CommandException('Invalid -s parameter.') if o == '-d': self.directory = a if not os.path.exists(self.directory): self.delete_directory = True os.makedirs(self.directory) if o == '-t': self.diag_tests = set() for test_name in a.strip().split(','): if test_name.lower() not in self.ALL_DIAG_TESTS: raise CommandException("List of test names (-t) contains invalid " "test name '%s'." % test_name) self.diag_tests.add(test_name) if o == '-m': pieces = a.split(':') if len(pieces) != 2: raise CommandException( "Invalid metadata key-value combination '%s'." % a) key, value = pieces self.metadata_keys[key] = value if o == '-o': self.output_file = os.path.abspath(a) if o == '-i': self.input_file = os.path.abspath(a) if not os.path.isfile(self.input_file): raise CommandException("Invalid input file (-i): '%s'." % a) try: with open(self.input_file, 'r') as f: self.results = json.load(f) self.logger.info("Read input file: '%s'.", self.input_file) except ValueError: raise CommandException("Could not decode input file (-i): '%s'." % a) return # If parallelism is specified, default parallelism strategy to fan. if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy: self.parallel_strategy = self.FAN elif self.processes == 1 and self.threads == 1 and self.parallel_strategy: raise CommandException( 'Cannot specify parallelism strategy (-p) without also specifying ' 'multiple threads and/or processes (-c and/or -k).') if not self.args: self.RaiseWrongNumberOfArgumentsException() self.bucket_url = StorageUrlFromString(self.args[0]) self.provider = self.bucket_url.scheme if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket(): raise CommandException('The perfdiag command requires a URL that ' 'specifies a bucket.\n"%s" is not ' 'valid.' % self.args[0]) if (self.thru_filesize > HumanReadableToBytes('2GiB') and (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)): raise CommandException( 'For in-memory tests maximum file size is 2GiB. For larger file ' 'sizes, specify rthru_file and/or wthru_file with the -t option.') perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH) slice_not_available = ( self.provider == 's3' and self.diag_tests.intersection(self.WTHRU, self.WTHRU_FILE)) if perform_slice and slice_not_available: raise CommandException('Sliced uploads are not available for s3. ' 'Use -p fan or sequential uploads for s3.') # Ensure the bucket exists. self.gsutil_api.GetBucket(self.bucket_url.bucket_name, provider=self.bucket_url.scheme, fields=['id']) self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror, socket.timeout, httplib.BadStatusLine, ServiceException] # Command entry point. def RunCommand(self): """Called by gsutil when the command is being invoked.""" self._ParseArgs() if self.input_file: self._DisplayResults() return 0 # We turn off retries in the underlying boto library because the # _RunOperation function handles errors manually so it can count them. boto.config.set('Boto', 'num_retries', '0') self.logger.info( 'Number of iterations to run: %d\n' 'Base bucket URI: %s\n' 'Number of processes: %d\n' 'Number of threads: %d\n' 'Parallelism strategy: %s\n' 'Throughput file size: %s\n' 'Diagnostics to run: %s', self.num_objects, self.bucket_url, self.processes, self.threads, self.parallel_strategy, MakeHumanReadable(self.thru_filesize), (', '.join(self.diag_tests))) try: self._SetUp() # Collect generic system info. self._CollectSysInfo() # Collect netstat info and disk counters before tests (and again later). netstat_output = self._GetTcpStats() if netstat_output: self.results['sysinfo']['netstat_start'] = netstat_output if IS_LINUX: self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters() # Record bucket URL. self.results['bucket_uri'] = str(self.bucket_url) self.results['json_format'] = 'perfdiag' self.results['metadata'] = self.metadata_keys if self.LAT in self.diag_tests: self._RunLatencyTests() if self.RTHRU in self.diag_tests: self._RunReadThruTests() # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it # will be used in RTHRU_FILE to save time and bandwidth. if self.WTHRU_FILE in self.diag_tests: self._RunWriteThruTests(use_file=True) if self.RTHRU_FILE in self.diag_tests: self._RunReadThruTests(use_file=True) if self.WTHRU in self.diag_tests: self._RunWriteThruTests() if self.LIST in self.diag_tests: self._RunListTests() # Collect netstat info and disk counters after tests. netstat_output = self._GetTcpStats() if netstat_output: self.results['sysinfo']['netstat_end'] = netstat_output if IS_LINUX: self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters() self.results['total_requests'] = self.total_requests self.results['request_errors'] = self.request_errors self.results['error_responses_by_code'] = self.error_responses_by_code self.results['connection_breaks'] = self.connection_breaks self.results['gsutil_version'] = gslib.VERSION self.results['boto_version'] = boto.__version__ self._TearDown() self._DisplayResults() finally: # TODO: Install signal handlers so this is performed in response to a # terminating signal; consider multi-threaded object deletes during # cleanup so it happens quickly. self._TearDown() return 0 def StorageUrlToUploadObjectMetadata(storage_url): if storage_url.IsCloudUrl() and storage_url.IsObject(): upload_target = apitools_messages.Object() upload_target.name = storage_url.object_name upload_target.bucket = storage_url.bucket_name return upload_target else: raise CommandException('Non-cloud URL upload target %s was created in ' 'perfdiag implemenation.' % storage_url)