#!/usr/bin/python # # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Module to upload a MySQL dump file to Cloud SQL. Usage: dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE [REMOTE] Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional arguments will connect to localhost as root with an empty password. positional arguments: FILE text dump file containing MySQL commands REMOTE Cloud SQL instance name or MySQL hostname optional arguments: -h, --help show this help message and exit --resume NUM resume dump at command NUM --user USER user (ignored for CloudSQL) --passwd PASSWD passwd (ignored for CloudSQL) """ from __future__ import division import argparse import collections import datetime import os import re import sys import time BYTES_PER_GB = 2**30 class MySQLConnectionManager(object): """Manages connections to a MySQL database. Vars: factory: A *ConnectionFactory. connected: Whether we currently hold a live DB connection. cmd_num: The number of commands executed. """ def __init__(self, connection_factory): self.factory = connection_factory self.connected = False self.cmd_num = 0 def write(self, data, execute_cmd=True, increment_cmd=False): """Buffers writes to command boundaries. Args: data: A line of data from the MySQL dump. execute_cmd: Whether to execute the command, defaults to True. increment_cmd: Whether to increment cmd_num, defaults to False. """ if not data or not data.strip() or data == '\n' or data[:2] == '--': return self._cmd += data[:-1] if data[-1] == '\n' else data if self._cmd[-1] != ';': return # Execute command. if execute_cmd: self._cursor.execute(self._cmd.decode('utf-8')) self._cmd = '' if increment_cmd: self.cmd_num += 1 def disconnect(self): """Closes the current database connection.""" if self.connected: self.connected = False self._cursor.close() self._db.close() def connect(self): """Creates a new database connection.""" self.disconnect() self._db = self.factory.connect() self.connected = True self._cursor = self._db.cursor() self._cmd = '' class CloudSQLConnectionFactory(object): """Creates Cloud SQL database connections.""" def __init__(self, cloudsql_instance): self._instance = cloudsql_instance def connect(self): """Connects to the Cloud SQL database and returns the connection. Returns: A MySQLdb compatible database connection to the Cloud SQL instance. """ print 'Connecting to Cloud SQL instance %s.' % self._instance try: from google.storage.speckle.python.api import rdbms_googleapi except ImportError: sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK ' 'directory to your PYTHONPATH. Download the SDK from: ' 'https://developers.google.com/appengine/downloads') return rdbms_googleapi.connect(None, instance=self._instance) class LocalSQLConnectionFactory(object): """Creates local MySQL database connections.""" def __init__(self, host=None, user='root', passwd=''): if not host: host = 'localhost' self._host = host self._user = user self._passwd = passwd def connect(self): """Connects to the local MySQL database and returns the connection. Returns: A MySQLdb database connection to the local MySQL database. """ print 'Connecting to mysql at localhost as %s.' % self._user try: import MySQLdb except ImportError: sys.exit('Unable to import MySQLdb. To install on Ubuntu: ' 'apt-get install python-mysqldb') return MySQLdb.connect(host=self._host, user=self._user, passwd=self._passwd) class MySQLState(object): """Maintains the MySQL global state. This is a hack that keeps record of all MySQL lines that set global state. These are needed to reconstruct the MySQL state on resume. """ _set_regex = re.compile('\S*\s*SET(.*)[\s=]') def __init__(self): self._db_line = '' self._table_lock = [] self._sets = collections.OrderedDict() def process(self, line): """Check and save lines that affect the global state. Args: line: A line from the MySQL dump file. """ # Most recent USE line. if line[:3] == 'USE': self._db_line = line # SET variables. m = self._set_regex.match(line) if m: self._sets[m.group(1).strip()] = line # Maintain LOCK TABLES if (line[:11] == 'LOCK TABLES' or ('ALTER TABLE' in line and 'DISABLE KEYS' in line)): self._table_lock.append(line) if (line[:14] == 'UNLOCK TABLES;'): self._table_lock = [] def write(self, out): """Print lines to recreate the saved state. Args: out: A File-like object to write out saved state. """ out.write(self._db_line) for v in self._sets.itervalues(): out.write(v) for l in self._table_lock: out.write(l) def breakpoint(self, line): """Returns true if we can handle breaking after this line. Args: line: A line from the MySQL dump file. Returns: Boolean indicating whether we can break after |line|. """ return (line[:28] == '-- Table structure for table' or line[:11] == 'INSERT INTO') def dump_to_cloudsql(dumpfile, manager, cmd_offset=0): """Dumps a MySQL dump file to a database through a MySQLConnectionManager. Args: dumpfile: Path to a file from which to read the MySQL dump. manager: An instance of MySQLConnectionManager. cmd_offset: No commands will be executed on the database before this count is reached. Used to continue an uncompleted dump. Defaults to 0. """ state = MySQLState() total = os.path.getsize(dumpfile) start_time = time.time() line_num = 0 with open(dumpfile, 'r') as dump: for line in dump: line_num += 1 if not manager.connected: manager.connect() try: # Construct commands from lines and execute them. state.process(line) if manager.cmd_num == cmd_offset and cmd_offset != 0: print '\nRecreating state at line: %d' % line_num state.write(manager) manager.write(line, manager.cmd_num >= cmd_offset, True) # Print status. sys.stdout.write( '\rstatus: %.3f%% %0.2f GB %d commands ' % (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB, manager.cmd_num)) sys.stdout.flush() # Handle interrupts and connection failures. except KeyboardInterrupt: print ('\nInterrupted while executing command: %d' % manager.cmd_num) raise except: print '\nFailed while executing command: %d' % manager.cmd_num delta = int(time.time() - start_time) print 'Total time: %s' % str(datetime.timedelta(seconds=delta)) if state.breakpoint(line): # Attempt to resume. print ('Execution can resume from here (line = %d)' % line_num) manager.cmd_num += 1 cmd_offset = manager.cmd_num print ('Will now attempt to auto-resume at command: %d' % cmd_offset) manager.disconnect() else: print 'Execution may fail to resume correctly from here.' print ('Use --resume=%d to attempt to resume the dump.' % manager.cmd_num) raise print '\nDone.' if __name__ == '__main__': """Imports a MySQL database from a dump file. Interprets command line arguments and calls dump_to_cloudsql appropriately. """ description = """Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional arguments will connect to localhost as root with an empty password.""" parser = argparse.ArgumentParser(description=description) parser.add_argument('mysqldump', metavar='FILE', help='text dump file containing MySQL commands') parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE', help='either a Cloud SQL account:instance or a hostname') parser.add_argument('--resume', default=0, type=int, metavar='NUM', help='resume dump at command NUM') parser.add_argument('--user', default='root', metavar='USER', help='user (ignored for Cloud SQL)') parser.add_argument('--passwd', default='', metavar='PASSWD', help='passwd (ignored for Cloud SQL)') args = parser.parse_args() if args.remote and ':' in args.remote: connection = CloudSQLConnectionFactory(args.remote) else: connection = LocalSQLConnectionFactory(args.remote, args.user, args.passwd) if args.resume: print 'Resuming execution at command: %d' % options.resume dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection), args.resume)