I'm doing some performance work in our application. The primary
domain objects (called "models") are represented as tree-like
collections of persistent objects -- the largest existing model is
currently about 4600 objects, and we expect there will be models at
least 3 times that size.
The software performs various tasks that loop over most of the objects
making up a model. Profiling shows that a lot of time is being spent
in Durus code, retrieving the data for each object and unpickling it
as the code loops over one subset of objects or another.
Because I know that essentially all of a model's objects will be
needed, it might improve performance if there were a way to bring
over all the objects making up a model in one operation, saving
roughly 4600 round trips.
The draft patch below adds a Connection.page_in(obj) method that
retrieves obj, and the closure of all objects referenced by obj, in
one operation.
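For example, application code would use it roughly like this
(model_root, iter_elements() and process() are made-up stand-ins for
our application's objects and code; the rest is ordinary Durus usage):

    from durus.client_storage import ClientStorage
    from durus.connection import Connection

    connection = Connection(ClientStorage())
    model_root = connection.get_root()['some_model']

    # One request pulls the whole model into the local cache ...
    connection.page_in(model_root)

    # ... so this traversal no longer costs a round trip per object.
    for element in model_root.iter_elements():
        process(element)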
Questions:
* Does the patch use Durus internals correctly? In particular I
  wonder about the server's handle_R() method; it checks that the
  requested oid isn't on the .invalid list, but I didn't make it
  check every OID in the resulting closure of references.
  I suspect it should, though; am I correct? (There's a sketch of
  what I mean after the questions.)
* Connection.page_in() contains code that largely duplicates
  Connection.load(). Should this common code be factored out?
  On the other hand, maybe avoiding the extra function calls is
  worth the duplication.
* I don't like the name page_in(), but can't think of a better one.
OK, that's not a question.
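To make the first question concrete: the stricter check I have in
mind would look roughly like this in handle_R() (untested sketch,
error handling aside; it assumes the whole closure is computed
before any status byte goes back to the client):

    def handle_R(self, s):
        # closure of references
        oid = recv(s, 8)
        invalid = self._find_client(s).invalid
        output = self.storage.page_in(oid)
        # Refuse the whole batch if *any* object in the closure has
        # been invalidated for this client, not just the requested oid.
        for ref_oid, data in output:
            if ref_oid in invalid:
                s.sendall(STATUS_INVALID)
                return
        s.sendall(STATUS_OKAY)
        s.sendall(p32(len(output)))
        for ref_oid, data in output:
            s.sendall(ref_oid + p32(len(data)) + data)

Refusing the whole batch seemed simplest; filtering out just the
invalidated objects would be another option.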
--amk
Index: connection.py
===================================================================
--- connection.py (revision 13735)
+++ connection.py (working copy)
@@ -257,7 +257,35 @@
         self.abort()
         self.storage.pack()
 
+    def page_in(self, obj):
+        # XXX Check that the object hasn't already been paged in.
+        paged_in = False
+        if not paged_in:
+            self.shrink_cache()
+            obj_list = self.storage.page_in(obj._p_oid)
+            orig_oid = obj._p_oid
+            for oid, pickle in obj_list:
+                new_obj = self.cache.get(oid)
+                if new_obj is not None:
+                    ##print 'filling in object', repr(oid), id(new_obj)
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+                else:
+                    # Not in cache, so create new object for it
+                    new_obj = self.reader.get_ghost(pickle)
+                    ##print 'creating object', repr(oid), id(new_obj)
+                    new_obj._p_oid = oid
+                    new_obj._p_connection = self
+                    new_obj._p_set_status_ghost()
+
+                    # Fill in state for the object
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+                    self.cache[oid] = new_obj
+
 class Cache(object):
     def __init__(self, size):
Index: storage.py
===================================================================
--- storage.py (revision 13735)
+++ storage.py (working copy)
@@ -16,6 +16,12 @@
"""
raise NotImplementedError
+ def page_in (self, oid):
+ """(oid) -> [(oid, record)]
+ Returns the closure of all the records referenced by the specified
+ OID.
+ """
+
def begin(self):
"""
Begin a commit.
Index: file_storage.py
===================================================================
--- file_storage.py (revision 13735)
+++ file_storage.py (working copy)
@@ -141,6 +141,26 @@
         self.fp.seek(offset)
         return self._read_block()
 
+    def page_in(self, oid):
+        output = []
+        queue = [oid]
+        seen = set([oid])
+        while len(queue):
+            ##print len(queue), queue[:15]
+            oid = queue.pop(0)
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            oid2, data, refdata = unpack_record(record)
+            assert oid == oid2
+            output.append((oid, data))
+            ref_oids = [ref for ref in split_oids(refdata)
+                        if ref not in seen]
+            seen.update(ref_oids)  # don't queue any OID more than once
+            queue.extend(ref_oids)
+
+        return output
+
     def begin(self):
         pass
Index: client_storage.py
===================================================================
--- client_storage.py (revision 13735)
+++ client_storage.py (working copy)
@@ -88,6 +88,27 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status
 
+    def page_in(self, oid):
+        self.s.sendall('R' + oid)
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)
+            rlen = u32(recv(self.s, 4))
+            data = recv(self.s, rlen)
+            output.append((oid, data))
+        return output
+
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: storage_server.py
===================================================================
--- storage_server.py (revision 13735)
+++ storage_server.py (working copy)
@@ -182,6 +182,21 @@
         self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)
 
+    def handle_R(self, s):
+        # closure of references
+        oid = recv(s, 8)
+        if oid in self._find_client(s).invalid:
+            s.sendall(STATUS_INVALID)
+            return
+
+        s.sendall(STATUS_OKAY)
+        output = self.storage.page_in(oid)
+
+        s.sendall(p32(len(output)))
+        for oid, data in output:
+            s.sendall(oid + p32(len(data)) + data)
+
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')