"""TestCases for exercising a Recno DB. """ import os, sys import errno from pprint import pprint import string import unittest from test_all import db, test_support, verbose, get_new_environment_path, get_new_database_path #---------------------------------------------------------------------- class SimpleRecnoTestCase(unittest.TestCase): if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and (sys.version_info < (3, 2))) : def assertIsInstance(self, obj, datatype, msg=None) : return self.assertEqual(type(obj), datatype, msg=msg) def assertGreaterEqual(self, a, b, msg=None) : return self.assertGreaterEqual(a, b, msg=msg) def setUp(self): self.filename = get_new_database_path() self.homeDir = None def tearDown(self): test_support.unlink(self.filename) if self.homeDir: test_support.rmtree(self.homeDir) def test01_basic(self): d = db.DB() get_returns_none = d.set_get_returns_none(2) d.set_get_returns_none(get_returns_none) d.open(self.filename, db.DB_RECNO, db.DB_CREATE) for x in string.ascii_letters: recno = d.append(x * 60) self.assertIsInstance(recno, int) self.assertGreaterEqual(recno, 1) if verbose: print recno, if verbose: print stat = d.stat() if verbose: pprint(stat) for recno in range(1, len(d)+1): data = d[recno] if verbose: print data self.assertIsInstance(data, str) self.assertEqual(data, d.get(recno)) try: data = d[0] # This should raise a KeyError!?!?! except db.DBInvalidArgError, val: if sys.version_info < (2, 6) : self.assertEqual(val[0], db.EINVAL) else : self.assertEqual(val.args[0], db.EINVAL) if verbose: print val else: self.fail("expected exception") # test that has_key raises DB exceptions (fixed in pybsddb 4.3.2) try: d.has_key(0) except db.DBError, val: pass else: self.fail("has_key did not raise a proper exception") try: data = d[100] except KeyError: pass else: self.fail("expected exception") try: data = d.get(100) except db.DBNotFoundError, val: if get_returns_none: self.fail("unexpected exception") else: self.assertEqual(data, None) keys = d.keys() if verbose: print keys self.assertIsInstance(keys, list) self.assertIsInstance(keys[0], int) self.assertEqual(len(keys), len(d)) items = d.items() if verbose: pprint(items) self.assertIsInstance(items, list) self.assertIsInstance(items[0], tuple) self.assertEqual(len(items[0]), 2) self.assertIsInstance(items[0][0], int) self.assertIsInstance(items[0][1], str) self.assertEqual(len(items), len(d)) self.assertTrue(d.has_key(25)) del d[25] self.assertFalse(d.has_key(25)) d.delete(13) self.assertFalse(d.has_key(13)) data = d.get_both(26, "z" * 60) self.assertEqual(data, "z" * 60, 'was %r' % data) if verbose: print data fd = d.fd() if verbose: print fd c = d.cursor() rec = c.first() while rec: if verbose: print rec rec = c.next() c.set(50) rec = c.current() if verbose: print rec c.put(-1, "a replacement record", db.DB_CURRENT) c.set(50) rec = c.current() self.assertEqual(rec, (50, "a replacement record")) if verbose: print rec rec = c.set_range(30) if verbose: print rec # test that non-existent key lookups work (and that # DBC_set_range doesn't have a memleak under valgrind) rec = c.set_range(999999) self.assertEqual(rec, None) if verbose: print rec c.close() d.close() d = db.DB() d.open(self.filename) c = d.cursor() # put a record beyond the consecutive end of the recno's d[100] = "way out there" self.assertEqual(d[100], "way out there") try: data = d[99] except KeyError: pass else: self.fail("expected exception") try: d.get(99) except db.DBKeyEmptyError, val: if get_returns_none: self.fail("unexpected DBKeyEmptyError exception") else: if sys.version_info < (2, 6) : self.assertEqual(val[0], db.DB_KEYEMPTY) else : self.assertEqual(val.args[0], db.DB_KEYEMPTY) if verbose: print val else: if not get_returns_none: self.fail("expected exception") rec = c.set(40) while rec: if verbose: print rec rec = c.next() c.close() d.close() def test02_WithSource(self): """ A Recno file that is given a "backing source file" is essentially a simple ASCII file. Normally each record is delimited by \n and so is just a line in the file, but you can set a different record delimiter if needed. """ homeDir = get_new_environment_path() self.homeDir = homeDir source = os.path.join(homeDir, 'test_recno.txt') if not os.path.isdir(homeDir): os.mkdir(homeDir) f = open(source, 'w') # create the file f.close() d = db.DB() # This is the default value, just checking if both int d.set_re_delim(0x0A) d.set_re_delim('\n') # and char can be used... d.set_re_source(source) d.open(self.filename, db.DB_RECNO, db.DB_CREATE) data = "The quick brown fox jumped over the lazy dog".split() for datum in data: d.append(datum) d.sync() d.close() # get the text from the backing source f = open(source, 'r') text = f.read() f.close() text = text.strip() if verbose: print text print data print text.split('\n') self.assertEqual(text.split('\n'), data) # open as a DB again d = db.DB() d.set_re_source(source) d.open(self.filename, db.DB_RECNO) d[3] = 'reddish-brown' d[8] = 'comatose' d.sync() d.close() f = open(source, 'r') text = f.read() f.close() text = text.strip() if verbose: print text print text.split('\n') self.assertEqual(text.split('\n'), "The quick reddish-brown fox jumped over the comatose dog".split()) def test03_FixedLength(self): d = db.DB() d.set_re_len(40) # fixed length records, 40 bytes long d.set_re_pad('-') # sets the pad character... d.set_re_pad(45) # ...test both int and char d.open(self.filename, db.DB_RECNO, db.DB_CREATE) for x in string.ascii_letters: d.append(x * 35) # These will be padded d.append('.' * 40) # this one will be exact try: # this one will fail d.append('bad' * 20) except db.DBInvalidArgError, val: if sys.version_info < (2, 6) : self.assertEqual(val[0], db.EINVAL) else : self.assertEqual(val.args[0], db.EINVAL) if verbose: print val else: self.fail("expected exception") c = d.cursor() rec = c.first() while rec: if verbose: print rec rec = c.next() c.close() d.close() def test04_get_size_empty(self) : d = db.DB() d.open(self.filename, dbtype=db.DB_RECNO, flags=db.DB_CREATE) row_id = d.append(' ') self.assertEqual(1, d.get_size(key=row_id)) row_id = d.append('') self.assertEqual(0, d.get_size(key=row_id)) #---------------------------------------------------------------------- def test_suite(): return unittest.makeSuite(SimpleRecnoTestCase) if __name__ == '__main__': unittest.main(defaultTest='test_suite')