durusmail: durus-users: Draft patch for "paging in" object closures
A.M. Kuchling
2006-06-19
I'm doing some performance work in our application.  The primary
domain objects (called "models") are represented as tree-like
collections of persistent objects -- the largest existing model is
currently about 4600 objects, and we expect there will be models at
least 3 times that size.

The software performs various tasks that loop over most of the objects
making up a model.  Profiling shows that a lot of time is being spent
in Durus code, retrieving the data for each object and unpickling it
as the code loops over one subset of objects or another.

Because I know that essentially all of a model's objects will be
needed, it might improve performance if there were a way to bring
over all the objects making up a model in one operation, saving 4600
round trips.

The draft patch below adds a Connection.page_in(obj) method that
retrieves obj, and the closure of all objects referenced by obj, in
one operation.
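
To make the saving concrete, here is a minimal, self-contained sketch
comparing per-object loads with a single closure fetch. ToyStorage, its
records layout, and the round_trips counter are invented for
illustration and are not part of Durus; only the breadth-first walk
mirrors what the patch does.

```python
from collections import deque


class ToyStorage:
    """Hypothetical stand-in for a storage server (not the Durus API)."""

    def __init__(self, records):
        # records maps oid -> (data, [referenced oids])
        self.records = records
        self.round_trips = 0

    def load(self, oid):
        # One request/response per object.
        self.round_trips += 1
        return self.records[oid]

    def page_in(self, oid):
        # One request/response for the whole closure of references.
        self.round_trips += 1
        output = []
        seen = {oid}
        queue = deque([oid])
        while queue:
            current = queue.popleft()
            data, refs = self.records[current]
            output.append((current, data))
            for ref in refs:
                if ref not in seen:
                    seen.add(ref)
                    queue.append(ref)
        return output


def load_one_by_one(storage, root):
    """Walk the graph from root, fetching each object separately."""
    loaded = {}
    stack = [root]
    while stack:
        oid = stack.pop()
        if oid in loaded:
            continue
        data, refs = storage.load(oid)
        loaded[oid] = data
        stack.extend(refs)
    return loaded


records = {
    "root": ("model", ["a", "b"]),
    "a": ("part A", ["c"]),
    "b": ("part B", ["c", "root"]),  # shared child and a back-reference
    "c": ("leaf", []),
}

slow = ToyStorage(records)
load_one_by_one(slow, "root")

fast = ToyStorage(records)
closure = fast.page_in("root")
print(slow.round_trips, fast.round_trips, len(closure))  # prints: 4 1 4
```

With four objects the difference is 4 round trips versus 1; for a
4600-object model the same ratio applies.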

Questions:

        * Does the patch use Durus internals correctly?  In particular I
          wonder about the StorageServer.handle_R() method; it checks that
          the requested OID isn't on the client's .invalid list, but I
          didn't make it check every OID in the resulting closure of
          references.  I suspect it should, though; am I correct?

        * Connection.page_in() contains code that largely duplicates
          Connection.load().  Should this common code be factored out?
          On the other hand, maybe saving the extra function calls would be
          worth it.

        * I don't like the name page_in(), but can't think of a better one.
          OK, that's not a question.
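
On the first question, one conservative option would be to refuse the
request whenever any OID in the closure is on the client's invalid
list, not just the root.  The helper below sketches that check in
isolation; find_invalid is a hypothetical name, and plain byte strings
and a set stand in for Durus OIDs and the per-client invalid
collection.  Whether Durus needs to be this strict is the open
question.

```python
def find_invalid(closure, invalid):
    """Return the OIDs in closure that also appear in invalid, in order.

    closure is a list of (oid, record) pairs, as page_in() returns.
    invalid is any container supporting 'in' (assumed here to be a set).
    """
    return [oid for oid, record in closure if oid in invalid]


closure = [(b"\x00" * 7 + b"\x01", b"rec1"),
           (b"\x00" * 7 + b"\x02", b"rec2"),
           (b"\x00" * 7 + b"\x03", b"rec3")]
invalid = {b"\x00" * 7 + b"\x02"}

conflicts = find_invalid(closure, invalid)
if conflicts:
    # A server would answer with an "invalid" status here
    # instead of sending the records.
    print("conflict on", len(conflicts), "oid(s)")
```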

--amk

Index: connection.py
===================================================================
--- connection.py       (revision 13735)
+++ connection.py       (working copy)
@@ -257,7 +257,35 @@
         self.abort()
         self.storage.pack()

+    def page_in(self, obj):
+        # XXX Check that the object hasn't already been paged in.
+        paged_in = False

+        if not paged_in:
+            self.shrink_cache()
+            obj_list = self.storage.page_in(obj._p_oid)
+            orig_oid = obj._p_oid
+            for oid, pickle in obj_list:
+                new_obj = self.cache.get(oid)
+                if new_obj is not None:
+                    # Fill in ghosts only; don't clobber unsaved state.
+                    if new_obj._p_is_ghost():
+                        new_obj.__setstate__(self.reader.get_state(pickle))
+                        new_obj._p_set_status_saved()
+                else:
+                    # Not in cache, so create new object for it
+                    new_obj = self.reader.get_ghost(pickle)
+                    ##print 'creating object', repr(oid), id(new_obj)
+                    new_obj._p_oid = oid
+                    new_obj._p_connection = self
+                    new_obj._p_set_status_ghost()
+
+                    # Fill in state for the object
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+                    self.cache[oid] = new_obj
+
 class Cache(object):

     def __init__(self, size):
Index: storage.py
===================================================================
--- storage.py  (revision 13735)
+++ storage.py  (working copy)
@@ -16,6 +16,13 @@
         """
         raise NotImplementedError

+    def page_in (self, oid):
+        """(oid) -> [(oid, record)]
+        Returns the closure of all the records referenced by the specified
+        OID.
+        """
+        raise NotImplementedError
+
     def begin(self):
         """
         Begin a commit.
Index: file_storage.py
===================================================================
--- file_storage.py     (revision 13735)
+++ file_storage.py     (working copy)
@@ -141,6 +141,26 @@
         self.fp.seek(offset)
         return self._read_block()

+    def page_in(self, oid):
+        output = []
+        queue = [oid]
+        seen = set([oid])
+        while queue:
+            oid = queue.pop(0)
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            oid2, data, refdata = unpack_record(record)
+            assert oid == oid2
+            output.append((oid, data))
+            # Mark OIDs as seen when queued, so no record is read twice.
+            for ref_oid in split_oids(refdata):
+                if ref_oid not in seen:
+                    seen.add(ref_oid)
+                    queue.append(ref_oid)
+
+        return output
+
     def begin(self):
         pass

Index: client_storage.py
===================================================================
--- client_storage.py   (revision 13735)
+++ client_storage.py   (working copy)
@@ -88,6 +88,27 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status

+    def page_in(self, oid):
+        self.s.sendall('R' + oid)
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)
+            rlen = u32(recv(self.s, 4))
+            data = recv(self.s, rlen)
+            output.append((oid, data))
+        return output
+
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: storage_server.py
===================================================================
--- storage_server.py   (revision 13735)
+++ storage_server.py   (working copy)
@@ -182,6 +182,21 @@
             self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)

+    def handle_R(self, s):
+        # closure of references
+        oid = recv(s, 8)
+        if oid in self._find_client(s).invalid:
+            s.sendall(STATUS_INVALID)
+            return
+        try:
+            output = self.storage.page_in(oid)
+        except KeyError:
+            s.sendall(STATUS_KEYERROR)
+            return
+        s.sendall(STATUS_OKAY + p32(len(output)))
+        for oid, data in output:
+            s.sendall(oid + p32(len(data)) + data)
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
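
For reference, after the one-byte status the reply exchanged by
handle_R and ClientStorage.page_in above is a 4-byte record count
followed, for each record, by an 8-byte OID, a 4-byte length, and the
data.  The sketch below reproduces that layout with the standard struct
module.  encode_closure and decode_closure are hypothetical names, and
big-endian unsigned 32-bit integers are assumed for the count and
length fields, since the patch does not show how p32/u32 are defined.

```python
import struct


def encode_closure(records):
    """Pack [(oid, data)] as: count, then oid + length + data per record."""
    parts = [struct.pack(">L", len(records))]
    for oid, data in records:
        if len(oid) != 8:
            raise ValueError("oid must be exactly 8 bytes")
        parts.append(oid + struct.pack(">L", len(data)) + data)
    return b"".join(parts)


def decode_closure(payload):
    """Inverse of encode_closure(); returns a list of (oid, data) pairs."""
    (count,) = struct.unpack_from(">L", payload, 0)
    offset = 4
    records = []
    for _ in range(count):
        oid = payload[offset:offset + 8]
        (length,) = struct.unpack_from(">L", payload, offset + 8)
        start = offset + 12
        records.append((oid, payload[start:start + length]))
        offset = start + length
    return records


records = [(b"\x00" * 8, b"root pickle"), (b"\x00" * 7 + b"\x01", b"")]
payload = encode_closure(records)
assert decode_closure(payload) == records
```

Sending the count first lets the client loop a fixed number of times
without needing an end-of-stream marker.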