After talking to David and Roger, I've revised the paging-in patch.
It now adds a bulk_load() method that takes a list of OIDs and returns
the corresponding records. The StorageServer gains a 'B' command that
returns up to 100 records per request, and page_in now traverses the
reference closure on the client side.
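
In case it helps to see the intended use, here's a rough sketch of how
I'd expect application code to call this.  The 'engineering' key and
the 'name' attribute are made up for the example; nothing below is part
of the patch itself:

    from durus.client_storage import ClientStorage
    from durus.connection import Connection

    connection = Connection(ClientStorage())   # default host/port
    root = connection.get_root()
    dept = root['engineering']    # hypothetical persistent object
    print dept.name               # touch an attribute so dept itself is loaded

    # One call fetches the ghost objects dept refers to, and everything
    # they refer to in turn.  The OIDs are batched into 'B' requests, so
    # the server returns up to 100 records per round trip instead of
    # answering one 'L' load per object.
    connection.page_in(dept)

If none of the object's attributes are ghosts, page_in returns without
touching the network, so the call is cheap when the data is already
resident.
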
--amk
Index: client_storage.py
===================================================================
--- client_storage.py (revision 13735)
+++ client_storage.py (working copy)
@@ -88,6 +88,29 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status
 
+    def bulk_load (self, oid_list):
+        self.s.sendall('B' + p32(len(oid_list)) + ''.join(oid_list))
+        status = recv(self.s, 1)
+        if status == STATUS_OKAY:
+            pass
+        elif status == STATUS_INVALID:
+            oid = recv(self.s, 8)
+            raise ReadConflictError([oid])
+        elif status == STATUS_KEYERROR:
+            oid = recv(self.s, 8)
+            raise DurusKeyError(oid)
+        else:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        numrecs = u32(recv(self.s, 4))
+        output = []
+        for i in xrange(numrecs):
+            oid = recv(self.s, 8)
+            rlen = u32(recv(self.s, 4))
+            record = recv(self.s, rlen)
+            output.append(record)
+        return output
+
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: file_storage.py
===================================================================
--- file_storage.py (revision 13735)
+++ file_storage.py (working copy)
@@ -141,6 +141,15 @@
         self.fp.seek(offset)
         return self._read_block()
 
+    def bulk_load(self, oid_list):
+        if self.fp is None:
+            raise IOError, 'storage is closed'
+        for oid in oid_list:
+            offset = self.index[oid]
+            self.fp.seek(offset)
+            record = self._read_block()
+            yield record
+
     def begin(self):
         pass
 
Index: storage.py
===================================================================
--- storage.py (revision 13735)
+++ storage.py (working copy)
@@ -41,7 +41,12 @@
         """
         raise NotImplementedError
 
+    def bulk_load (self, oid_list):
+        """(oid_list:sequence(oid:str)) -> sequence(record:str)
+        """
+        raise NotImplementedError
+
 
 def gen_referring_oid_record(storage, referred_oid):
     """(storage:Storage, referred_oid:str) -> sequence([oid:str, record:str])
     Generate oid, record pairs for all objects that include a
Index: storage_server.py
===================================================================
--- storage_server.py (revision 13735)
+++ storage_server.py (working copy)
@@ -182,6 +182,52 @@
         self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)
 
+    def handle_B(self, s):
+        # bulk read of objects
+        number = u32(recv(s, 4))  # Read number of OIDs
+
+        # Read at most 100 objects
+        invalid = self._find_client(s).invalid
+        number_left = number
+        number_read = 0
+        response = []
+        status = STATUS_OKAY
+        bad_oid = None
+
+        for i in range(min(number_left, 100)):
+            oid = recv(s, 8)
+            number_read += 1
+            if oid in invalid:
+                status = STATUS_INVALID
+                bad_oid = oid
+                break
+
+            # Read record
+            try:
+                record = self.storage.load(oid)
+            except KeyError:
+                log(10, 'KeyError %s', u64(oid))
+                status = STATUS_KEYERROR
+                bad_oid = oid
+                break
+            else:
+                if is_logging(5):
+                    log(5, 'Load %-7s %s', u64(oid), extract_class_name(record))
+                response.append((oid, record))
+
+        # Read and ignore any excess OIDs
+        for i in range(number - number_read):
+            oid = recv(s, 8)
+
+        # Return the response
+        s.sendall(status)
+        if status == STATUS_OKAY:
+            s.sendall(p32(len(response)))
+            for oid, record in response:
+                s.sendall(oid + p32(len(record)) + record)
+        else:
+            s.sendall(bad_oid)
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
Index: connection.py
===================================================================
--- connection.py (revision 13735)
+++ connection.py (working copy)
@@ -6,7 +6,7 @@
 from durus.logger import log
 from durus.persistent_dict import PersistentDict
 from durus.serialize import ObjectReader, ObjectWriter
-from durus.serialize import unpack_record, pack_record
+from durus.serialize import split_oids, unpack_record, pack_record
 from durus.storage import Storage
 from durus.utils import p64
 from itertools import islice, chain
@@ -257,7 +257,65 @@
         self.abort()
         self.storage.pack()
 
+    def page_in(self, target_obj, limit=None):
+        # Utility function -- returns a list of OIDs of the ghost objects
+        # referenced by an object.
+        def list_refs (target_obj):
+            refs = []
+            for obj in target_obj.__dict__.itervalues():
+                if hasattr(obj, '_p_oid') and obj._p_is_ghost():
+                    refs.append(obj._p_oid)
+                    assert obj._p_is_ghost()
+            return refs
+
+        # Simple check to see if the object has already been paged in:
+        # if any of the object's attributes are ghosts, assume that
+        # there's work to be done.
+        queue = list_refs(target_obj)
+        if len(queue) > 0:
+            # Do a breadth-first traversal of the closure of referenced objects
+            seen = set()
+            while len(queue):
+                # Perform a bulk load of the first 100 queue elements
+                oid_list = queue[:100]
+                del queue[:100]
+                # Filter out OIDs we've already seen
+                oid_list = [oid for oid in oid_list
+                            if oid not in seen]
+                if len(oid_list) == 0:
+                    # If list is empty, don't bother sending a
+                    # bulk-load request for 0 OIDs.
+                    continue
+
+                # Request bulk load
+                records = self.storage.bulk_load(oid_list)
+
+                # Add objects to the cache
+                for record in records:
+                    oid, pickle, refdata = unpack_record(record)
+                    new_obj = self.cache.get(oid)
+                    if new_obj is None:
+                        # Not in cache, so create new object for it
+                        new_obj = self.reader.get_ghost(pickle)
+                        new_obj._p_oid = oid
+                        new_obj._p_connection = self
+                        new_obj._p_set_status_ghost()
+                        self.cache[oid] = new_obj
+                    elif not new_obj._p_is_ghost():
+                        # Already resident, so skip it
+                        continue
+
+                    # Fill in state for the object
+                    assert new_obj._p_is_ghost()
+                    state = self.reader.get_state(pickle)
+                    new_obj.__setstate__(state)
+                    new_obj._p_set_status_saved()
+
+                    queue.extend(split_oids(refdata))
+                    seen.add(oid)
+
+
 
 
 class Cache(object):
     def __init__(self, size):