durusmail: durus-users: Draft patch for "paging in" object closures
Draft patch for "paging in" object closures
A.M. Kuchling
I'm doing some performance work in our application.  The primary
domain objects (called "models") are represented as tree-like
collections of persistent objects -- the largest existing model is
currently about 4600 objects, and we expect there will be models at
least 3 times that size.

The software performs various tasks that loop over most of the objects
making up a model.  Profiling shows that a lot of time is being spent
in Durus code, retrieving the data for each object and unpickling it
as the code loops over one subset of objects or another.

Because I know that essentially all of a model's objects will be
needed, it might improve performance if there were a way to bring
over all the objects making up a model in one operation, saving
some 4600 round trips for the largest model.

The draft patch below adds a Connection.page_in(obj) method that
retrieves obj and the closure of all objects it references, directly
or indirectly, in one operation.
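Computing the closure is a breadth-first walk of the reference graph. Here is a minimal, storage-independent sketch of that walk. `reference_closure` and the dict-based `records` layout are hypothetical stand-ins for FileStorage's index and record format, not Durus APIs.

```python
from collections import deque


def reference_closure(records, root_oid):
    """Return [(oid, data)] for root_oid and every OID reachable from it.

    `records` is a hypothetical in-memory stand-in for a storage index:
    a dict mapping oid -> (data, [referenced oids]).
    """
    output = []
    seen = {root_oid}            # mark OIDs when queued, so each is read once
    queue = deque([root_oid])
    while queue:
        oid = queue.popleft()
        data, refs = records[oid]    # KeyError if the OID is unknown
        output.append((oid, data))
        for ref in refs:
            if ref not in seen:
                seen.add(ref)
                queue.append(ref)
    return output


# A small model with a shared child ("c") and a cycle back to the root.
records = {
    "a": ("root", ["b", "c"]),
    "b": ("left", ["c"]),
    "c": ("shared", ["a"]),
    "d": ("unrelated", []),
}
print(reference_closure(records, "a"))
# [('a', 'root'), ('b', 'left'), ('c', 'shared')]
```

Marking an OID as seen when it is queued, rather than when it is read, keeps an object referenced from several places from being read and returned twice. It also stops the walk from looping on cycles.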

Some questions:

        * Does the patch use Durus internals correctly?  In particular, I
          wonder about the StorageServer.handle_R() method: it checks that
          the root OID isn't on the client's .invalid list, but I didn't
          make it check every OID in the resulting closure of references.
          I suspect it should, though; am I correct?

        * Connection.page_in() contains code that largely duplicates
          Connection.load().  Should this common code be factored out?
          On the other hand, maybe saving the extra function calls would be
          worth it.

        * I don't like the name page_in(), but can't think of a better one.
          OK, that's not a question.
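If the answer to the first question is yes, the server-side test becomes a filter over the whole closure instead of a membership test on the root OID alone. It would have to run after the closure is built but before any status byte is sent. Here is a minimal sketch. The helper name is hypothetical, and plain lists and sets stand in for the storage output and the client's .invalid collection.

```python
def invalid_oids_in_closure(records, invalid):
    """Return the OIDs from a page_in() result that are in `invalid`.

    `records` is a list of (oid, pickle) pairs; `invalid` stands in for
    the per-client collection of OIDs the server has marked invalid.
    """
    return [oid for oid, _pickle in records if oid in invalid]


closure = [("a", "root"), ("b", "left"), ("c", "shared")]
print(invalid_oids_in_closure(closure, {"c", "z"}))  # ['c']
print(invalid_oids_in_closure(closure, set()))       # []
```

A non-empty result would mean answering STATUS_INVALID. The patched client raises ReadConflictError([oid]) with only the root OID, so reporting every stale OID would need extra data in the reply.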


Index: connection.py
--- connection.py       (revision 13735)
+++ connection.py       (working copy)
@@ -257,7 +257,35 @@

+    def page_in(self, obj):
+        # XXX Check that the object hasn't already been paged in.
+        paged_in = False
+
+        if not paged_in:
+            self.shrink_cache()
+            obj_list = self.storage.page_in(obj._p_oid)
+            for oid, pickle in obj_list:
+                new_obj = self.cache.get(oid)
+                if new_obj is None:
+                    # Not in cache, so create a ghost for it.  Register it
+                    # before unpickling any state so that references back
+                    # to this oid resolve to this same object.
+                    new_obj = self.reader.get_ghost(pickle)
+                    new_obj._p_oid = oid
+                    new_obj._p_connection = self
+                    new_obj._p_set_status_ghost()
+                    self.cache[oid] = new_obj
+                # Fill in the state of the cached or newly created object
+                state = self.reader.get_state(pickle)
+                new_obj.__setstate__(state)
+                new_obj._p_set_status_saved()
+
 class Cache(object):

     def __init__(self, size):
Index: storage.py
--- storage.py  (revision 13735)
+++ storage.py  (working copy)
@@ -16,6 +16,12 @@
         raise NotImplementedError

+    def page_in(self, oid):
+        """(oid) -> [(oid, pickle)]
+        Returns the records for oid and for every OID reachable from it.
+        """
+        raise NotImplementedError
+
     def begin(self):
         """
         Begin a commit.
Index: file_storage.py
--- file_storage.py     (revision 13735)
+++ file_storage.py     (working copy)
@@ -141,6 +141,26 @@
         return self._read_block()

+    def page_in(self, oid):
+        output = []
+        queue = [oid]
+        seen = set(queue)
+        while queue:
+            oid = queue.pop(0)
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            oid2, data, refdata = unpack_record(record)
+            assert oid == oid2
+            output.append((oid, data))
+            # Mark OIDs as seen when they are queued, not when they are
+            # read, so a record referenced from several objects is only
+            # read and returned once.
+            for ref_oid in split_oids(refdata):
+                if ref_oid not in seen:
+                    seen.add(ref_oid)
+                    queue.append(ref_oid)
+        return output
+
     def begin(self):

Index: client_storage.py
--- client_storage.py   (revision 13735)
+++ client_storage.py   (working copy)
@@ -88,6 +88,27 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status

+    def page_in(self, oid):
+        self.s.sendall('R' + oid)
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)
+            rlen = u32(recv(self.s, 4))
+            data = recv(self.s, rlen)
+            output.append((oid, data))
+        return output
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: storage_server.py
--- storage_server.py   (revision 13735)
+++ storage_server.py   (working copy)
@@ -182,6 +182,21 @@
             self.packer = self.storage.get_packer()

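+    def handle_R(self, s):
+        # closure of references
+        oid = recv(s, 8)
+        if oid in self._find_client(s).invalid:
+            s.sendall(STATUS_INVALID)
+            return
+        # Build the closure before answering, so that a missing OID is
+        # reported as STATUS_KEYERROR (which the client already handles)
+        # instead of failing after STATUS_OKAY has been sent.
+        try:
+            output = self.storage.page_in(oid)
+        except KeyError:
+            s.sendall(STATUS_KEYERROR)
+            return
+        s.sendall(STATUS_OKAY)
+        s.sendall(p32(len(output)))
+        for oid, data in output:
+            s.sendall(oid + p32(len(data)) + data)
+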
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
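The 'R' reply that handle_R() writes and ClientStorage.page_in() reads has this framing: one status byte, a 4-byte record count, then for each record an 8-byte OID, a 4-byte length and the pickle data. The sketch below encodes and decodes everything after the status byte in memory. It assumes p32/u32 are 4-byte big-endian unsigned integers, which is my understanding of durus.utils but should be treated as an assumption. encode_closure and decode_closure are hypothetical names.

```python
import struct


def p32(n):
    # Assumed equivalent of durus.utils.p32: 4-byte big-endian unsigned int.
    return struct.pack(">L", n)


def u32(b):
    # Assumed equivalent of durus.utils.u32 (inverse of p32).
    return struct.unpack(">L", b)[0]


def encode_closure(records):
    """Frame [(oid, data)] as the reply body that follows the status byte."""
    parts = [p32(len(records))]
    for oid, data in records:
        assert len(oid) == 8, "OIDs are 8-byte strings"
        parts.append(oid + p32(len(data)) + data)
    return b"".join(parts)


def decode_closure(buf):
    """Inverse of encode_closure(), reading from a buffer the way the
    client reads from its socket."""
    pos = 0

    def take(n):
        nonlocal pos
        chunk = buf[pos:pos + n]
        if len(chunk) != n:
            raise ValueError("truncated reply")
        pos += n
        return chunk

    count = u32(take(4))
    records = []
    for _ in range(count):
        oid = take(8)
        length = u32(take(4))
        records.append((oid, take(length)))
    return records


records = [(b"\x00" * 7 + b"\x01", b"root pickle"), (b"\x00" * 7 + b"\x02", b"")]
wire = encode_closure(records)
print(len(wire))  # 4 + (8 + 4 + 11) + (8 + 4 + 0) = 39
assert decode_closure(wire) == records
```

The count comes first, so the server has to build the whole closure before it can reply. Memory on both sides therefore grows with the size of the model.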