• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python2
2#
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Module to upload a MySQL dump file to Cloud SQL.
8
9Usage:
10  dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE
11                      [REMOTE]
12
13  Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional
14  arguments will connect to localhost as root with an empty password.
15
16  positional arguments:
17    FILE             text dump file containing MySQL commands
18    REMOTE           Cloud SQL instance name or MySQL hostname
19
20  optional arguments:
21    -h, --help       show this help message and exit
22    --resume NUM     resume dump at command NUM
23    --user USER      user (ignored for CloudSQL)
24    --passwd PASSWD  passwd (ignored for CloudSQL)
25"""
26
27from __future__ import absolute_import
28from __future__ import division
29from __future__ import print_function
30import argparse
31import collections
32import datetime
33import os
34import re
35import sys
36import time
37import six
38
39
40BYTES_PER_GB = 2**30
41
42
43class MySQLConnectionManager(object):
44    """Manages connections to a MySQL database.
45
46    Vars:
47      factory: A *ConnectionFactory.
48      connected: Whether we currently hold a live DB connection.
49      cmd_num: The number of commands executed.
50    """
51    def __init__(self, connection_factory):
52        self.factory = connection_factory
53        self.connected = False
54        self.cmd_num = 0
55
56    def write(self, data, execute_cmd=True, increment_cmd=False):
57        """Buffers writes to command boundaries.
58
59        Args:
60          data: A line of data from the MySQL dump.
61          execute_cmd: Whether to execute the command, defaults to True.
62          increment_cmd: Whether to increment cmd_num, defaults to False.
63          """
64        if not data or not data.strip() or data == '\n' or data[:2] == '--':
65            return
66        self._cmd += data[:-1] if data[-1] == '\n' else data
67        if self._cmd[-1] != ';':
68            return
69        # Execute command.
70        if execute_cmd:
71            self._cursor.execute(six.ensure_text(self._cmd, 'utf-8'))
72        self._cmd = ''
73        if increment_cmd:
74            self.cmd_num += 1
75
76    def disconnect(self):
77      """Closes the current database connection."""
78      if self.connected:
79          self.connected = False
80          self._cursor.close()
81          self._db.close()
82
83    def connect(self):
84      """Creates a new database connection."""
85      self.disconnect()
86      self._db = self.factory.connect()
87      self.connected = True
88      self._cursor = self._db.cursor()
89      self._cmd = ''
90
91
92class CloudSQLConnectionFactory(object):
93    """Creates Cloud SQL database connections."""
94    def __init__(self, cloudsql_instance):
95        self._instance = cloudsql_instance
96
97    def connect(self):
98        """Connects to the Cloud SQL database and returns the connection.
99
100        Returns:
101          A MySQLdb compatible database connection to the Cloud SQL instance.
102        """
103        print('Connecting to Cloud SQL instance %s.' % self._instance)
104        try:
105            from google.storage.speckle.python.api import rdbms_googleapi
106        except ImportError:
107            sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK '
108                     'directory to your PYTHONPATH. Download the SDK from: '
109                     'https://developers.google.com/appengine/downloads')
110        return rdbms_googleapi.connect(None, instance=self._instance)
111
112
113class LocalSQLConnectionFactory(object):
114    """Creates local MySQL database connections."""
115    def __init__(self, host=None, user='root', passwd=''):
116        if not host:
117          host = 'localhost'
118        self._host = host
119        self._user = user
120        self._passwd = passwd
121
122    def connect(self):
123        """Connects to the local MySQL database and returns the connection.
124
125        Returns:
126          A MySQLdb database connection to the local MySQL database.
127        """
128        print('Connecting to mysql at localhost as %s.' % self._user)
129        try:
130            import MySQLdb
131        except ImportError:
132            sys.exit('Unable to import MySQLdb. To install on Ubuntu: '
133                     'apt-get install python-mysqldb')
134        return MySQLdb.connect(host=self._host, user=self._user,
135                               passwd=self._passwd)
136
137
138class MySQLState(object):
139    """Maintains the MySQL global state.
140
141    This is a hack that keeps record of all MySQL lines that set global state.
142    These are needed to reconstruct the MySQL state on resume.
143    """
144    _set_regex = re.compile('\S*\s*SET(.*)[\s=]')
145
146    def __init__(self):
147        self._db_line = ''
148        self._table_lock = []
149        self._sets = collections.OrderedDict()
150
151    def process(self, line):
152        """Check and save lines that affect the global state.
153
154        Args:
155          line: A line from the MySQL dump file.
156        """
157        # Most recent USE line.
158        if line[:3] == 'USE':
159            self._db_line = line
160        # SET variables.
161        m = self._set_regex.match(line)
162        if m:
163            self._sets[m.group(1).strip()] = line
164        # Maintain LOCK TABLES
165        if (line[:11] == 'LOCK TABLES' or
166            ('ALTER TABLE' in line and 'DISABLE KEYS' in line)):
167            self._table_lock.append(line)
168        if (line[:14] == 'UNLOCK TABLES;'):
169            self._table_lock = []
170
171    def write(self, out):
172        """Print lines to recreate the saved state.
173
174        Args:
175          out: A File-like object to write out saved state.
176        """
177        out.write(self._db_line)
178        for v in six.itervalues(self._sets):
179            out.write(v)
180        for l in self._table_lock:
181            out.write(l)
182
183    def breakpoint(self, line):
184      """Returns true if we can handle breaking after this line.
185
186      Args:
187        line: A line from the MySQL dump file.
188
189      Returns:
190        Boolean indicating whether we can break after |line|.
191      """
192      return (line[:28] == '-- Table structure for table' or
193              line[:11] == 'INSERT INTO')
194
195
196def dump_to_cloudsql(dumpfile, manager, cmd_offset=0):
197    """Dumps a MySQL dump file to a database through a MySQLConnectionManager.
198
199    Args:
200      dumpfile: Path to a file from which to read the MySQL dump.
201      manager: An instance of MySQLConnectionManager.
202      cmd_offset: No commands will be executed on the database before this count
203        is reached. Used to continue an uncompleted dump. Defaults to 0.
204    """
205    state = MySQLState()
206    total = os.path.getsize(dumpfile)
207    start_time = time.time()
208    line_num = 0
209    with open(dumpfile, 'r') as dump:
210        for line in dump:
211            line_num += 1
212            if not manager.connected:
213                manager.connect()
214            try:
215                # Construct commands from lines and execute them.
216                state.process(line)
217                if manager.cmd_num == cmd_offset and cmd_offset != 0:
218                    print('\nRecreating state at line: %d' % line_num)
219                    state.write(manager)
220                manager.write(line, manager.cmd_num >= cmd_offset, True)
221                # Print status.
222                sys.stdout.write(
223                    '\rstatus:  %.3f%%     %0.2f GB     %d commands ' %
224                    (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB,
225                     manager.cmd_num))
226                sys.stdout.flush()
227            # Handle interrupts and connection failures.
228            except KeyboardInterrupt:
229                print('\nInterrupted while executing command: %d' %
230                      manager.cmd_num)
231                raise
232            except:
233                print('\nFailed while executing command: %d' % manager.cmd_num)
234                delta = int(time.time() - start_time)
235                print('Total time: %s' % str(datetime.timedelta(seconds=delta)))
236                if state.breakpoint(line):
237                    # Attempt to resume.
238                    print('Execution can resume from here (line = %d)' %
239                          line_num)
240                    manager.cmd_num += 1
241                    cmd_offset = manager.cmd_num
242                    print('Will now attempt to auto-resume at command: %d' %
243                          cmd_offset)
244                    manager.disconnect()
245                else:
246                    print('Execution may fail to resume correctly from here.')
247                    print('Use --resume=%d to attempt to resume the dump.' %
248                          manager.cmd_num)
249                    raise
250    print('\nDone.')
251
252
253if __name__ == '__main__':
254    """Imports a MySQL database from a dump file.
255
256    Interprets command line arguments and calls dump_to_cloudsql appropriately.
257    """
258    description = """Uploads MySQL dump file to a MySQL database or Cloud SQL.
259                  With no optional arguments will connect to localhost as root
260                  with an empty password."""
261    parser = argparse.ArgumentParser(description=description)
262    parser.add_argument('mysqldump', metavar='FILE',
263                        help='text dump file containing MySQL commands')
264    parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE',
265        help='either a Cloud SQL account:instance or a hostname')
266    parser.add_argument('--resume', default=0, type=int, metavar='NUM',
267                        help='resume dump at command NUM')
268    parser.add_argument('--user', default='root', metavar='USER',
269                        help='user (ignored for Cloud SQL)')
270    parser.add_argument('--passwd', default='', metavar='PASSWD',
271                        help='passwd (ignored for Cloud SQL)')
272    args = parser.parse_args()
273    if args.remote and ':' in args.remote:
274        connection = CloudSQLConnectionFactory(args.remote)
275    else:
276        connection = LocalSQLConnectionFactory(args.remote, args.user,
277                                               args.passwd)
278    if args.resume:
279        print('Resuming execution at command: %d' % options.resume)
280    dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection),
281                     args.resume)
282