#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Module to upload a MySQL dump file to Cloud SQL.

Usage:
  dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE
                      [REMOTE]

  Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional
  arguments will connect to localhost as root with an empty password.

  positional arguments:
    FILE             text dump file containing MySQL commands
    REMOTE           Cloud SQL instance name or MySQL hostname

  optional arguments:
    -h, --help       show this help message and exit
    --resume NUM     resume dump at command NUM
    --user USER      user (ignored for CloudSQL)
    --passwd PASSWD  passwd (ignored for CloudSQL)
"""

from __future__ import division
from __future__ import print_function

import argparse
import collections
import datetime
import os
import re
import sys
import time


BYTES_PER_GB = 2**30


def _to_text(data):
    """Returns |data| as unicode text.

    Args:
      data: A byte string (decoded as UTF-8) or an already-decoded string.

    Returns:
      The decoded text; text input is returned unchanged.
    """
    if isinstance(data, bytes):
        return data.decode('utf-8')
    return data


def _to_native_str(raw_line):
    """Returns a raw dump line as the interpreter's native str type.

    On Python 2 the byte string is returned untouched, so decoding still
    happens only when a command is executed. On Python 3 the bytes are
    decoded with 'surrogateescape', which never raises; undecodable bytes
    survive as lone surrogates instead of aborting the read loop.

    Args:
      raw_line: A line read from the dump file opened in binary mode.

    Returns:
      The line as a native str.
    """
    if isinstance(raw_line, str):
        return raw_line
    return raw_line.decode('utf-8', 'surrogateescape')


class MySQLConnectionManager(object):
    """Manages connections to a MySQL database.

    Vars:
      factory: A *ConnectionFactory.
      connected: Whether we currently hold a live DB connection.
      cmd_num: The number of commands executed.
    """
    def __init__(self, connection_factory):
        self.factory = connection_factory
        self.connected = False
        self.cmd_num = 0
        self._db = None
        self._cursor = None
        # Text of the command currently being assembled from dump lines.
        self._cmd = ''

    def write(self, data, execute_cmd=True, increment_cmd=False):
        """Buffers writes to command boundaries.

        Blank lines and '--' comment lines are ignored. Other lines are
        appended (without their line terminator) to the pending command,
        which is complete once it ends with ';'.

        Args:
          data: A line of data from the MySQL dump.
          execute_cmd: Whether to execute the command, defaults to True.
          increment_cmd: Whether to increment cmd_num, defaults to False.
        """
        if not data or not data.strip() or data[:2] == '--':
            return
        # Strip '\r' as well as '\n': with CRLF dumps a trailing '\r' would
        # hide the ';' terminator and no command would ever be executed.
        self._cmd += data.rstrip('\r\n')
        if not self._cmd.endswith(';'):
            return
        # Execute command.
        if execute_cmd:
            self._cursor.execute(_to_text(self._cmd))
        self._cmd = ''
        if increment_cmd:
            self.cmd_num += 1

    def disconnect(self):
        """Closes the current database connection."""
        if not self.connected:
            return
        self.connected = False
        try:
            self._cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self._db.close()

    def connect(self):
        """Creates a new database connection.

        Any existing connection is closed first and the pending command
        buffer is discarded.
        """
        self.disconnect()
        db = self.factory.connect()
        try:
            cursor = db.cursor()
        except Exception:
            # Do not leak the connection when no cursor can be created.
            db.close()
            raise
        self._db = db
        self._cursor = cursor
        self._cmd = ''
        self.connected = True


class CloudSQLConnectionFactory(object):
    """Creates Cloud SQL database connections."""
    def __init__(self, cloudsql_instance):
        self._instance = cloudsql_instance

    def connect(self):
        """Connects to the Cloud SQL database and returns the connection.

        Exits the program if the AppEngine SDK cannot be imported.

        Returns:
          A MySQLdb compatible database connection to the Cloud SQL instance.
        """
        print('Connecting to Cloud SQL instance %s.' % self._instance)
        try:
            from google.storage.speckle.python.api import rdbms_googleapi
        except ImportError:
            sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK '
                     'directory to your PYTHONPATH. Download the SDK from: '
                     'https://developers.google.com/appengine/downloads')
        return rdbms_googleapi.connect(None, instance=self._instance)


class LocalSQLConnectionFactory(object):
    """Creates local MySQL database connections."""
    def __init__(self, host=None, user='root', passwd=''):
        self._host = host or 'localhost'
        self._user = user
        self._passwd = passwd

    def connect(self):
        """Connects to the MySQL database and returns the connection.

        Exits the program if MySQLdb cannot be imported.

        Returns:
          A MySQLdb database connection to the configured MySQL host.
        """
        # Report the host actually used; it is not always localhost.
        print('Connecting to mysql at %s as %s.' % (self._host, self._user))
        try:
            import MySQLdb
        except ImportError:
            sys.exit('Unable to import MySQLdb. To install on Ubuntu: '
                     'apt-get install python-mysqldb')
        return MySQLdb.connect(host=self._host, user=self._user,
                               passwd=self._passwd)


class MySQLState(object):
    """Maintains the MySQL global state.

    This is a hack that keeps record of all MySQL lines that set global state.
    These are needed to reconstruct the MySQL state on resume.
    """
    _set_regex = re.compile(r'\S*\s*SET(.*)[\s=]')

    def __init__(self):
        self._db_line = ''
        self._table_lock = []
        # Keyed by the text following SET, so a repeated statement replaces
        # its earlier entry while keeping first-seen order.
        self._sets = collections.OrderedDict()

    def process(self, line):
        """Check and save lines that affect the global state.

        Args:
          line: A line from the MySQL dump file.
        """
        # Most recent USE line.
        if line.startswith('USE'):
            self._db_line = line
        # SET variables.
        m = self._set_regex.match(line)
        if m:
            self._sets[m.group(1).strip()] = line
        # Maintain LOCK TABLES
        if (line.startswith('LOCK TABLES') or
                ('ALTER TABLE' in line and 'DISABLE KEYS' in line)):
            self._table_lock.append(line)
        if line.startswith('UNLOCK TABLES;'):
            self._table_lock = []

    def write(self, out):
        """Print lines to recreate the saved state.

        Emits the last USE line, then the saved SET lines, then any
        outstanding LOCK TABLES / DISABLE KEYS lines.

        Args:
          out: A File-like object to write out saved state.
        """
        out.write(self._db_line)
        for set_line in self._sets.values():
            out.write(set_line)
        for lock_line in self._table_lock:
            out.write(lock_line)

    def breakpoint(self, line):
        """Returns true if we can handle breaking after this line.

        Args:
          line: A line from the MySQL dump file.

        Returns:
          Boolean indicating whether we can break after |line|.
        """
        return line.startswith(('-- Table structure for table',
                                'INSERT INTO'))


def dump_to_cloudsql(dumpfile, manager, cmd_offset=0):
    """Dumps a MySQL dump file to a database through a MySQLConnectionManager.

    Args:
      dumpfile: Path to a file from which to read the MySQL dump.
      manager: An instance of MySQLConnectionManager.
      cmd_offset: No commands will be executed on the database before this
          count is reached. Used to continue an uncompleted dump. Defaults
          to 0.

    Raises:
      KeyboardInterrupt: Re-raised after reporting the current command.
      Exception: Any failure on a line that is not a safe breakpoint is
          re-raised after printing a --resume hint.
    """
    state = MySQLState()
    total = os.path.getsize(dumpfile)
    start_time = time.time()
    line_num = 0
    bytes_read = 0
    # Saved state is replayed exactly once per resume point. Replaying it on
    # every line while cmd_num == cmd_offset would splice the USE/SET lines
    # into a half-buffered multi-line command.
    replay_state = cmd_offset != 0
    # Binary mode keeps the byte count exact; file.tell() is unreliable
    # while iterating over the file.
    with open(dumpfile, 'rb') as dump:
        for raw_line in dump:
            line_num += 1
            bytes_read += len(raw_line)
            if not manager.connected:
                manager.connect()
            line = _to_native_str(raw_line)
            try:
                # Construct commands from lines and execute them.
                state.process(line)
                if replay_state and manager.cmd_num == cmd_offset:
                    print('\nRecreating state at line: %d' % line_num)
                    state.write(manager)
                    replay_state = False
                manager.write(line, manager.cmd_num >= cmd_offset, True)
                # Print status.
                sys.stdout.write(
                    '\rstatus: %.3f%% %0.2f GB %d commands ' %
                    (100 * bytes_read / total, bytes_read / BYTES_PER_GB,
                     manager.cmd_num))
                sys.stdout.flush()
            # Handle interrupts and connection failures.
            except KeyboardInterrupt:
                print('\nInterrupted while executing command: %d' %
                      manager.cmd_num)
                raise
            except Exception:
                print('\nFailed while executing command: %d' %
                      manager.cmd_num)
                delta = int(time.time() - start_time)
                print('Total time: %s' %
                      str(datetime.timedelta(seconds=delta)))
                if state.breakpoint(line):
                    # Attempt to resume.
                    # NOTE(review): the failed command is counted as done
                    # and is not retried; reconnecting clears the buffer.
                    print('Execution can resume from here (line = %d)' %
                          line_num)
                    manager.cmd_num += 1
                    cmd_offset = manager.cmd_num
                    print('Will now attempt to auto-resume at command: %d' %
                          cmd_offset)
                    manager.disconnect()
                    replay_state = True
                else:
                    print('Execution may fail to resume correctly from here.')
                    print('Use --resume=%d to attempt to resume the dump.' %
                          manager.cmd_num)
                    raise
    print('\nDone.')


def main(argv=None):
    """Imports a MySQL database from a dump file.

    Interprets command line arguments and calls dump_to_cloudsql
    appropriately.

    Args:
      argv: Argument list to parse; defaults to sys.argv[1:] when None.
    """
    description = """Uploads MySQL dump file to a MySQL database or Cloud SQL.
                  With no optional arguments will connect to localhost as root
                  with an empty password."""
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('mysqldump', metavar='FILE',
                        help='text dump file containing MySQL commands')
    parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE',
                        help='either a Cloud SQL account:instance or a '
                             'hostname')
    parser.add_argument('--resume', default=0, type=int, metavar='NUM',
                        help='resume dump at command NUM')
    parser.add_argument('--user', default='root', metavar='USER',
                        help='user (ignored for Cloud SQL)')
    parser.add_argument('--passwd', default='', metavar='PASSWD',
                        help='passwd (ignored for Cloud SQL)')
    args = parser.parse_args(argv)
    # A ':' marks a Cloud SQL account:instance name; else it is a hostname.
    if args.remote and ':' in args.remote:
        connection = CloudSQLConnectionFactory(args.remote)
    else:
        connection = LocalSQLConnectionFactory(args.remote, args.user,
                                               args.passwd)
    if args.resume:
        print('Resuming execution at command: %d' % args.resume)
    dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection),
                     args.resume)


if __name__ == '__main__':
    main()