1#!/usr/bin/python2 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Module to upload a MySQL dump file to Cloud SQL. 8 9Usage: 10 dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE 11 [REMOTE] 12 13 Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional 14 arguments will connect to localhost as root with an empty password. 15 16 positional arguments: 17 FILE text dump file containing MySQL commands 18 REMOTE Cloud SQL instance name or MySQL hostname 19 20 optional arguments: 21 -h, --help show this help message and exit 22 --resume NUM resume dump at command NUM 23 --user USER user (ignored for CloudSQL) 24 --passwd PASSWD passwd (ignored for CloudSQL) 25""" 26 27from __future__ import absolute_import 28from __future__ import division 29from __future__ import print_function 30import argparse 31import collections 32import datetime 33import os 34import re 35import sys 36import time 37import six 38 39 40BYTES_PER_GB = 2**30 41 42 43class MySQLConnectionManager(object): 44 """Manages connections to a MySQL database. 45 46 Vars: 47 factory: A *ConnectionFactory. 48 connected: Whether we currently hold a live DB connection. 49 cmd_num: The number of commands executed. 50 """ 51 def __init__(self, connection_factory): 52 self.factory = connection_factory 53 self.connected = False 54 self.cmd_num = 0 55 56 def write(self, data, execute_cmd=True, increment_cmd=False): 57 """Buffers writes to command boundaries. 58 59 Args: 60 data: A line of data from the MySQL dump. 61 execute_cmd: Whether to execute the command, defaults to True. 62 increment_cmd: Whether to increment cmd_num, defaults to False. 63 """ 64 if not data or not data.strip() or data == '\n' or data[:2] == '--': 65 return 66 self._cmd += data[:-1] if data[-1] == '\n' else data 67 if self._cmd[-1] != ';': 68 return 69 # Execute command. 70 if execute_cmd: 71 self._cursor.execute(six.ensure_text(self._cmd, 'utf-8')) 72 self._cmd = '' 73 if increment_cmd: 74 self.cmd_num += 1 75 76 def disconnect(self): 77 """Closes the current database connection.""" 78 if self.connected: 79 self.connected = False 80 self._cursor.close() 81 self._db.close() 82 83 def connect(self): 84 """Creates a new database connection.""" 85 self.disconnect() 86 self._db = self.factory.connect() 87 self.connected = True 88 self._cursor = self._db.cursor() 89 self._cmd = '' 90 91 92class CloudSQLConnectionFactory(object): 93 """Creates Cloud SQL database connections.""" 94 def __init__(self, cloudsql_instance): 95 self._instance = cloudsql_instance 96 97 def connect(self): 98 """Connects to the Cloud SQL database and returns the connection. 99 100 Returns: 101 A MySQLdb compatible database connection to the Cloud SQL instance. 102 """ 103 print('Connecting to Cloud SQL instance %s.' % self._instance) 104 try: 105 from google.storage.speckle.python.api import rdbms_googleapi 106 except ImportError: 107 sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK ' 108 'directory to your PYTHONPATH. Download the SDK from: ' 109 'https://developers.google.com/appengine/downloads') 110 return rdbms_googleapi.connect(None, instance=self._instance) 111 112 113class LocalSQLConnectionFactory(object): 114 """Creates local MySQL database connections.""" 115 def __init__(self, host=None, user='root', passwd=''): 116 if not host: 117 host = 'localhost' 118 self._host = host 119 self._user = user 120 self._passwd = passwd 121 122 def connect(self): 123 """Connects to the local MySQL database and returns the connection. 124 125 Returns: 126 A MySQLdb database connection to the local MySQL database. 127 """ 128 print('Connecting to mysql at localhost as %s.' % self._user) 129 try: 130 import MySQLdb 131 except ImportError: 132 sys.exit('Unable to import MySQLdb. To install on Ubuntu: ' 133 'apt-get install python-mysqldb') 134 return MySQLdb.connect(host=self._host, user=self._user, 135 passwd=self._passwd) 136 137 138class MySQLState(object): 139 """Maintains the MySQL global state. 140 141 This is a hack that keeps record of all MySQL lines that set global state. 142 These are needed to reconstruct the MySQL state on resume. 143 """ 144 _set_regex = re.compile('\S*\s*SET(.*)[\s=]') 145 146 def __init__(self): 147 self._db_line = '' 148 self._table_lock = [] 149 self._sets = collections.OrderedDict() 150 151 def process(self, line): 152 """Check and save lines that affect the global state. 153 154 Args: 155 line: A line from the MySQL dump file. 156 """ 157 # Most recent USE line. 158 if line[:3] == 'USE': 159 self._db_line = line 160 # SET variables. 161 m = self._set_regex.match(line) 162 if m: 163 self._sets[m.group(1).strip()] = line 164 # Maintain LOCK TABLES 165 if (line[:11] == 'LOCK TABLES' or 166 ('ALTER TABLE' in line and 'DISABLE KEYS' in line)): 167 self._table_lock.append(line) 168 if (line[:14] == 'UNLOCK TABLES;'): 169 self._table_lock = [] 170 171 def write(self, out): 172 """Print lines to recreate the saved state. 173 174 Args: 175 out: A File-like object to write out saved state. 176 """ 177 out.write(self._db_line) 178 for v in six.itervalues(self._sets): 179 out.write(v) 180 for l in self._table_lock: 181 out.write(l) 182 183 def breakpoint(self, line): 184 """Returns true if we can handle breaking after this line. 185 186 Args: 187 line: A line from the MySQL dump file. 188 189 Returns: 190 Boolean indicating whether we can break after |line|. 191 """ 192 return (line[:28] == '-- Table structure for table' or 193 line[:11] == 'INSERT INTO') 194 195 196def dump_to_cloudsql(dumpfile, manager, cmd_offset=0): 197 """Dumps a MySQL dump file to a database through a MySQLConnectionManager. 198 199 Args: 200 dumpfile: Path to a file from which to read the MySQL dump. 201 manager: An instance of MySQLConnectionManager. 202 cmd_offset: No commands will be executed on the database before this count 203 is reached. Used to continue an uncompleted dump. Defaults to 0. 204 """ 205 state = MySQLState() 206 total = os.path.getsize(dumpfile) 207 start_time = time.time() 208 line_num = 0 209 with open(dumpfile, 'r') as dump: 210 for line in dump: 211 line_num += 1 212 if not manager.connected: 213 manager.connect() 214 try: 215 # Construct commands from lines and execute them. 216 state.process(line) 217 if manager.cmd_num == cmd_offset and cmd_offset != 0: 218 print('\nRecreating state at line: %d' % line_num) 219 state.write(manager) 220 manager.write(line, manager.cmd_num >= cmd_offset, True) 221 # Print status. 222 sys.stdout.write( 223 '\rstatus: %.3f%% %0.2f GB %d commands ' % 224 (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB, 225 manager.cmd_num)) 226 sys.stdout.flush() 227 # Handle interrupts and connection failures. 228 except KeyboardInterrupt: 229 print('\nInterrupted while executing command: %d' % 230 manager.cmd_num) 231 raise 232 except: 233 print('\nFailed while executing command: %d' % manager.cmd_num) 234 delta = int(time.time() - start_time) 235 print('Total time: %s' % str(datetime.timedelta(seconds=delta))) 236 if state.breakpoint(line): 237 # Attempt to resume. 238 print('Execution can resume from here (line = %d)' % 239 line_num) 240 manager.cmd_num += 1 241 cmd_offset = manager.cmd_num 242 print('Will now attempt to auto-resume at command: %d' % 243 cmd_offset) 244 manager.disconnect() 245 else: 246 print('Execution may fail to resume correctly from here.') 247 print('Use --resume=%d to attempt to resume the dump.' % 248 manager.cmd_num) 249 raise 250 print('\nDone.') 251 252 253if __name__ == '__main__': 254 """Imports a MySQL database from a dump file. 255 256 Interprets command line arguments and calls dump_to_cloudsql appropriately. 257 """ 258 description = """Uploads MySQL dump file to a MySQL database or Cloud SQL. 259 With no optional arguments will connect to localhost as root 260 with an empty password.""" 261 parser = argparse.ArgumentParser(description=description) 262 parser.add_argument('mysqldump', metavar='FILE', 263 help='text dump file containing MySQL commands') 264 parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE', 265 help='either a Cloud SQL account:instance or a hostname') 266 parser.add_argument('--resume', default=0, type=int, metavar='NUM', 267 help='resume dump at command NUM') 268 parser.add_argument('--user', default='root', metavar='USER', 269 help='user (ignored for Cloud SQL)') 270 parser.add_argument('--passwd', default='', metavar='PASSWD', 271 help='passwd (ignored for Cloud SQL)') 272 args = parser.parse_args() 273 if args.remote and ':' in args.remote: 274 connection = CloudSQLConnectionFactory(args.remote) 275 else: 276 connection = LocalSQLConnectionFactory(args.remote, args.user, 277 args.passwd) 278 if args.resume: 279 print('Resuming execution at command: %d' % options.resume) 280 dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection), 281 args.resume) 282