I'm doing some performance work in our application.  The primary domain
objects (called "models") are represented as tree-like collections of
persistent objects -- the largest existing model is currently about 4600
objects, and we expect there will be models at least 3 times that size.
The software performs various tasks that loop over most of the objects
making up a model.  Profiling shows that a lot of time is being spent in
Durus code, retrieving the data for each object and unpickling it as the
code loops over one subset of objects or another.  Because I know that
essentially all of a model's objects will be needed, it might improve
performance if there were a way to bring over all the objects making up
a model in one operation, saving roughly 4600 round trips.

The draft patch below adds a Connection.page_in(obj) method that
retrieves obj, and the closure of all objects referenced by obj, in one
operation.  (A short usage sketch follows the patch.)

Questions:

* Does the patch use Durus internals correctly?  In particular I wonder
  about the server side of page_in() (StorageServer.handle_R()); it
  checks that oid isn't on the .invalid list, but I didn't make it check
  every OID in the resulting closure of references.  I suspect it
  should, though; am I correct?

* Connection.page_in() contains code that largely duplicates
  Connection.load().  Should this common code be factored out?  On the
  other hand, maybe avoiding the extra function calls is worth the
  duplication.  (A possible refactoring is sketched after the patch.)

* I don't like the name page_in(), but can't think of a better one.
  OK, that's not a question.

--amk

Index: connection.py
===================================================================
--- connection.py       (revision 13735)
+++ connection.py       (working copy)
@@ -257,7 +257,35 @@
         self.abort()
         self.storage.pack()
 
+    def page_in(self, obj):
+        # XXX Check that the object hasn't already been paged in.
+        paged_in = False
+        if not paged_in:
+            self.shrink_cache()
+            obj_list = self.storage.page_in(obj._p_oid)
+            orig_oid = obj._p_oid
+            for oid, pickle in obj_list:
+                new_obj = self.cache.get(oid)
+                if new_obj is not None:
+                    ##print 'filling in object', repr(oid), id(new_obj)
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+                else:
+                    # Not in cache, so create new object for it
+                    new_obj = self.reader.get_ghost(pickle)
+                    ##print 'creating object', repr(oid), id(new_obj)
+                    new_obj._p_oid = oid
+                    new_obj._p_connection = self
+                    new_obj._p_set_status_ghost()
+
+                    # Fill in state for the object
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+                    self.cache[oid] = new_obj
+
 
 class Cache(object):
 
     def __init__(self, size):
Index: storage.py
===================================================================
--- storage.py  (revision 13735)
+++ storage.py  (working copy)
@@ -16,6 +16,12 @@
         """
         raise NotImplementedError
 
+    def page_in (self, oid):
+        """(oid) -> [(oid, record)]
+        Returns the closure of all the records referenced by the specified
+        OID.
+        """
+
     def begin(self):
         """
         Begin a commit.
Index: file_storage.py
===================================================================
--- file_storage.py     (revision 13735)
+++ file_storage.py     (working copy)
@@ -141,6 +141,26 @@
         self.fp.seek(offset)
         return self._read_block()
 
+    def page_in(self, oid):
+        output = []
+        queue = [oid]
+        seen = set()
+        while len(queue):
+            ##print len(queue), queue[:15]
+            oid = queue.pop(0)
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            oid2, data, refdata = unpack_record(record)
+            assert oid == oid2
+            output.append((oid, data))
+            seen.add(oid)
+            ref_oids = [oid for oid in split_oids(refdata)
+                        if oid not in seen]
+            queue.extend(ref_oids)
+
+        return output
+
     def begin(self):
         pass
 
Index: client_storage.py
===================================================================
--- client_storage.py   (revision 13735)
+++ client_storage.py   (working copy)
@@ -88,6 +88,27 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status
 
+    def page_in(self, oid):
+        self.s.sendall('R' + oid)
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)
+            rlen = u32(recv(self.s, 4))
+            data = recv(self.s, rlen)
+            output.append((oid, data))
+        return output
+
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: storage_server.py
===================================================================
--- storage_server.py   (revision 13735)
+++ storage_server.py   (working copy)
@@ -182,6 +182,21 @@
             self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)
 
+    def handle_R(self, s):
+        # closure of references
+        oid = recv(s, 8)
+        if oid in self._find_client(s).invalid:
+            s.sendall(STATUS_INVALID)
+            return
+
+        s.sendall(STATUS_OKAY)
+        output = self.storage.page_in(oid)
+
+        s.sendall(p32(len(output)))
+        for oid, data in output:
+            s.sendall(oid + p32(len(data)) + data)
+
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
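
For reference, here's roughly how I expect callers to use the new
method.  This is only a sketch: the 'models' root key, walk_model(),
and do_something_with() are invented for illustration and aren't part
of Durus or of the patch.

    from durus.client_storage import ClientStorage
    from durus.connection import Connection

    connection = Connection(ClientStorage(host='localhost', port=2972))
    root = connection.get_root()
    model = root['models']['biggest']       # hypothetical application layout

    # One request fetches the model and everything it references,
    # instead of one round trip per object as each ghost is loaded.
    connection.page_in(model)

    for obj in walk_model(model):           # hypothetical traversal helper
        do_something_with(obj)              # now served from the local cache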
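
Regarding the second question, here's what the factored-out version of
Connection.page_in() might look like (the helper name _load_from_record()
is only a suggestion; this is not part of the patch):

    def _load_from_record(self, oid, pickle):
        """Return the object for oid, creating and caching it if
        necessary, with its state filled in from the given pickle."""
        obj = self.cache.get(oid)
        if obj is None:
            # Not in the cache: create a ghost and register it.
            obj = self.reader.get_ghost(pickle)
            obj._p_oid = oid
            obj._p_connection = self
            obj._p_set_status_ghost()
            self.cache[oid] = obj
        # Fill in the state and mark the object as loaded.
        obj.__setstate__(self.reader.get_state(pickle))
        obj._p_set_status_saved()
        return obj

    def page_in(self, obj):
        self.shrink_cache()
        for oid, pickle in self.storage.page_in(obj._p_oid):
            self._load_from_record(oid, pickle)

Connection.load() could then call the same helper when it materializes a
single record, at the cost of one extra method call per object.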