durusmail: durus-users: multithread and implicit cache management (was: Re: [Durus-users] RELEASED: Durus-3.5)
RELEASED: Durus-3.5
2006-08-16
2006-09-21
multithread and implicit cache management (was: Re: [Durus-users] RELEASED: Durus-3.5)
2006-09-21
Re: multithread and implicit cache management
2006-09-21
2006-09-22
2006-09-22
2006-09-22
2006-09-26
2006-09-21
2006-09-21
2006-09-22
2006-09-22
Re: RELEASED: Durus-3.5
2006-09-21
multithread and implicit cache management (was: Re: [Durus-users] RELEASED: Durus-3.5)
Jesus Cea
2006-09-21
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Jesus Cea wrote:
> I just discovered a regressiĆ³n problem with this approach, related to
> multithreading.

Some crash examples:

=====
Exception in thread Thread-16566:
Traceback (most recent call last):
[...]
  File "/export/home/correo/lmtp.py", line 85, in _monitor
    persistencia.commit()
  File "/usr/local/lib/python2.4/site-packages/durus/connection.py",
line 281, in commit
    self.shrink_cache()
  File "/usr/local/lib/python2.4/site-packages/durus/connection.py",
line 216, in shrink_cache
    self.cache.shrink(self)
  File "/usr/local/lib/python2.4/site-packages/durus/connection.py",
line 396, in shrink
    heap = self._build_heap(connection.get_transaction_serial())
  File "/usr/local/lib/python2.4/site-packages/durus/connection.py",
line 374, in _build_heap
    for oid in islice(chain(all, all), start, start + len(all)):
RuntimeError: dictionary changed size during iteration
=====

Other strange error:

=====
  File "/export/home/correo/lmtp.py", line 85, in _monitor
    persistencia.commit()
  File "/usr/local/lib/python2.5/site-packages/durus/connection.py",
line 281, in commit
    self.shrink_cache()
  File "/usr/local/lib/python2.5/site-packages/durus/connection.py",
line 216, in shrink_cache
    self.cache.shrink(self)
  File "/usr/local/lib/python2.5/site-packages/durus/connection.py",
line 396, in shrink
    heap = self._build_heap(connection.get_transaction_serial())
  File "/usr/local/lib/python2.5/site-packages/durus/connection.py",
line 376, in _build_heap
    obj = all[oid]
  File "/usr/local/lib/python2.5/weakref.py", line 56, in __getitem__
    raise KeyError, key
KeyError: '\x00\x00\x00\x00\x00&\x94a'
=====

I know that Durus is unsupported under multithreading environments, but
when you are using a filestorage and threads, you can't open multiple
connections. And when you use a serverstorage, sharing a cache is the
way to go to improve hit rate, performance, and decrease RAM consumption.

So the issue is serious.

Some approaches:

* Instead of a WeakValueDictionary, just use a normal dictionary, with
weakrefs as values. When you create the weakref, define a callback. When
the callback is called, instead of updating the dictionary, just add the
OID of the deleted object to a SET (can be updated safely in a
multithreading environment). When we do a cache shrink, replace that SET
atomically with a new empty set (something like "a=self.set();
self.set=Set()", delete the stale objects from the cache using the old SET.

* When referencing objects from the cache, keep an strong reference
around. We can keep those references until a commit/abort for that
thread comes. Then we delete them. Since the commit/abort is sure to be
called with a lock adquired, we can be sure that the collection will be
done with a lock. The code seems fairly trivial (the "recent_objects"
could be used for this. I don't understand the logic of
"recent_objects"). Beware, this code is not optimized for speed:

===
- --- connection.py.old   Thu Sep 21 21:25:39 2006
+++ connection.py       Thu Sep 21 21:47:55 2006
@@ -16,6 +16,8 @@
 from time import time
 from weakref import WeakValueDictionary, ref

+from threading import currentThread
+
 ROOT_OID = p64(0)

 class Connection(ConnectionBase):
@@ -314,6 +316,7 @@
         self.recent_objects = set()
         self.set_size(size)
         self.finger = 0
+        self.strong=dict()

     def get_size(self):
         """Return the target size of the cache."""
@@ -351,7 +354,11 @@
         return obj

     def get(self, oid):
- -        return self.objects.get(oid)
+        obj=self.objects.get(oid)
+        if obj!=None :
+          tr=currentThread()
+          self.strong.setdefault(tr,set()).add(obj)
+        return obj

     def __setitem__(self, key, obj):
         assert key not in self.objects or self.objects[key] is obj
@@ -386,6 +393,11 @@
         """(connection:Connection)
         Try to reduce the size of self.objects.
         """
+
+        tr=currentThread()
+        if tr in self.strong :
+          del self.strong[tr]
+
         current = len(self.objects)
         if current <= self.size:
             # No excess.

===

(this code is in production in my servers now, but haven't been
stress-tested)


I agree that these techniques "penalize" the single thread case. I would
be happy if the stock durus code would expose some "lightweight" hooks
to be able to operate efficiently both in single-thread and multi-thread.

(In fact, the first approach would have similar efficience, since
"WeakValueDictionary" is coded in python in the standard python library :-p)

- --
Jesus Cea Avion                         _/_/      _/_/_/        _/_/_/
jcea@argo.es http://www.argo.es/~jcea/ _/_/    _/_/  _/_/    _/_/  _/_/
jabber / xmpp:jcea@jabber.org         _/_/    _/_/          _/_/_/_/_/
                               _/_/  _/_/    _/_/          _/_/  _/_/
"Things are not so easy"      _/_/  _/_/    _/_/  _/_/    _/_/  _/_/
"My name is Dump, Core Dump"   _/_/_/        _/_/_/      _/_/  _/_/
"El amor es poner tu felicidad en la felicidad de otro" - Leibniz
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.2.2 (GNU/Linux)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iQCVAwUBRRLvE5lgi5GaxT1NAQIrUAP/Ssu2LVZDi9sD7Lo4+WGspRn/M63TtFuv
IqBbyPcnp4OoiH9DEly2RjDBEI/Annf2LEzvhN6Re+sDHs8EKm2AcKPjhkR0c7eR
6/Z8OLDxU1s1Mg/cdi6LwamXMd55KqQdUT9ch2U89tgjYKrZQROwrD+7BEPKZyiQ
sO8UkktYzyw=
=oEZ4
-----END PGP SIGNATURE-----
reply