Attached is an experiment which adds limited conflict resolution to Durus. I have a bit of experience with ZODB and have been repeatedly frustrated by its complexity when diagnosing problems. At this point I think it would be easier to enhance Durus for the needs of my app (gaining a good understanding of the implementation along the way) rather than continue to wrestle with ZODB. For conflict resolution, I wanted to come up with something easy to implement and understand, matching the rest of Durus. I suspected that useful conflict-proof objects such as counters and sets could be implemented even with fairly constrained resolution support. My working constraints were to make minimal changes to Durus and to have negligible time/space overhead when conflict resolution is not used. As far as client-server interaction is concerned, conflict resolution works as follows. Normally, in a commit operation, the server sends a list of invalid OIDs to the client, and the client responds with a number: 0 if it decides there was a conflict, otherwise a positive number giving the total size in bytes of the object records it will send and commit. Here I modified the protocol to also support a negative number from the client, which means "I need some server object records"; its magnitude is the number of OIDs being requested. The client uses these records as merge points in the conflict resolution. Just like a bulk load, the client will send a list of OIDs, and the server will respond with the records. From there, reverting to the original protocol, the client can return 0 for a conflict error (i.e. the conflict resolution failed), or the positive byte length of the object records to commit. To resolve a conflict for a particular OID on the client, a merge must be performed between three states of the object: the state committed on the server, the original local state (as retrieved from the client storage), and the current local state. The merging takes place on the client. Any class derived from Persistent and having a method named _p_resolve supports conflict resolution.
The _p_resolve method takes the server state and original state as arguments, and is expected to merge those into its own state, returning True on success. When assembling arguments for a _p_resolve call, the connection gets an OID's server state via the protocol changes described above. The original local state is made available by a new slot called _p_state0 on PersistentBase. This slot is filled in when an object transitions to the unsaved state (and has a _p_resolve method), and is cleared when it transitions to the ghosted state. Implementations of _p_resolve are constrained in that they may not modify other persistent objects or cause another object to be unghosted. This limitation reflects the single-pass interaction between the client and server during a commit.

Here is a simple summation class with conflict resolution:

    class Sum(Persistent):

        def __init__(self, x=0):
            self.x = x

        def increment(self, delta=1):
            self.x += delta

        def value(self):
            return self.x

        def _p_resolve(self, stored_state, initial_state):
            state = self.__getstate__()
            state['x'] += stored_state['x'] - initial_state['x']
            return True

And here is Sum in use:

    >>> c1 = Connection(ClientStorage())
    >>> c2 = Connection(ClientStorage())
    >>> c3 = Connection(ClientStorage())
    >>> c1.get_root()['count'] = Sum()
    >>> c1.commit()
    >>> c1.get_root()['count'].value()
    0
    >>> c1.get_root()['count'].increment()
    >>> c2.get_root()['count'].increment()
    >>> c3.get_root()['count'].increment()
    >>> c1.commit()
    >>> c2.commit()
    >>> c3.commit()
    >>> c3.get_root()['count'].value()
    3

