#!/usr/bin/env python # -*- coding: utf-8 -*- """ Unit tests for gluon.sql """ import sys import os if os.path.isdir('gluon'): sys.path.append(os.path.realpath('gluon')) else: sys.path.append(os.path.realpath('../')) import unittest import datetime from dal import DAL, Field, Table, SQLALL ALLOWED_DATATYPES = [ 'string', 'text', 'integer', 'boolean', 'double', 'blob', 'date', 'time', 'datetime', 'upload', 'password', ] def setUpModule(): pass def tearDownModule(): if os.path.isfile('sql.log'): os.unlink('sql.log') class TestFields(unittest.TestCase): def testFieldName(self): # Check that Fields cannot start with underscores self.assertRaises(SyntaxError, Field, '_abc', 'string') # Check that Fields cannot contain punctuation other than underscores self.assertRaises(SyntaxError, Field, 'a.bc', 'string') # Check that Fields cannot be a name of a method or property of Table for x in ['drop', 'on', 'truncate']: self.assertRaises(SyntaxError, Field, x, 'string') # Check that Fields allows underscores in the body of a field name. self.assert_(Field('a_bc', 'string'), "Field isn't allowing underscores in fieldnames. It should.") def testFieldTypes(self): # Check that string, text, and password default length is 512 for typ in ['string', 'password']: self.assert_(Field('abc', typ).length == 512, "Default length for type '%s' is not 512 or 255" % typ) # Check that upload default length is 512 self.assert_(Field('abc', 'upload').length == 512, "Default length for type 'upload' is not 128") # Check that Tables passed in the type creates a reference self.assert_(Field('abc', Table(None, 'temp')).type == 'reference temp', 'Passing an Table does not result in a reference type.') def testFieldLabels(self): # Check that a label is successfully built from the supplied fieldname self.assert_(Field('abc', 'string').label == 'Abc', 'Label built is incorrect') self.assert_(Field('abc_def', 'string').label == 'Abc Def', 'Label built is incorrect') def testFieldFormatters(self): # Formatter should be called Validator # Test the default formatters for typ in ALLOWED_DATATYPES: f = Field('abc', typ) if typ not in ['date', 'time', 'datetime']: isinstance(f.formatter('test'), str) else: isinstance(f.formatter(datetime.datetime.now()), str) def testRun(self): db = DAL('sqlite:memory:') for ft in ['string', 'text', 'password', 'upload', 'blob']: db.define_table('t', Field('a', ft, default='')) self.assertEqual(db.t.insert(a='x'), 1) self.assertEqual(db().select(db.t.a)[0].a, 'x') db.t.drop() db.define_table('t', Field('a', 'integer', default=1)) self.assertEqual(db.t.insert(a=3), 1) self.assertEqual(db().select(db.t.a)[0].a, 3) db.t.drop() db.define_table('t', Field('a', 'double', default=1)) self.assertEqual(db.t.insert(a=3.1), 1) self.assertEqual(db().select(db.t.a)[0].a, 3.1) db.t.drop() db.define_table('t', Field('a', 'boolean', default=True)) self.assertEqual(db.t.insert(a=True), 1) self.assertEqual(db().select(db.t.a)[0].a, True) db.t.drop() db.define_table('t', Field('a', 'date', default=datetime.date.today())) t0 = datetime.date.today() self.assertEqual(db.t.insert(a=t0), 1) self.assertEqual(db().select(db.t.a)[0].a, t0) db.t.drop() db.define_table('t', Field('a', 'datetime', default=datetime.datetime.today())) t0 = datetime.datetime( 1971, 12, 21, 10, 30, 55, 0, ) self.assertEqual(db.t.insert(a=t0), 1) self.assertEqual(db().select(db.t.a)[0].a, t0) db.t.drop() db.define_table('t', Field('a', 'time', default='11:30')) t0 = datetime.time(10, 30, 55) self.assertEqual(db.t.insert(a=t0), 1) self.assertEqual(db().select(db.t.a)[0].a, t0) db.t.drop() class TestAll(unittest.TestCase): def setUp(self): self.pt = Table(None,'PseudoTable',Field('name'),Field('birthdate')) def testSQLALL(self): ans = 'PseudoTable.id, PseudoTable.name, PseudoTable.birthdate' self.assertEqual(str(SQLALL(self.pt)), ans) class TestTable(unittest.TestCase): def testTableCreation(self): # Check for error when not passing type other than Field or Table self.assertRaises(SyntaxError, Table, None, 'test', None) persons = Table(None, 'persons', Field('firstname','string'), Field('lastname', 'string')) # Does it have the correct fields? self.assert_(set(persons.fields).issuperset(set(['firstname', 'lastname']))) # ALL is set correctly self.assert_('persons.firstname, persons.lastname' in str(persons.ALL)) def testTableAlias(self): db = DAL('sqlite:memory:') persons = Table(db, 'persons', Field('firstname', 'string'), Field('lastname', 'string')) aliens = persons.with_alias('aliens') # Are the different table instances with the same fields self.assert_(persons is not aliens) self.assert_(set(persons.fields) == set(aliens.fields)) def testTableInheritance(self): persons = Table(None, 'persons', Field('firstname', 'string'), Field('lastname', 'string')) customers = Table(None, 'customers', Field('items_purchased', 'integer'), persons) self.assert_(set(customers.fields).issuperset(set( ['items_purchased', 'firstname', 'lastname']))) class TestInsert(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) self.assertEqual(db.t.insert(a='1'), 1) self.assertEqual(db.t.insert(a='1'), 2) self.assertEqual(db.t.insert(a='1'), 3) self.assertEqual(db(db.t.a == '1').count(), 3) self.assertEqual(db(db.t.a == '1').update(a='2'), 3) self.assertEqual(db(db.t.a == '2').count(), 3) self.assertEqual(db(db.t.a == '2').delete(), 3) db.t.drop() class TestSelect(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) self.assertEqual(db.t.insert(a='1'), 1) self.assertEqual(db.t.insert(a='2'), 2) self.assertEqual(db.t.insert(a='3'), 3) self.assertEqual(len(db(db.t.id > 0).select()), 3) self.assertEqual(db(db.t.id > 0).select(orderby=~db.t.a | db.t.id)[0].a, '3') self.assertEqual(len(db(db.t.id > 0).select(limitby=(1, 2))), 1) self.assertEqual(db(db.t.id > 0).select(limitby=(1, 2))[0].a, '2') self.assertEqual(len(db().select(db.t.ALL)), 3) self.assertEqual(len(db(db.t.a == None).select()), 0) self.assertEqual(len(db(db.t.a != None).select()), 3) self.assertEqual(len(db(db.t.a > '1').select()), 2) self.assertEqual(len(db(db.t.a >= '1').select()), 3) self.assertEqual(len(db(db.t.a == '1').select()), 1) self.assertEqual(len(db(db.t.a != '1').select()), 2) self.assertEqual(len(db(db.t.a < '3').select()), 2) self.assertEqual(len(db(db.t.a <= '3').select()), 3) self.assertEqual(len(db(db.t.a > '1')(db.t.a < '3').select()), 1) self.assertEqual(len(db((db.t.a > '1') & (db.t.a < '3')).select()), 1) self.assertEqual(len(db((db.t.a > '1') | (db.t.a < '3')).select()), 3) self.assertEqual(len(db((db.t.a > '1') & ~(db.t.a > '2')).select()), 1) self.assertEqual(len(db(~(db.t.a > '1') & (db.t.a > '2')).select()), 0) db.t.drop() class TestBelongs(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) self.assertEqual(db.t.insert(a='1'), 1) self.assertEqual(db.t.insert(a='2'), 2) self.assertEqual(db.t.insert(a='3'), 3) self.assertEqual(len(db(db.t.a.belongs(('1', '3'))).select()), 2) self.assertEqual(len(db(db.t.a.belongs(db(db.t.id > 2)._select(db.t.a))).select()), 1) self.assertEqual(len(db(db.t.a.belongs(db(db.t.a.belongs(('1', '3')))._select(db.t.a))).select()), 2) self.assertEqual(len(db(db.t.a.belongs(db(db.t.a.belongs(db (db.t.a.belongs(('1', '3')))._select(db.t.a)))._select( db.t.a))).select()), 2) db.t.drop() class TestLike(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) self.assertEqual(db.t.insert(a='abc'), 1) self.assertEqual(len(db(db.t.a.like('a%')).select()), 1) self.assertEqual(len(db(db.t.a.like('%b%')).select()), 1) self.assertEqual(len(db(db.t.a.like('%c')).select()), 1) self.assertEqual(len(db(db.t.a.like('%d%')).select()), 0) self.assertEqual(len(db(db.t.a.lower().like('A%')).select()), 1) self.assertEqual(len(db(db.t.a.lower().like('%B%')).select()), 1) self.assertEqual(len(db(db.t.a.lower().like('%C')).select()), 1) self.assertEqual(len(db(db.t.a.upper().like('A%')).select()), 1) self.assertEqual(len(db(db.t.a.upper().like('%B%')).select()), 1) self.assertEqual(len(db(db.t.a.upper().like('%C')).select()), 1) db.t.drop() class TestDatetime(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a', 'datetime')) self.assertEqual(db.t.insert(a=datetime.datetime(1971, 12, 21, 11, 30)), 1) self.assertEqual(db.t.insert(a=datetime.datetime(1971, 11, 21, 10, 30)), 2) self.assertEqual(db.t.insert(a=datetime.datetime(1970, 12, 21, 9, 30)), 3) self.assertEqual(len(db(db.t.a == datetime.datetime(1971, 12, 21, 11, 30)).select()), 1) self.assertEqual(len(db(db.t.a.year() == 1971).select()), 2) self.assertEqual(len(db(db.t.a.month() == 12).select()), 2) self.assertEqual(len(db(db.t.a.day() == 21).select()), 3) self.assertEqual(len(db(db.t.a.hour() == 11).select()), 1) self.assertEqual(len(db(db.t.a.minutes() == 30).select()), 3) self.assertEqual(len(db(db.t.a.seconds() == 0).select()), 3) db.t.drop() class TestExpressions(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a', 'integer')) self.assertEqual(db.t.insert(a=1), 1) self.assertEqual(db.t.insert(a=2), 2) self.assertEqual(db.t.insert(a=3), 3) self.assertEqual(db(db.t.a == 3).update(a=db.t.a + 1), 1) self.assertEqual(len(db(db.t.a == 4).select()), 1) db.t.drop() class TestJoin(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t1', Field('a')) db.define_table('t2', Field('a'), Field('b', db.t1)) i1 = db.t1.insert(a='1') i2 = db.t1.insert(a='2') i3 = db.t1.insert(a='3') db.t2.insert(a='4', b=i1) db.t2.insert(a='5', b=i2) db.t2.insert(a='6', b=i2) self.assertEqual(len(db(db.t1.id == db.t2.b).select(orderby=db.t1.a | db.t2.a)), 3) self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.a | db.t2.a)[2].t1.a, '2') self.assertEqual(db(db.t1.id == db.t2.b).select(orderby=db.t1.a | db.t2.a)[2].t2.a, '6') self.assertEqual(len(db().select(db.t1.ALL, db.t2.ALL, left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a)), 4) self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a)[2].t1.a, '2') self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a)[2].t2.a, '6') self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a)[3].t1.a, '3') self.assertEqual(db().select(db.t1.ALL, db.t2.ALL, left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a)[3].t2.a, None) self.assertEqual(len(db().select(db.t1.ALL, db.t2.id.count(), left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a, groupby=db.t1.a)), 3) self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(), left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a, groupby=db.t1.a)[0]._extra[db.t2.id.count()], 1) self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(), left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a, groupby=db.t1.a)[1]._extra[db.t2.id.count()], 2) self.assertEqual(db().select(db.t1.ALL, db.t2.id.count(), left=db.t2.on(db.t1.id == db.t2.b), orderby=db.t1.a | db.t2.a, groupby=db.t1.a)[2]._extra[db.t2.id.count()], 0) db.t1.drop() db.t2.drop() class TestMinMaxSum(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a', 'integer')) self.assertEqual(db.t.insert(a=1), 1) self.assertEqual(db.t.insert(a=2), 2) self.assertEqual(db.t.insert(a=3), 3) s = db.t.a.min() self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 1) s = db.t.a.max() self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 3) s = db.t.a.sum() self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 6) s = db.t.a.count() self.assertEqual(db(db.t.id > 0).select(s)[0]._extra[s], 3) db.t.drop() #class TestCache(unittest. # def testRun(self): # cache = cache.ram # db = DAL('sqlite:memory:') # db.define_table('t', Field('a')) # db.t.insert(a='1') # r1 = db().select(db.t.ALL, cache=(cache, 1000)) # db.t.insert(a='1') # r2 = db().select(db.t.ALL, cache=(cache, 1000)) # self.assertEqual(r1.response, r2.response) # db.t.drop() class TestMigrations(unittest.TestCase): def testRun(self): db = DAL('sqlite://.storage.db') db.define_table('t', Field('a'), migrate='.storage.table') db.commit() db = DAL('sqlite://.storage.db') db.define_table('t', Field('a'), Field('b'), migrate='.storage.table') db.commit() db = DAL('sqlite://.storage.db') db.define_table('t', Field('a'), Field('b', 'text'), migrate='.storage.table') db.commit() db = DAL('sqlite://.storage.db') db.define_table('t', Field('a'), migrate='.storage.table') db.t.drop() db.commit() def tearDown(self): if os.path.exists('.storage.db'): os.unlink('.storage.db') if os.path.exists('.storage.table'): os.unlink('.storage.table') class TestReferece(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('name'), Field('a','reference t')) db.commit() x = db.t.insert(name='max') assert x.id == 1 assert x['id'] == 1 x.a = x assert x.a == 1 x.update_record() y = db.t[1] assert y.a == 1 assert y.a.a.a.a.a.a.name == 'max' z=db.t.insert(name='xxx', a = y) assert z.a == y.id db.t.drop() db.commit() class TestClientLevelOps(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) db.commit() db.t.insert(a="test") rows1 = db(db.t.id>0).select() rows2 = db(db.t.id>0).select() rows3 = rows1 & rows2 assert len(rows3) == 2 rows4 = rows1 | rows2 assert len(rows4) == 1 rows5 = rows1.find(lambda row: row.a=="test") assert len(rows5) == 1 rows6 = rows2.exclude(lambda row: row.a=="test") assert len(rows6) == 1 rows7 = rows5.sort(lambda row: row.a) assert len(rows7) == 1 db.t.drop() db.commit() class TestVirtualFields(unittest.TestCase): def testRun(self): db = DAL('sqlite:memory:') db.define_table('t', Field('a')) db.commit() db.t.insert(a="test") class Compute: def a_upper(row): return row.t.a.upper() db.t.virtualfields.append(Compute()) assert db(db.t.id>0).select().first().a_upper == 'TEST' db.t.drop() db.commit() if __name__ == '__main__': unittest.main() tearDownModule()