"""
$URL: svn+ssh://svn.mikewatkins.ca/repo/trunk/parlez/lib/test/utest_pgsql_storage.py $
$Id: utest_pgsql_storage.py 43 2007-09-12 23:25:01Z mw $
"""
from durus.connection import Connection
from durus.serialize import pack_record
from durus.utils import int8_to_str
from parlez.pgsql_storage import PostgresqlStorage
from sancho.utest import UTest, raises
import pgsql

DATABASE_NAME = 'durus'
TABLE_NAME = 'durus_test'

class PgsqlTest(UTest):
    """Exercise PostgresqlStorage against a live PostgreSQL database.

    All checks operate on the table TABLE_NAME inside the database
    DATABASE_NAME, so a reachable PostgreSQL server is required.

    NOTE(review): presumably sancho's UTest invokes _pre() before each
    check method and skips underscore-prefixed methods -- confirm against
    sancho.utest.
    """

    def _pre(self):
        """Open a connection and drop any left-over test table."""
        self.db = pgsql.connect(database=DATABASE_NAME)
        try:
            self.db.execute('drop table %s' % TABLE_NAME)
            self.db.commit()
        except Exception:
            # Best effort only: the drop is expected to fail when the table
            # does not exist yet.  Catching Exception (rather than a bare
            # except) lets KeyboardInterrupt/SystemExit propagate.
            # NOTE(review): if the driver leaves the transaction aborted
            # after a failed statement, a rollback may be needed here --
            # confirm against the pgsql driver.
            pass

    def _count(self, column='p_oid', where=None):
        """Return count(column) over the test table.

        `where`, when given, is appended verbatim as the WHERE condition.
        """
        query = 'select count(%s) from %s' % (column, TABLE_NAME)
        if where is not None:
            query = '%s where %s' % (query, where)
        return self.db.execute(query).fetchone()[0]

    def if_postgres_running(self):
        # TODO
        # test for postgres in process list
        # make this a function and have the entire test run if True
        pass

    def check_init(self):
        """Check table state after creating a storage, then a Connection.

        Asserts the table is empty once a storage has been constructed, and
        holds exactly one row, with p_oid 0, once a Connection is opened.
        """
        assert self.db
        PostgresqlStorage(self.db, table_name=TABLE_NAME)
        assert self._count() == 0
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        assert con
        assert self._count() == 1
        first_oid = self.db.execute(
            'select p_oid from %s' % TABLE_NAME).fetchone()[0]
        assert first_oid == 0

    def check_postgresql_storage(self):
        """Check oid allocation, store/load and bulk_load on the storage."""
        # lifted from durus utest_storage.py "check_memory_storage"
        b = PostgresqlStorage(self.db, table_name=TABLE_NAME)
        # oids are handed out sequentially, starting at zero
        assert b.new_oid() == int8_to_str(0)
        assert b.new_oid() == int8_to_str(1)
        assert b.new_oid() == int8_to_str(2)
        # nothing has been stored yet, so loading must fail
        raises(KeyError, b.load, int8_to_str(0))
        record = pack_record(int8_to_str(0), 'ok', '')
        b.begin()
        b.store(int8_to_str(0), record)
        b.end()
        b.sync()
        b.begin()
        b.store(int8_to_str(1), pack_record(int8_to_str(1), 'no', ''))
        b.end()
        # NOTE(review): two records are stored but only one is generated;
        # presumably gen_oid_record() only yields records reachable from
        # oid 0 -- confirm against durus.
        assert len(list(b.gen_oid_record())) == 1
        assert record == b.load(int8_to_str(0))
        records = b.bulk_load([int8_to_str(0), int8_to_str(1)])
        assert len(list(records)) == 2
        # oid 2 was allocated but never stored: consuming the result of
        # bulk_load must raise KeyError
        records = b.bulk_load([int8_to_str(0), int8_to_str(1), int8_to_str(2)])
        raises(KeyError, list, records)

    def check_size(self):
        """Check get_size() runs before a vacuum and returns 1 after one."""
        # won't return an accurate value unless vacuum run; make sure it doesn't barf
        # just the same
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        con.storage.get_size()
        self.db.execute('vacuum %s' % TABLE_NAME)
        assert con.storage.get_size() == 1

    def check_root(self):
        """Check that a fresh Connection provides a root object."""
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        root = con.get_root()
        assert root is not None

    def check_pack(self):
        """Check that pack() removes rows only for deleted objects.

        Stores a NewsDatabase holding ten NewsItems, then tracks the row
        count across commit/pack cycles before and after deleting one item.
        """
        # imported here so only this check depends on dulcinea
        from dulcinea.news import NewsItem, NewsDatabase
        con = Connection(PostgresqlStorage(self.db, table_name=TABLE_NAME))
        root = con.get_root()
        news = NewsDatabase()
        root['news'] = news
        for i in range(10):
            item = NewsItem()
            item.set_title('%d news item' % i)
            news.add(item)
        con.commit()
        # the object count may fail if NewsItem/NewsDatabase change in the future
        assert self._count() == 15
        # nothing was deleted, so packing must not remove anything
        con.pack()
        assert self._count() == 15
        del news.mapping[1]
        con.commit()
        # the row count is unchanged until the storage is packed
        assert self._count() == 15
        con.pack()
        assert self._count() == 14
        # no row may be left with a non-zero pack value once pack() is done
        assert self._count('pack', 'pack != 0') == 0



# Script entry point.
# NOTE(review): presumably instantiating a sancho UTest subclass is what runs
# its check methods -- confirm against sancho.utest.
if __name__ == '__main__':
    PgsqlTest()
