durusmail: durus-users: [PATCH] Limited conflict resolution
2006-10-10
John Belmonte
Attached is an experiment which adds limited conflict resolution to
Durus.  I have a bit of experience with ZODB and have been repeatedly
frustrated by its complexity when diagnosing problems.  At this point I
think it would be easier to enhance Durus for the needs of my app
(gaining a good understanding of the implementation along the way)
rather than continue to wrestle with ZODB.  For conflict resolution, I
wanted to come up with something easy to implement and understand,
matching the rest of Durus.  I suspected that useful conflict-proof
objects such as counters and sets could be implemented even with fairly
constrained resolution support.

My working constraints were to make minimal changes to Durus and have
negligible time/space overhead when conflict resolution is not used.

As for the client-server interaction, conflict resolution works as
follows.  Normally in a commit operation, the server sends a list of
invalid OIDs to the client, and the client responds with a number: 0 if
it decides there was a conflict, otherwise a positive number giving the
length in bytes of the object records it will send and commit.  Here I
modified the protocol to also accept a negative number from the client,
which means "I need some server object records"; its magnitude is the
number of OIDs requested.  The client uses these records as merge points
in the conflict resolution.  Just like a bulk load, the client sends the
list of OIDs, and the server responds with the records.  From there,
reverting to the original protocol, the client can return 0 for a
conflict error (i.e. the conflict resolution failed), or a positive byte
count of the object records to commit.
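To make the framing concrete, here is a minimal sketch of the server side of that exchange, for illustration only.  It is not the StorageServer code: it replays prerecorded client bytes from an in-memory buffer instead of a socket, uses a hypothetical dict as the record store, and packs counts in the same signed big-endian ">l" format as the patch's ps32/us32.

```python
import io
import struct


def ps32(v):
    """Pack a signed int into 4 big-endian bytes (the patch's ">l" format)."""
    return struct.pack(">l", v)


def us32(b):
    """Unpack 4 big-endian bytes into a signed int."""
    return struct.unpack(">l", b)[0]


def server_commit_step(stream, records):
    """Toy model of the server's commit handling after invalidations.

    stream  -- file-like object holding everything the client sends
    records -- hypothetical dict mapping 8-byte OIDs to stored records
    Returns (committed transaction bytes or None, records sent back).
    """
    sent = []
    tlen = us32(stream.read(4))
    if tlen < 0:
        # Negative count: the client wants -tlen server records to use as
        # merge points before it decides whether to commit.
        oid_str = stream.read(8 * -tlen)
        for i in range(0, len(oid_str), 8):
            sent.append(records[oid_str[i:i + 8]])
        # Back to the original protocol: 0 or a positive byte length.
        tlen = us32(stream.read(4))
    if tlen == 0:
        return None, sent  # client gave up (conflict not resolved)
    return stream.read(tlen), sent


oid = (7).to_bytes(8, "big")
store = {oid: b"server-record"}
tdata = b"resolved-record-bytes"

# Resolution succeeded: ask for one record, then commit len(tdata) bytes.
ok = io.BytesIO(ps32(-1) + oid + ps32(len(tdata)) + tdata)
print(server_commit_step(ok, store))

# Resolution failed: ask for one record, then send 0.
failed = io.BytesIO(ps32(-1) + oid + ps32(0))
print(server_commit_step(failed, store))
```

In the real protocol the server writes the requested records back on the socket between the client's two messages; because the client bytes here are prerecorded, the sketch only shows the message layout.  The second count is non-negative, so packing it signed or unsigned gives the same bytes.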

To resolve a conflict for a particular OID, a merge must be performed
between three states of the object: the state committed on the server,
the original local state (as retrieved from the client storage), and the
current local state.  The merging takes place on the client.  Any class
derived from Persistent that has a method named _p_resolve supports
conflict resolution.  The _p_resolve method takes the server state and
the original state as arguments and is expected to merge the changes
between them into its own (current) state, returning True on success;
a false result makes the commit fail with a ConflictError.
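As an illustration of that three-way merge for one of the conflict-proof objects mentioned earlier (a set), here is a sketch of a _p_resolve method.  ResolvableSet is hypothetical and deliberately plain Python with no Durus import so it runs standalone; a real version would subclass durus.persistent.Persistent.  The state dicts stand in for what __getstate__ returns.

```python
class ResolvableSet:
    """Hypothetical set-like object with a _p_resolve method.

    Plain Python (no Durus) so it runs anywhere; a real version would
    subclass durus.persistent.Persistent.  States are dicts, like the
    ones __getstate__ produces.
    """

    def __init__(self, items=()):
        self.items = set(items)

    def _p_resolve(self, stored_state, initial_state):
        stored = stored_state["items"]
        initial = initial_state["items"]
        # What the other committer changed relative to our starting point...
        added_elsewhere = stored - initial
        removed_elsewhere = initial - stored
        # ...replayed on top of our own current contents.
        self.items |= added_elsewhere
        self.items -= removed_elsewhere
        return True


# Both clients started from {1, 2}.  The other client added 3 and
# committed first; locally we removed 2 and added 4.
local = ResolvableSet({1, 4})
local._p_resolve({"items": {1, 2, 3}}, {"items": {1, 2}})
print(sorted(local.items))
```

With plain add/remove semantics every pair of edits can be merged, so this method never returns False; a class with a real invariant (a bounded counter, say) would return False when the merged state violates it.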

When assembling arguments for a _p_resolve call, the connection gets an
OID's server state via the protocol changes described above.  The
original local state is made available by a new slot called _p_state0 on
PersistentBase.  This slot is filled in when an object transitions to
the unsaved state (and has a _p_resolve method), and is cleared when it
transitions to the ghosted state.
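The _p_state0 lifecycle can be modelled in a few lines.  This is a toy model, not Durus code: MiniPersistent, the string status constants, and the separate state dict are hypothetical simplifications (the patch keeps the snapshot in a slot and takes it from __getstate__()).  It shows that the snapshot is taken only for classes defining _p_resolve, only on the first transition to the unsaved state, and is dropped on ghosting.

```python
from copy import deepcopy

# Hypothetical status constants for this toy model.
GHOST, SAVED, UNSAVED = "ghost", "saved", "unsaved"


class MiniPersistent:
    """Toy model of the patch's status transitions and _p_state0 slot."""

    def __init__(self, stored_state):
        self._stored = stored_state  # stands in for the client storage
        self.state = {}              # live attributes; empty while a ghost
        self._p_state0 = None
        self._p_status = GHOST

    def _p_load_state(self):
        # Loading gives a fresh copy, just as unpickling a record would.
        self.state = deepcopy(self._stored)
        self._p_status = SAVED

    def _p_set_status_unsaved(self):
        if self._p_status == GHOST:
            self._p_load_state()
        if hasattr(self, "_p_resolve"):
            # deepcopy so later in-place edits cannot alter the snapshot.
            self._p_state0 = deepcopy(self.state)
        self._p_status = UNSAVED

    def _p_set_status_ghost(self):
        self.state = {}
        self._p_state0 = None
        self._p_status = GHOST

    def _p_note_change(self):
        # As in the patch: only the first change in a transaction snapshots.
        if self._p_status != UNSAVED:
            self._p_set_status_unsaved()


class Tags(MiniPersistent):
    def add(self, tag):
        self._p_note_change()
        self.state["tags"].append(tag)

    def _p_resolve(self, stored_state, initial_state):
        return False  # merge logic omitted; presence alone enables the snapshot


tags = Tags({"tags": ["a"]})
tags.add("b")
tags.add("c")
print(tags.state, tags._p_state0)
tags._p_set_status_ghost()
print(tags._p_state0)
```

Without the deepcopy, the snapshot would share the live list, so appending "b" and "c" would also change it and _p_resolve could no longer tell what the original state was.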

Implementations of _p_resolve are constrained in that they may not
modify other persistent objects or cause another object to be unghosted.
This limitation reflects the single-pass interaction between the client
and server during a commit.

Here is a simple summation class with conflict resolution:

    from durus.persistent import Persistent

    class Sum(Persistent):

        def __init__(self, x=0):
            self.x = x

        def increment(self, delta=1):
            self.x += delta

        def value(self):
            return self.x

        def _p_resolve(self, stored_state, initial_state):
            # Apply the other committer's delta (server minus original)
            # on top of our own current value.
            self.x += stored_state['x'] - initial_state['x']
            return True

and here is Sum in use (with a Durus storage server running on the
default host and port):

    >>> from durus.client_storage import ClientStorage
    >>> from durus.connection import Connection
    >>> c1 = Connection(ClientStorage())
    >>> c2 = Connection(ClientStorage())
    >>> c3 = Connection(ClientStorage())
    >>> c1.get_root()['count'] = Sum()
    >>> c1.commit()
    >>> c1.get_root()['count'].value()
    0
    >>> c1.get_root()['count'].increment()
    >>> c2.get_root()['count'].increment()
    >>> c3.get_root()['count'].increment()
    >>> c1.commit()
    >>> c2.commit()
    >>> c3.commit()
    >>> c3.get_root()['count'].value()
    3
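To check the arithmetic behind the final value of 3, here is a Durus-free replay of that session.  PlainSum mirrors Sum without Persistent, and ToyServer is a hypothetical stand-in that detects a conflict with a simple commit serial (in place of the invalidation list the real server sends) and then calls _p_resolve with the committed state and the state the client originally loaded.

```python
class PlainSum:
    """Sum without Persistent, so the merge can be run standalone."""

    def __init__(self, x=0):
        self.x = x

    def increment(self, delta=1):
        self.x += delta

    def _p_resolve(self, stored_state, initial_state):
        self.x += stored_state["x"] - initial_state["x"]
        return True


class ToyServer:
    """Hypothetical server: committed state plus a serial bumped per commit."""

    def __init__(self, state):
        self.state = dict(state)
        self.serial = 0

    def load(self):
        # Return a fresh object, its initial state, and the serial seen.
        return PlainSum(self.state["x"]), dict(self.state), self.serial

    def commit(self, obj, initial_state, seen_serial):
        if seen_serial != self.serial:
            # Someone committed since this client loaded: try to merge.
            if not obj._p_resolve(dict(self.state), initial_state):
                raise RuntimeError("unresolved conflict")
        self.state = {"x": obj.x}
        self.serial += 1


server = ToyServer({"x": 0})
clients = [server.load() for _ in range(3)]  # c1, c2, c3 all see x == 0
for obj, _, _ in clients:
    obj.increment()
for obj, initial, seen in clients:
    server.commit(obj, initial, seen)
    print(server.state["x"])  # 1, then 2, then 3
```

The second commit merges 1 + (1 - 0) = 2 and the third merges 1 + (2 - 0) = 3, matching the session above.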


--John
diff -urN tmp/Durus-3.5.orig/utils.py Durus-3.5/utils.py
--- tmp/Durus-3.5.orig/utils.py 2006-05-11 08:45:36.000000000 -0400
+++ Durus-3.5/utils.py  2006-10-04 21:52:57.000000000 -0400
@@ -26,3 +26,10 @@
     """Unpack an 8-byte string into a 32-bit long integer."""
     return struct.unpack(">L", v)[0]

+def ps32(v):
+    """Pack a signed integer or long into a 4-byte string"""
+    return struct.pack(">l", v)
+
+def us32(v):
+    """Unpack a signed 4-byte string into a 32-bit long integer."""
+    return struct.unpack(">l", v)[0]
diff -urN tmp/Durus-3.5.orig/storage_server.py Durus-3.5/storage_server.py
--- tmp/Durus-3.5.orig/storage_server.py        2006-08-01 16:21:11.000000000 -0400
+++ Durus-3.5/storage_server.py 2006-10-08 20:35:16.000000000 -0400
@@ -5,7 +5,7 @@
 from datetime import datetime
 from durus.logger import log, is_logging
 from durus.serialize import extract_class_name, split_oids
-from durus.utils import p32, u32, u64
+from durus.utils import p32, u32, us32, u64
 from grp import getgrnam, getgrgid
 from os import unlink, stat, chown, geteuid, getegid, umask
 from os.path import exists
@@ -295,9 +295,18 @@
         client = self._find_client(s)
         s.sendall(p32(len(client.invalid)) + ''.join(client.invalid))
         client.invalid.clear()
-        tlen = u32(recv(s, 4))
+        tlen = us32(recv(s, 4))
         if tlen == 0:
             return # client decided not to commit (e.g. conflict)
+        elif tlen < 0:
+            # send objects for conflict resolution
+            oid_str = recv(s, 8 * -tlen)
+            for oid in split_oids(oid_str):
+                self._send_load_response(s, oid)
+            # get num items to commit again
+            tlen = u32(recv(s, 4))
+            if tlen == 0:
+                return
         tdata = recv(s, tlen)
         logging_debug = is_logging(10)
         logging_debug and log(10, 'Committing %s bytes', tlen)

diff -urN tmp/Durus-3.5.orig/client_storage.py Durus-3.5/client_storage.py
--- tmp/Durus-3.5.orig/client_storage.py        2006-08-01 16:21:12.000000000 -0400
+++ Durus-3.5/client_storage.py 2006-10-08 16:46:02.000000000 -0400
@@ -9,7 +9,7 @@
 from durus.storage_server import DEFAULT_PORT, DEFAULT_HOST, recv
 from durus.storage_server import SocketAddress, StorageServer
 from durus.storage_server import STATUS_OKAY, STATUS_KEYERROR, STATUS_INVALID
-from durus.utils import p32, u32, p64, u64
+from durus.utils import p32, u32, ps32, p64, u64


 class ClientStorage(Storage):
@@ -70,12 +70,13 @@
         if n != 0:
             packed_oids = recv(self.s, n*8)
             try:
-                handle_invalidations(split_oids(packed_oids))
+                handle_invalidations(split_oids(packed_oids),
+                    bulk_load=ConflictResolutionBulkLoad(self))
             except ConflictError:
                 self.transaction_new_oids.reverse()
                 self.oid_pool.extend(self.transaction_new_oids)
                 self.begin() # clear out records and transaction_new_oids.
-                self.s.sendall(p32(0)) # Tell server we are done.
+                self.s.sendall(ps32(0)) # Tell server we are done.
                 raise
         tdata = []
         for oid, record in self.records.iteritems():
@@ -123,3 +124,20 @@
                 yield oid, record
             except DurusKeyError:
                 pass
+
+
+class ConflictResolutionBulkLoad(object):
+
+    def __init__(self, storage):
+        self.storage = storage
+        self.called = False
+
+    def __call__(self, oids):
+        if self.called:
+            raise RuntimeError('bulk load already called')
+        self.called = True
+        oid_str = ''.join(oids)
+        num_oids = len(oids)
+        self.storage.s.sendall(ps32(-num_oids) + oid_str)
+        for oid in oids:
+            yield self.storage._get_load_response(oid)
diff -urN tmp/Durus-3.5.orig/connection.py Durus-3.5/connection.py
--- tmp/Durus-3.5.orig/connection.py    2006-08-01 16:21:11.000000000 -0400
+++ Durus-3.5/connection.py     2006-10-08 20:38:25.000000000 -0400
@@ -8,7 +8,8 @@
 from durus.persistent import ConnectionBase
 from durus.persistent_dict import PersistentDict
 from durus.serialize import ObjectReader, ObjectWriter
-from durus.serialize import split_oids, unpack_record, pack_record
+from durus.serialize import split_oids, unpack_record, pack_record, \
+    extract_state
 from durus.storage import Storage
 from durus.utils import p64
 from itertools import islice, chain
@@ -281,7 +282,7 @@
         self.shrink_cache()
         self.transaction_serial += 1

-    def _handle_invalidations(self, oids, read_oid=None):
+    def _handle_invalidations(self, oids, read_oid=None, bulk_load=None):
         """(oids:[str], read_oid:str=None)
         Check if any of the oids are for objects that were accessed during
         this transaction.  If so, raise the appropriate conflict exception.
@@ -297,7 +298,38 @@
                 conflicts.append(oid)
         if conflicts:
             if read_oid is None:
-                raise ConflictError(conflicts)
+                # This shortcut avoids the extra bulk load in most cases
+                # where conflict resolution isn't being used.
+                if not hasattr(self.cache.get(conflicts[0]), '_p_resolve'):
+                    raise ConflictError(conflicts)
+                # Run conflict resolution:
+                #   for each oid with conflict:
+                #       get object state from server (without affecting cache)
+                #       get local object
+                #       get previous local object state
+                #       call resolve method on object
+                #       ConflictError on failure
+                #   re-store resolved objects to transaction
+                # ISSUE: would like to use izip here, but exiting early with
+                #       data on the socket would cause problems
+                for oid, record in zip(conflicts, bulk_load(conflicts)):
+                    server_state = extract_state(record)
+                    obj = self.cache.get(oid)
+                    try:
+                        resolved = obj._p_resolve(server_state, obj._p_state0)
+                    except AttributeError:
+                        resolved = False
+                    if not resolved:  # not an assert: python -O strips those
+                        raise ConflictError(conflicts)
+                writer = ObjectWriter(self)
+                try:
+                    for oid in conflicts:
+                        obj = self.cache.get(oid)
+                        writer.gen_new_objects(obj)
+                        data, refs = writer.get_state(obj)
+                        self.storage.store(oid, pack_record(oid, data, refs))
+                finally:
+                    writer.close()
             else:
                 raise ReadConflictError([read_oid])

diff -urN tmp/Durus-3.5.orig/serialize.py Durus-3.5/serialize.py
--- tmp/Durus-3.5.orig/serialize.py     2006-08-01 16:21:14.000000000 -0400
+++ Durus-3.5/serialize.py      2006-10-08 20:33:55.000000000 -0400
@@ -44,6 +44,35 @@
     class_name = state.split('\n', 2)[1]
     return class_name

+def _extract_state_from_data(data, load=True, get_unpickler=None):
+    s = StringIO()
+    s.write(data)
+    s.seek(0)
+    if get_unpickler:
+        unpickler = get_unpickler(s)
+    else:
+        unpickler = Unpickler(s)
+    klass = unpickler.load()
+    position = s.tell()
+    if data[s.tell()] == 'x':
+        # This is almost certainly a compressed pickle.
+        try:
+            decompressed = decompress(data[position:])
+        except zlib_error:
+            pass # let the unpickler try anyway.
+        else:
+            s.write(decompressed)
+            s.seek(position)
+    if load:
+        return unpickler.load()
+    else:
+        return s.read()
+
+def extract_state(record):
+    oid, data, refs = unpack_record(record)
+    return _extract_state_from_data(data)
+
+
 class ObjectWriter(object):
     """
     Serializes objects for storage in the database.
@@ -133,25 +162,7 @@
         return instance

     def get_state(self, data, load=True):
-        s = StringIO()
-        s.write(data)
-        s.seek(0)
-        unpickler = self._get_unpickler(s)
-        klass = unpickler.load()
-        position = s.tell()
-        if data[s.tell()] == 'x':
-            # This is almost certainly a compressed pickle.
-            try:
-                decompressed = decompress(data[position:])
-            except zlib_error:
-                pass # let the unpickler try anyway.
-            else:
-                s.write(decompressed)
-                s.seek(position)
-        if load:
-            return unpickler.load()
-        else:
-            return s.read()
+        return _extract_state_from_data(data, load, self._get_unpickler)

     def get_state_pickle(self, data):
         return self.get_state(data, load=False)
diff -urN tmp/Durus-3.5.orig/persistent.py Durus-3.5/persistent.py
--- tmp/Durus-3.5.orig/persistent.py    2006-08-01 16:21:12.000000000 -0400
+++ Durus-3.5/persistent.py     2006-10-08 20:42:58.000000000 -0400
@@ -2,6 +2,7 @@
 $URL: svn+ssh://svn.mems-exchange.org/repos/trunk/durus/persistent.py $
 $Id: persistent.py 28547 2006-07-27 19:12:25Z dbinger $
 """
+from copy import deepcopy
 from durus.utils import format_oid
 from sys import stderr

@@ -76,7 +77,8 @@
             The _p_oid is None when this instance has never been stored.
         """

-        __slots__ = ['_p_status', '_p_serial', '_p_connection', '_p_oid']
+        __slots__ = ['_p_status', '_p_serial', '_p_connection', '_p_oid',
+            '_p_state0']

         def __new__(klass, *args, **kwargs):
             instance = object.__new__(klass, *args, **kwargs)
@@ -137,10 +139,8 @@

     def _p_note_change(self):
         if self._p_status != UNSAVED:
-            if self._p_status == GHOST:
-                self._p_load_state()
+            self._p_set_status_unsaved()
             self._p_connection.note_change(self)
-            self._p_status = UNSAVED

     def _p_format_oid(self):
         return format_oid(self._p_oid)
@@ -148,6 +148,7 @@
     def _p_set_status_ghost(self, getattribute=object.__getattribute__):
         d = getattribute(self, '__dict__')
         d.clear()
+        self._p_state0 = None
         self._p_status = GHOST

     def _p_set_status_saved(self):
@@ -156,6 +157,8 @@
     def _p_set_status_unsaved(self):
         if self._p_status == GHOST:
             self._p_load_state()
+        if hasattr(self, '_p_resolve'):
+            self._p_state0 = deepcopy(self.__getstate__())
         self._p_status = UNSAVED

     def _p_is_ghost(self):
diff -urN -b tmp/Durus-3.5.orig/_persistent.c Durus-3.5/_persistent.c
--- tmp/Durus-3.5.orig/_persistent.c    2006-08-01 16:21:10.000000000 -0400
+++ Durus-3.5/_persistent.c     2006-10-08 20:43:22.000000000 -0400
@@ -16,6 +16,7 @@
        PyObject *p_serial;
        PyObject *p_connection;
        PyObject *p_oid;
+       PyObject *p_state0;
 } PersistentBaseObject;

 typedef struct {
@@ -39,6 +40,8 @@
        Py_INCREF(x->p_connection);
        x->p_oid = Py_None;
        Py_INCREF(x->p_oid);
+       x->p_state0 = Py_None;
+       Py_INCREF(x->p_state0);
        return (PyObject *)x;
 }

@@ -50,6 +53,7 @@
        Py_XDECREF(self->p_connection);
        Py_XDECREF(self->p_oid);
        Py_XDECREF(self->p_serial);
+       Py_XDECREF(self->p_state0);
        PyObject_GC_Del(self);
        Py_TRASHCAN_SAFE_END(self);
 }
@@ -60,6 +64,7 @@
        Py_VISIT(self->p_connection);
        Py_VISIT(self->p_oid);
        Py_VISIT(self->p_serial);
+       Py_VISIT(self->p_state0);
        return 0;
 }

@@ -69,6 +74,7 @@
        Py_CLEAR(self->p_connection);
        Py_CLEAR(self->p_oid);
        Py_CLEAR(self->p_serial);
+       Py_CLEAR(self->p_state0);
        return 0;
 }

@@ -186,6 +192,7 @@
        {"_p_status", T_INT, offsetof(PersistentBaseObject, p_status)},
        {"_p_connection", T_OBJECT_EX, offsetof(PersistentBaseObject, p_connection)},
        {"_p_oid", T_OBJECT_EX, offsetof(PersistentBaseObject, p_oid)},
+       {"_p_state0", T_OBJECT_EX, offsetof(PersistentBaseObject, p_state0)},
        {NULL}
 };
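A note on the ISSUE comment in _handle_invalidations: in Python 2, zip() builds the complete list, so every record the server sends is read off the socket before any _p_resolve runs.  itertools.izip would stop reading at the first failure and leave the remaining records on the socket, where they would be misread as the reply to the next request.  (In Python 3 the built-in zip is lazy like izip, so the equivalent code would need list().)  The sketch below shows the difference with an in-memory buffer standing in for the socket; bulk_load and resolve_all here are hypothetical helpers, not the patch's classes.

```python
import io


def bulk_load(stream, oids, record_size=4):
    """Hypothetical reader: one fixed-size record per requested oid."""
    for oid in oids:
        yield oid, stream.read(record_size)


def resolve_all(stream, oids, fail_on, drain_first):
    records = bulk_load(stream, oids)
    if drain_first:
        # Read every response before resolving anything; this is what the
        # patch gets from Python 2's zip, which builds a full list.
        records = list(records)
    for oid, record in records:
        if oid == fail_on:
            return False  # early exit, as when a _p_resolve call fails
    return True


lazy = io.BytesIO(b"AAAABBBBCCCC")
resolve_all(lazy, ["a", "b", "c"], fail_on="a", drain_first=False)
print(lazy.tell())   # 4: two records still unread on the "socket"

eager = io.BytesIO(b"AAAABBBBCCCC")
resolve_all(eager, ["a", "b", "c"], fail_on="a", drain_first=True)
print(eager.tell())  # 12: everything consumed
```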
