Revised paging-in patch
A.M. Kuchling
2006-07-07
After talking to David and Roger, I've revised the paging-in patch.
It now adds a bulk_load() method that takes a list of OIDs and returns
the corresponding records.  The StorageServer gains a 'B' command that
returns up to 100 records per request, and page_in() now traverses the
reference closure on the client side.
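
For example, paging in an application object and everything it
references might look like this (a rough sketch, not part of the
patch; 'catalog' is a made-up key in the root object):

    from durus.client_storage import ClientStorage
    from durus.connection import Connection

    connection = Connection(ClientStorage())  # default host/port
    catalog = connection.get_root()['catalog']
    # Fetch the closure of ghosts in 'B' batches of up to 100 records,
    # instead of one round trip per object:
    connection.page_in(catalog)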

--amk

Index: client_storage.py
===================================================================
--- client_storage.py   (revision 13735)
+++ client_storage.py   (working copy)
@@ -88,6 +88,31 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status

+    def bulk_load(self, oid_list):
+        """(oid_list:[str]) -> sequence(record:str)
+        Fetch the records for all of the oids in one server round trip.
+        """
+        self.s.sendall('B' + p32(len(oid_list)) + ''.join(oid_list))
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            oid = recv(self.s, 8)
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            oid = recv(self.s, 8)
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)      # each record is preceded by its oid
+            rlen = u32(recv(self.s, 4))
+            record = recv(self.s, rlen)
+            output.append(record)
+        return output
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: file_storage.py
===================================================================
--- file_storage.py     (revision 13735)
+++ file_storage.py     (working copy)
@@ -141,6 +141,17 @@
         self.fp.seek(offset)
         return self._read_block()

+    def bulk_load(self, oid_list):
+        """(oid_list:[str]) -> sequence(record:str)
+        """
+        if self.fp is None:
+            raise IOError, 'storage is closed'
+        for oid in oid_list:
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            yield record
+
     def begin(self):
         pass

Index: storage.py
===================================================================
--- storage.py  (revision 13735)
+++ storage.py  (working copy)
@@ -41,7 +41,12 @@
         """
         raise NotImplementedError

+    def bulk_load(self, oid_list):
+        """(oid_list:[str]) -> sequence(record:str)
+        """
+        raise NotImplementedError

+
 def gen_referring_oid_record(storage, referred_oid):
     """(storage:Storage, referred_oid:str) -> sequence([oid:str, record:str])
     Generate oid, record pairs for all objects that include a
Index: storage_server.py
===================================================================
--- storage_server.py   (revision 13735)
+++ storage_server.py   (working copy)
@@ -182,6 +182,51 @@
             self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)

+    def handle_B(self, s):
+        # Bulk read: return the records for at most 100 of the
+        # requested oids in a single reply.
+        number = u32(recv(s, 4))        # Read number of OIDs
+
+        invalid = self._find_client(s).invalid
+        number_read = 0
+        response = []
+        status = STATUS_OKAY
+        bad_oid = None
+
+        for i in range(min(number, 100)):
+            oid = recv(s, 8)
+            number_read += 1
+            if oid in invalid:
+                status = STATUS_INVALID
+                bad_oid = oid
+                break
+
+            # Read record
+            try:
+                record = self.storage.load(oid)
+            except KeyError:
+                log(10, 'KeyError %s', u64(oid))
+                status = STATUS_KEYERROR
+                bad_oid = oid
+                break
+            else:
+                if is_logging(5):
+                    log(5, 'Load %-7s %s', u64(oid), extract_class_name(record))
+                response.append((oid, record))
+
+        # Read and ignore any excess OIDs
+        for i in range(number - number_read):
+            oid = recv(s, 8)
+
+        # Return the response
+        s.sendall(status)
+        if status == STATUS_OKAY:
+            s.sendall(p32(len(response)))
+            for oid, record in response:
+                s.sendall(oid + p32(len(record)) + record)
+        else:
+            s.sendall(bad_oid)
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
Index: connection.py
===================================================================
--- connection.py       (revision 13735)
+++ connection.py       (working copy)
@@ -6,7 +6,7 @@
 from durus.logger import log
 from durus.persistent_dict import PersistentDict
 from durus.serialize import ObjectReader, ObjectWriter
-from durus.serialize import unpack_record, pack_record
+from durus.serialize import split_oids, unpack_record, pack_record
 from durus.storage import Storage
 from durus.utils import p64
 from itertools import islice, chain
@@ -257,7 +257,69 @@
         self.abort()
         self.storage.pack()

+    def page_in(self, target_obj, limit=None):
+        """(target_obj:Persistent, limit:int|None)
+        Load the ghost objects reachable from target_obj in bulk.  If
+        limit is given, stop fetching new batches once at least limit
+        objects have been loaded.
+        """

+        # Utility function -- returns a list of OIDs of the ghost objects
+        # referenced by an object.
+        def list_refs(target_obj):
+            refs = []
+            for obj in target_obj.__dict__.itervalues():
+                if hasattr(obj, '_p_oid') and obj._p_is_ghost():
+                    refs.append(obj._p_oid)
+            return refs
+
+        # Simple check to see if the object has already been paged in:
+        # if any of the object's attributes are ghosts, assume that
+        # there's work to be done.
+        queue = list_refs(target_obj)
+        if queue:
+            # Do a breadth-first traversal of the closure of referenced objects
+            seen = set()
+            while queue:
+                if limit is not None and len(seen) >= limit:
+                    break       # Loaded enough objects; stop here
+                # Perform a bulk load of the first 100 queue elements
+                oid_list = queue[:100]
+                del queue[:100]
+                # Filter out OIDs we've already seen
+                oid_list = [oid for oid in oid_list if oid not in seen]
+                if not oid_list:
+                    # Nothing new in this batch, so don't bother sending
+                    # a bulk-load request for 0 OIDs.
+                    continue
+
+                # Request bulk load
+                records = self.storage.bulk_load(oid_list)
+
+                # Add objects to the cache
+                for record in records:
+                    oid, pickle, refdata = unpack_record(record)
+                    seen.add(oid)       # Never request this oid again
+                    new_obj = self.cache.get(oid)
+                    if new_obj is None:
+                        # Not in cache, so create a new ghost for it
+                        new_obj = self.reader.get_ghost(pickle)
+                        new_obj._p_oid = oid
+                        new_obj._p_connection = self
+                        new_obj._p_set_status_ghost()
+                        self.cache[oid] = new_obj
+                    elif not new_obj._p_is_ghost():
+                        # Already resident, so skip it
+                        continue
+
+                    # Fill in state for the object
+                    assert new_obj._p_is_ghost()
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+
+                    queue.extend(split_oids(refdata))
+
+
 class Cache(object):

     def __init__(self, size):
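
P.S. For anyone implementing the protocol elsewhere, my reading of the
code above gives this byte layout for the 'B' exchange (not an official
spec):

    client -> server:  'B' + p32(number) + number * oid       (oid = 8 bytes)
    server -> client:  STATUS_OKAY + p32(numrecs)
                         + numrecs * (oid + p32(len(record)) + record)
       or, on error:   STATUS_INVALID or STATUS_KEYERROR + the offending oid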