#! /usr/bin/env python

"""
usage: %(progname)s [args]
"""

import os, sys, string, time, getopt
from log import *

import odb
import sqlite
import re

# --- these are used for removing nulls from strings
# --- because sqlite can't handle them

# Characters that escape_string() encodes: single quote, NUL, and the
# percent sign (which must be encoded because it introduces an escape).
_ESCAPE_RE = re.compile("['\0%]")

# A percent sign followed by any two characters (the hex digits written
# by escape_string()).
_UNESCAPE_RE = re.compile("%(..)")


def escape_string(str):
    """Return *str* with each quote, NUL and percent sign written as %XX.

    XX is the two-digit upper-case hexadecimal character code, so the
    result contains no single quotes and no NUL characters.
    """
    def subfn(m):
        return "%%%02X" % ord(m.group(0))
    return _ESCAPE_RE.sub(subfn, str)


def unescape_string(str):
    """Reverse escape_string(): turn every %XX back into one character.

    Raises ValueError if a percent sign is followed by two characters
    that are not hexadecimal digits.
    """
    def subfn(m):
        return chr(int(m.group(1), 16))
    return _UNESCAPE_RE.sub(subfn, str)


class Database(odb.Database):
    """odb.Database subclass that talks to SQLite."""

    def __init__(self, db, debug=0):
        """Initialise the odb base class and record the SQLite error type."""
        odb.Database.__init__(self, db, debug=debug)
        self.SQLError = sqlite.Error

    def escape(self, str):
        """Prepare a column value for embedding in an SQL statement.

        None is returned as None, a string is returned with every single
        quote doubled, and an integer is returned unchanged.

        Raises TypeError for any other type.  (The original raised a plain
        string, which is itself a TypeError on Python 2.6 and later.)
        """
        if str is None:
            return None
        kind = type(str)
        if kind is type(""):
            return str.replace("'", "''")
        if kind is type(1):
            return str
        raise TypeError("unknown column data type: %s" % kind)

    def _listMasterNames(self, kind, cursor=None):
        """Return the names of all sqlite_master entries of type *kind*."""
        if cursor is None:
            cursor = self.defaultCursor()
        cursor.execute("select name from sqlite_master where type='%s'" % kind)
        return [row[0] for row in cursor.fetchall()]

    def listTables(self, cursor=None):
        """Return a list with the name of every table in the database."""
        return self._listMasterNames("table", cursor)

    def listIndices(self, cursor=None):
        """Return a list with the name of every index in the database."""
        return self._listMasterNames("index", cursor)

    def listFieldsDict(self, table_name, cursor=None):
        """Return a dict mapping column name to its "pragma table_info" row.

        The key is the second field of each row; the value is the whole
        row exactly as the cursor returned it.
        """
        if cursor is None:
            cursor = self.defaultCursor()
        cursor.execute("pragma table_info(%s)" % table_name)
        columns = {}
        for row in cursor.fetchall():
            columns[row[1]] = row
        return columns

    def _tableCreateStatement(self, table_name, cursor=None):
        """Return the SQL text stored in sqlite_master for *table_name*.

        Returns None when sqlite_master has no such table.
        """
        if cursor is None:
            cursor = self.defaultCursor()
        sql = "select sql from sqlite_master where type='table' and name='%s'" % table_name
        cursor.execute(sql)
        row = cursor.fetchone()
        if row is None:
            return None
        return row[0]

    def alterTableToMatch(self, table):
        """Rebuild *table* in the database so it matches the app definition.

        Does nothing when table.checkTable() reports no mismatched columns.
        Otherwise the columns present both in the database and in
        table.getAppColumnList() are copied to a temporary table, the real
        table is dropped and recreated from table._createTableSQL(), and
        the saved data is copied back.

        All of this runs inside one transaction.  If any statement fails
        the transaction is rolled back and the error is re-raised, so a
        failure does not leave a half-finished rebuild pending on the
        connection.
        """
        tableName = table.getTableName()
        invalidAppCols, invalidDBCols = table.checkTable(warnflag=0)
        if not invalidAppCols and not invalidDBCols:
            return

        # The process id keeps the scratch table name distinct per process.
        tmpTableName = tableName + "_" + str(os.getpid())

        # Only columns known to both sides can be carried over.
        oldcols = self.listFieldsDict(tableName)
        tmpcols = [colname
                   for colname, coltype, options in table.getAppColumnList()
                   if colname in oldcols]
        tmpcolnames = ",".join(tmpcols)

        statements = [
            "create temporary table %s (%s)" % (tmpTableName, tmpcolnames),
            "insert into %s select %s from %s" % (tmpTableName, tmpcolnames, tableName),
            "drop table %s" % tableName,
            table._createTableSQL(),
            "insert into %s(%s) select %s from %s" % (tableName, tmpcolnames, tmpcolnames, tmpTableName),
            "drop table %s" % tmpTableName,
            "commit",
        ]

        cur = self.defaultCursor()
        # "begin" stays outside the try block: if it fails, no transaction
        # was opened here, so there is nothing of ours to roll back.
        cur.execute("begin transaction")
        try:
            for statement in statements:
                cur.execute(statement)
        except self.SQLError:
            cur.execute("rollback")
            raise


def test():
    """Placeholder for the module self-test; currently does nothing."""
    pass


def usage(progname):
    """Print the module docstring with *progname* substituted in."""
    print(__doc__ % {"progname": progname})


def main(argv, stdout, environ):
    """Command-line entry point.

    Recognised options are --help, --test and --debug.  Options are
    processed before the positional-argument check, so --test and --debug
    work without a positional argument.  Usage is printed when --help is
    given, or when there are no positional arguments and --test was not
    requested.
    """
    progname = argv[0]
    optlist, args = getopt.getopt(argv[1:], "", ["help", "test", "debug"])

    testflag = 0
    for (field, val) in optlist:
        if field == "--help":
            usage(progname)
            return
        elif field == "--debug":
            debugfull()
        elif field == "--test":
            testflag = 1

    if testflag:
        test()
        return

    if not args:
        usage(progname)


if __name__ == "__main__":
    main(sys.argv, sys.stdout, os.environ)