""" $URL$ $Id$ Provides an abstract DBAPIStorage; two working implementations are provided PostgresqlStorage (supporting psycopg2 driver) and SqliteStorage (supporting Python 2.5's built in sqlite3 DBAPI interface as well as pysqlite2). Motivation to do this thanks to a sqlite/apsw specific effort by Peter Wilkinson of Thirdfloor Software Works Pty. Ltd. """ from durus.serialize import unpack_record, split_oids from durus.connection import ROOT_OID from durus.utils import str_to_int8 from durus.storage import Storage class DBAPIStorage(Storage): """ This is an abstract class which SQL-based Durus storages can inherit. This class assumes a Python DB API compatible connection. """ def __init__(self, sql_connection, table_name='durus_records', **kwargs): if self.__class__ is DBAPIStorage: raise RuntimeError("DBAPIStorage is abstract") self._sql_connection = sql_connection self._sql_cursor = self._sql_connection.cursor() self._sql_table_name = table_name self.options = kwargs if not self._sql_table_exists(): self._sql_create_table() # Durus, not sql, transactions self.transaction = None def _sql_get_cursor(self): return self._sql_cursor def _sql_create_table(self): raise NotImplementedError, ('Table creation is not portable, override ' 'in your subclass.') def _sql_table_exists(self): """() -> boolean Postgres: SELECT tablename from pg_tables where schemaname='public' AND tablename= sqlite: 'PRAGMA table_info(%s)' % table_name """ raise NotImplementedError, ('Table existence detection is not portable') def new_oid(self): """() -> str Must return return int8_to_str() """ raise NotImplementedError, ('OID generation is not portable') def load(self, oid): """(oid:str) -> record One of the few portable queries! """ # transaction wrapper not necessary but keeps code tidy using it cursor = self._sql_get_cursor() cursor.execute('SELECT record FROM %s WHERE p_oid = %s' % (self._sql_table_name, str_to_int8(oid))) result = cursor.fetchone() if result: return str(result[0]) else: raise KeyError def begin(self): """() -> None A new Durus transaction """ self.transaction = {} self._sql_get_cursor().execute('BEGIN') def store(self, oid, record): """(oid:str, record:object) -> None Adds record to the transaction queue """ assert len(oid) == 8 assert oid not in self.transaction self.transaction[oid] = record def end(self, handle_invalidations=None): def gen_items(): for oid, record in self.transaction.iteritems(): yield (buffer(record), str_to_int8(oid)) cursor = self._sql_get_cursor() cursor.executemany('''UPDATE %s set record=%%s WHERE p_oid = %%s''' %\ self._sql_table_name, gen_items()) self.transaction = None def sync(self): return [] def close(self): """Close database connections to the storage""" self._sql_get_cursor().close() if self._sql_connection: self._sql_connection.close() def pack(self): """() -> None A single pass packer; the SQL herein is portable across all db engines. """ assert not self.transaction def chunks(iterable, chunk_length): for i in xrange(0, len(iterable), chunk_length): yield iterable[i:i+chunk_length] todo = [ROOT_OID] seen = set() mark_as_packed = [] cursor = self._sql_get_cursor() cursor.execute('BEGIN') while todo: oid = todo.pop() if oid in seen: continue seen.add(oid) record = self.load(oid) record_oid, data, refdata = unpack_record(record) assert oid == record_oid todo.extend(split_oids(refdata)) mark_as_packed.append(str_to_int8(oid)) # too slow individually #cursor.execute("UPDATE %s SET pack = 1 WHERE p_oid = %%s" %\ #self._sql_table_name, (str_to_int8(oid),)) # low tech 'execute many' which is not quite twice as fast # as individual updates and executemany too for chunk in chunks(mark_as_packed, 500): sql = "UPDATE %s set pack = 1 WHERE p_oid in (%s)" %\ (self._sql_table_name, ','.join(map(str,chunk))) cursor.execute(sql) # The following executemany() should be faster, but psycopg2 just creates an # iterable and issues n update queries rather than using the backend # executemany capability; as of yet have not explored this situation # with sqlite. #cursor.executemany("UPDATE %s SET pack = 1 WHERE p_oid = %%s" %\ #self._sql_table_name, [(i,) for i in mark_as_packed]) cursor.execute("DELETE FROM %s WHERE pack = 0" % self._sql_table_name) cursor.execute("UPDATE %s SET pack = 0" % self._sql_table_name) cursor.execute('COMMIT')