--John
diff -urN tmp/Durus-3.5.orig/utils.py Durus-3.5/utils.py --- tmp/Durus-3.5.orig/utils.py 2006-05-11 08:45:36.000000000 -0400 +++ Durus-3.5/utils.py 2006-10-04 21:52:57.000000000 -0400 @@ -26,3 +26,10 @@ """Unpack an 8-byte string into a 32-bit long integer.""" return struct.unpack(">L", v)[0] +def ps32(v): + """Pack a signed integer or long into a 4-byte string""" + return struct.pack(">l", v) + +def us32(v): + """Unpack a signed 4-byte string into a 32-bit long integer.""" + return struct.unpack(">l", v)[0] diff -urN tmp/Durus-3.5.orig/storage_server.py Durus-3.5/storage_server.py --- tmp/Durus-3.5.orig/storage_server.py 2006-08-01 16:21:11.000000000 -0400 +++ Durus-3.5/storage_server.py 2006-10-08 20:35:16.000000000 -0400 @@ -5,7 +5,7 @@ from datetime import datetime from durus.logger import log, is_logging from durus.serialize import extract_class_name, split_oids -from durus.utils import p32, u32, u64 +from durus.utils import p32, u32, us32, u64 from grp import getgrnam, getgrgid from os import unlink, stat, chown, geteuid, getegid, umask from os.path import exists @@ -295,9 +295,18 @@ client = self._find_client(s) s.sendall(p32(len(client.invalid)) + ''.join(client.invalid)) client.invalid.clear() - tlen = u32(recv(s, 4)) + tlen = us32(recv(s, 4)) if tlen == 0: return # client decided not to commit (e.g. 
conflict) + elif tlen < 0: + # send objects for conflict resolution + oid_str = recv(s, 8 * -tlen) + for oid in split_oids(oid_str): + self._send_load_response(s, oid) + # get num items to commit again + tlen = u32(recv(s, 4)) + if tlen == 0: + return tdata = recv(s, tlen) logging_debug = is_logging(10) logging_debug and log(10, 'Committing %s bytes', tlen) diff -urN tmp/Durus-3.5.orig/client_storage.py Durus-3.5/client_storage.py --- tmp/Durus-3.5.orig/client_storage.py 2006-08-01 16:21:12.000000000 -0400 +++ Durus-3.5/client_storage.py 2006-10-08 16:46:02.000000000 -0400 @@ -9,7 +9,7 @@ from durus.storage_server import DEFAULT_PORT, DEFAULT_HOST, recv from durus.storage_server import SocketAddress, StorageServer from durus.storage_server import STATUS_OKAY, STATUS_KEYERROR, STATUS_INVALID -from durus.utils import p32, u32, p64, u64 +from durus.utils import p32, u32, ps32, p64, u64 class ClientStorage(Storage): @@ -70,12 +70,13 @@ if n != 0: packed_oids = recv(self.s, n*8) try: - handle_invalidations(split_oids(packed_oids)) + handle_invalidations(split_oids(packed_oids), + bulk_load=ConflictResolutionBulkLoad(self)) except ConflictError: self.transaction_new_oids.reverse() self.oid_pool.extend(self.transaction_new_oids) self.begin() # clear out records and transaction_new_oids. - self.s.sendall(p32(0)) # Tell server we are done. + self.s.sendall(ps32(0)) # Tell server we are done. 
raise tdata = [] for oid, record in self.records.iteritems(): @@ -123,3 +124,20 @@ yield oid, record except DurusKeyError: pass + + +class ConflictResolutionBulkLoad(object): + + def __init__(self, storage): + self.storage = storage + self.called = False + + def __call__(self, oids): + if self.called: + raise RuntimeError('bulk load already called') + self.called = True + oid_str = ''.join(oids) + num_oids = len(oids) + self.storage.s.sendall(ps32(-num_oids) + oid_str) + for oid in oids: + yield self.storage._get_load_response(oid) diff -urN tmp/Durus-3.5.orig/connection.py Durus-3.5/connection.py --- tmp/Durus-3.5.orig/connection.py 2006-08-01 16:21:11.000000000 -0400 +++ Durus-3.5/connection.py 2006-10-08 20:38:25.000000000 -0400 @@ -8,7 +8,8 @@ from durus.persistent import ConnectionBase from durus.persistent_dict import PersistentDict from durus.serialize import ObjectReader, ObjectWriter -from durus.serialize import split_oids, unpack_record, pack_record +from durus.serialize import split_oids, unpack_record, pack_record, \ + extract_state from durus.storage import Storage from durus.utils import p64 from itertools import islice, chain @@ -281,7 +282,7 @@ self.shrink_cache() self.transaction_serial += 1 - def _handle_invalidations(self, oids, read_oid=None): + def _handle_invalidations(self, oids, read_oid=None, bulk_load=None): """(oids:[str], read_oid:str=None) Check if any of the oids are for objects that were accessed during this transaction. If so, raise the appropriate conflict exception. @@ -297,7 +298,38 @@ conflicts.append(oid) if conflicts: if read_oid is None: - raise ConflictError(conflicts) + # This shortcut avoids the extra bulk load in most cases + # where conflict resolution isn't being used. 
+ if not hasattr(self.cache.get(conflicts[0]), '_p_resolve'): + raise ConflictError(conflicts) + """ + Run conflict resolution + for each oid with conflict: + get object state from server (without affecting cache) + get local object + get previous local object state + call resolve method on object + ConflictError on failure + re-store resolved objects to transaction + """ + # ISSUE: would like to use izip here, but exiting early with + # data on the socket would cause problems + for oid, record in zip(conflicts, bulk_load(conflicts)): + server_state = extract_state(record) + obj = self.cache.get(oid) + try: + assert obj._p_resolve(server_state, obj._p_state0) + except (AttributeError, AssertionError): + raise ConflictError(conflicts) + writer = ObjectWriter(self) + try: + for oid in conflicts: + obj = self.cache.get(oid) + writer.gen_new_objects(obj) + data, refs = writer.get_state(obj) + self.storage.store(oid, pack_record(oid, data, refs)) + finally: + writer.close() else: raise ReadConflictError([read_oid]) diff -urN tmp/Durus-3.5.orig/serialize.py Durus-3.5/serialize.py --- tmp/Durus-3.5.orig/serialize.py 2006-08-01 16:21:14.000000000 -0400 +++ Durus-3.5/serialize.py 2006-10-08 20:33:55.000000000 -0400 @@ -44,6 +44,35 @@ class_name = state.split('\n', 2)[1] return class_name +def _extract_state_from_data(data, load=True, get_unpickler=None): + s = StringIO() + s.write(data) + s.seek(0) + if get_unpickler: + unpickler = get_unpickler(s) + else: + unpickler = Unpickler(s) + klass = unpickler.load() + position = s.tell() + if data[s.tell()] == 'x': + # This is almost certainly a compressed pickle. + try: + decompressed = decompress(data[position:]) + except zlib_error: + pass # let the unpickler try anyway. 
+ else: + s.write(decompressed) + s.seek(position) + if load: + return unpickler.load() + else: + return s.read() + +def extract_state(record): + oid, data, refs = unpack_record(record) + return _extract_state_from_data(data) + + class ObjectWriter(object): """ Serializes objects for storage in the database. @@ -133,25 +162,7 @@ return instance def get_state(self, data, load=True): - s = StringIO() - s.write(data) - s.seek(0) - unpickler = self._get_unpickler(s) - klass = unpickler.load() - position = s.tell() - if data[s.tell()] == 'x': - # This is almost certainly a compressed pickle. - try: - decompressed = decompress(data[position:]) - except zlib_error: - pass # let the unpickler try anyway. - else: - s.write(decompressed) - s.seek(position) - if load: - return unpickler.load() - else: - return s.read() + return _extract_state_from_data(data, load, self._get_unpickler) def get_state_pickle(self, data): return self.get_state(data, load=False) diff -urN tmp/Durus-3.5.orig/persistent.py Durus-3.5/persistent.py --- tmp/Durus-3.5.orig/persistent.py 2006-08-01 16:21:12.000000000 -0400 +++ Durus-3.5/persistent.py 2006-10-08 20:42:58.000000000 -0400 @@ -2,6 +2,7 @@ $URL: svn+ssh://svn.mems-exchange.org/repos/trunk/durus/persistent.py $ $Id: persistent.py 28547 2006-07-27 19:12:25Z dbinger $ """ +from copy import deepcopy from durus.utils import format_oid from sys import stderr @@ -76,7 +77,8 @@ The _p_oid is None when this instance has never been stored. 
""" - __slots__ = ['_p_status', '_p_serial', '_p_connection', '_p_oid'] + __slots__ = ['_p_status', '_p_serial', '_p_connection', '_p_oid', + '_p_state0'] def __new__(klass, *args, **kwargs): instance = object.__new__(klass, *args, **kwargs) @@ -137,10 +139,8 @@ def _p_note_change(self): if self._p_status != UNSAVED: - if self._p_status == GHOST: - self._p_load_state() + self._p_set_status_unsaved() self._p_connection.note_change(self) - self._p_status = UNSAVED def _p_format_oid(self): return format_oid(self._p_oid) @@ -148,6 +148,7 @@ def _p_set_status_ghost(self, getattribute=object.__getattribute__): d = getattribute(self, '__dict__') d.clear() + self._p_state0 = None self._p_status = GHOST def _p_set_status_saved(self): @@ -156,6 +157,8 @@ def _p_set_status_unsaved(self): if self._p_status == GHOST: self._p_load_state() + if hasattr(self, '_p_resolve'): + self._p_state0 = deepcopy(self.__getstate__()) self._p_status = UNSAVED def _p_is_ghost(self): diff -urN -b tmp/Durus-3.5.orig/_persistent.c Durus-3.5/_persistent.c --- tmp/Durus-3.5.orig/_persistent.c 2006-08-01 16:21:10.000000000 -0400 +++ Durus-3.5/_persistent.c 2006-10-08 20:43:22.000000000 -0400 @@ -16,6 +16,7 @@ PyObject *p_serial; PyObject *p_connection; PyObject *p_oid; + PyObject *p_state0; } PersistentBaseObject; typedef struct { @@ -39,6 +40,8 @@ Py_INCREF(x->p_connection); x->p_oid = Py_None; Py_INCREF(x->p_oid); + x->p_state0 = Py_None; + Py_INCREF(x->p_state0); return (PyObject *)x; } @@ -50,6 +53,7 @@ Py_XDECREF(self->p_connection); Py_XDECREF(self->p_oid); Py_XDECREF(self->p_serial); + Py_XDECREF(self->p_state0); PyObject_GC_Del(self); Py_TRASHCAN_SAFE_END(self); } @@ -60,6 +64,7 @@ Py_VISIT(self->p_connection); Py_VISIT(self->p_oid); Py_VISIT(self->p_serial); + Py_VISIT(self->p_state0); return 0; } @@ -69,6 +74,7 @@ Py_CLEAR(self->p_connection); Py_CLEAR(self->p_oid); Py_CLEAR(self->p_serial); + Py_CLEAR(self->p_state0); return 0; } @@ -186,6 +192,7 @@ {"_p_status", T_INT, 
offsetof(PersistentBaseObject, p_status)}, {"_p_connection", T_OBJECT_EX, offsetof(PersistentBaseObject, p_connection)}, {"_p_oid", T_OBJECT_EX, offsetof(PersistentBaseObject, p_oid)}, + {"_p_state0", T_OBJECT_EX, offsetof(PersistentBaseObject, p_state0)}, {NULL} };