Index: client_storage.py
===================================================================
--- client_storage.py	(revision 13735)
+++ client_storage.py	(working copy)
@@ -88,6 +88,25 @@
         if status != STATUS_OKAY:
             raise ProtocolError, 'server returned invalid status %r' % status
 
+    def bulk_load(self, oid_list):
+        """(oid_list:sequence(oid:str)) -> sequence(record:str)
+        Fetch the records of several oids in one round trip.  The
+        storage server loads at most 100 oids per request (see its
+        handle_B), so fewer records than oids may come back.
+        """
+        self.s.sendall('B' + p32(len(oid_list)) + ''.join(oid_list))
+        status = recv(self.s, 1)
+        if status == STATUS_INVALID:
+            raise ReadConflictError([recv(self.s, 8)])
+        if status == STATUS_KEYERROR:
+            raise DurusKeyError(recv(self.s, 8))
+        if status != STATUS_OKAY:
+            raise ProtocolError, 'server returned invalid status %r' % status
+        # Each record arrives as a 4-byte length followed by the record.
+        numrecs = u32(recv(self.s, 4))
+        return [recv(self.s, u32(recv(self.s, 4)))
+                for i in xrange(numrecs)]
+
     def gen_oid_record(self):
         """() -> sequence([oid:str, record:str])
         A FileStorage will do a better job of this.
Index: file_storage.py
===================================================================
--- file_storage.py	(revision 13735)
+++ file_storage.py	(working copy)
@@ -141,6 +141,20 @@
         self.fp.seek(offset)
         return self._read_block()
 
+    def bulk_load(self, oid_list):
+        """(oid_list:sequence(oid:str)) -> sequence(record:str)
+        Generate the stored record of each oid, in the order given.
+        """
+        # Check here rather than inside the generator, so that a closed
+        # storage is reported by the call and not by the first iteration.
+        if self.fp is None:
+            raise IOError, 'storage is closed'
+        def gen_records():
+            for oid in oid_list:
+                self.fp.seek(self.index[oid])
+                yield self._read_block()
+        return gen_records()
+
     def begin(self):
         pass
 
Index: storage.py
===================================================================
--- storage.py	(revision 13735)
+++ storage.py	(working copy)
@@ -41,7 +41,13 @@
         """
         raise NotImplementedError
 
+    def bulk_load(self, oid_list):
+        """(oid_list:sequence(oid:str)) -> sequence(record:str)
+        Subclasses return the record of each oid in oid_list.
+        """
+        raise NotImplementedError
+
 
 def gen_referring_oid_record(storage, referred_oid):
     """(storage:Storage, referred_oid:str) -> sequence([oid:str, record:str])
     Generate oid, record pairs for all objects that include a
Index: storage_server.py
===================================================================
--- storage_server.py	(revision 13735)
+++ storage_server.py	(working copy)
@@ -182,6 +182,53 @@
             self.packer = self.storage.get_packer()
         s.sendall(STATUS_OKAY)
 
+    def handle_B(self, s):
+        # bulk read of objects
+        # Request: a 4-byte count, then that many 8-byte oids.
+        # Response: the status; then, if okay, a 4-byte record count and
+        # each record preceded by its 4-byte length, otherwise the oid
+        # that could not be read.
+        max_records = 100   # oids past this limit are read but not loaded
+        number = u32(recv(s, 4))
+        invalid = self._find_client(s).invalid
+        number_read = 0
+        response = []
+        status = STATUS_OKAY
+        bad_oid = None
+
+        for i in range(min(number, max_records)):
+            oid = recv(s, 8)
+            number_read += 1
+            if oid in invalid:
+                status = STATUS_INVALID
+                bad_oid = oid
+                break
+            try:
+                record = self.storage.load(oid)
+            except KeyError:
+                log(10, 'KeyError %s', u64(oid))
+                status = STATUS_KEYERROR
+                bad_oid = oid
+                break
+            if is_logging(5):
+                log(5, 'Load %-7s %s', u64(oid), extract_class_name(record))
+            response.append(p32(len(record)) + record)
+
+        # Read and ignore any excess oids.  The count comes from the
+        # client, so count down instead of building a range() list that
+        # could hold billions of items.
+        excess = number - number_read
+        while excess > 0:
+            recv(s, 8)
+            excess -= 1
+
+        # Return the response
+        s.sendall(status)
+        if status == STATUS_OKAY:
+            s.sendall(p32(len(response)) + ''.join(response))
+        else:
+            s.sendall(bad_oid)
+
     def handle_Q(self, s):
         # graceful quit
         log(20, 'Quit')
Index: connection.py
===================================================================
--- connection.py	(revision 13735)
+++ connection.py	(working copy)
@@ -6,7 +6,7 @@
 from durus.logger import log
 from durus.persistent_dict import PersistentDict
 from durus.serialize import ObjectReader, ObjectWriter
-from durus.serialize import unpack_record, pack_record
+from durus.serialize import split_oids, unpack_record, pack_record
 from durus.storage import Storage
 from durus.utils import p64
 from itertools import islice, chain
@@ -257,7 +257,55 @@
         self.abort()
         self.storage.pack()
 
+    def page_in(self, target_obj):
+        """(target_obj)
+        Load, in bulk, the objects that target_obj refers to.  The
+        traversal starts from the ghosts found among target_obj's
+        attributes and follows the references of every record that
+        is loaded.  Objects that are already resident are left alone
+        and their references are not followed.
+        """
+        # Oids requested per bulk load.  The storage server loads at
+        # most 100 oids per request, so do not raise this by itself.
+        batch_size = 100
+
+        # NOTE(review): this reads __dict__ directly; if target_obj is
+        # itself a ghost, presumably no references are found -- confirm.
+        queue = [obj._p_oid for obj in target_obj.__dict__.itervalues()
+                 if hasattr(obj, '_p_oid') and obj._p_is_ghost()]
+        seen = set()
+
+        # Breadth-first traversal of the closure of referenced objects.
+        while queue:
+            # Take the next batch, without duplicates and without oids
+            # that were requested before.
+            oid_list = list(set(queue[:batch_size]) - seen)
+            del queue[:batch_size]
+            if not oid_list:
+                # Don't bother sending a bulk-load request for 0 oids.
+                continue
+            seen.update(oid_list)
+
+            for record in self.storage.bulk_load(oid_list):
+                oid, data, refdata = unpack_record(record)
+                new_obj = self.cache.get(oid)
+                if new_obj is None:
+                    # Not in cache, so create a ghost for it.
+                    new_obj = self.reader.get_ghost(data)
+                    new_obj._p_oid = oid
+                    new_obj._p_connection = self
+                    new_obj._p_set_status_ghost()
+                    self.cache[oid] = new_obj
+                elif not new_obj._p_is_ghost():
+                    # Already resident, so leave it as it is.
+                    continue
+
+                # Fill in the state of the ghost.
+                new_obj.__setstate__(self.reader.get_state(data))
+                new_obj._p_set_status_saved()
+                queue.extend(split_oids(refdata))
+
 
 class Cache(object):
 
     def __init__(self, size):